HBase源码分析2—client和region定位原理
在上一篇文章HBase源码分析1—初试中简要介绍了HBase的整体组成。从这一篇开始逐渐从源码入手看下HBase内部究竟是如何工作的。
我们主要使用的代码是1.4.0版本的,同样的,Java使用的lib版本也是1.4.0,可以通过maven仓库下载到。
Java操作HBase的例子
假设我们已经创建了一个test表,并且有列族cf1, cf2
|
1 |
create 'test', {NAME=>'cf1'}, {NAME=>'cf2'} |
我们只进行一个简单的put操作,代码如下:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; public class SimpleExample { public static void main(String[] args) { try { Configuration conf = new Configuration(); // 1.4版本推荐的初始化方式,new HTable方式已被废弃 Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf("test")); byte[] rowKey = Bytes.toBytes("key_yiz96"); byte[] columnFamily = Bytes.toBytes("cf1"); byte[] qualifier = Bytes.toBytes("name"); byte[] value = Bytes.toBytes("YiZheng"); Put put = new Put(rowKey); put.addColumn(columnFamily, qualifier, value); table.put(put); table.close(); } catch (IOException e) { e.printStackTrace(); } } } |
然后我们在hbase shell中使用 scan 'test' 命令就可以看到我们刚插入的数据了。
使Goto Impl可以看到源码
如果我们在上面代码的object上按 ⌘B (Intellij快捷键,查看代码实现),通常会显示一个.class的反编译的代码中(显示Decompiled .class file xxxxx),但是右上角还会有两个选项“download sources”和“choose sources”。如果已经下载过HBase的源码,可以直接点击choose sources,选择解压后的HBase根目录即可。这样我们就可以查看client的源代码了。
Client为我们做了什么
初始化和结束阶段
没啥好讲的,基本都是在构造各种对象,Conf、HTable等等,nothing special……
Put Object
我们跟踪下Put对象的实现,这个对象实现在hadoop-client项目中。它实现了7个构造函数和一些简单的操作函数比如添加列,有些函数已经在这个版本被打上deprecated的标记。其中addColumn函数族跟addImmutable函数族似乎没什么区别,我也不清楚具体是为啥留的接口……
我们跟一下addColumn:
|
1 2 3 4 5 6 7 8 9 |
public Put addColumn(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value) { if (ts < 0) { throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts); } List<Cell> list = getCellList(family); KeyValue kv = createPutKeyValue(family, qualifier, ts, value, null); list.add(kv); return this; } |
其实就是内部搞了一个familyMap对象( TreeMap<byte [], List<Cell>> ),将所有的put操作都丢进map里管理。这个familyMap是声明在Mutation类中的,而Put等类都继承自Mutation。
table.put(put)
put实现非常简单:
|
1 2 3 4 5 6 |
public void put(final Put put) throws IOException { getBufferedMutator().mutate(put); if (autoFlush) { flushCommits(); } } |
getBufferedMutator 会返回一个 BufferedMutator 对象,这个东西是一个异步的批量插入器,可以看到在他构造函数的最后一行搞了一个 ap = new AsyncProcess(xxx) 出来。我们看mutate做了什么事情:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
public void mutate(List<? extends Mutation> ms) throws InterruptedIOException, RetriesExhaustedWithDetailsException { if (closed) { throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); } // 计算这个操作会占用buffer多少空间 long toAddSize = 0; int toAddCount = 0; for (Mutation m : ms) { if (m instanceof Put) { validatePut((Put) m); } toAddSize += m.heapSize(); ++toAddCount; } // This behavior is highly non-intuitive... it does not protect us against // 94-incompatible behavior, which is a timing issue because hasError, the below code // and setter of hasError are not synchronized. Perhaps it should be removed. // 这段代码我追了一下没读懂,感觉真的should be removed,if {}里是一种兜底方案,全同步模式 if (ap.hasError()) { currentWriteBufferSize.addAndGet(toAddSize); writeAsyncBuffer.addAll(ms); undealtMutationCount.addAndGet(toAddCount); backgroundFlushCommits(true); } else { currentWriteBufferSize.addAndGet(toAddSize); writeAsyncBuffer.addAll(ms); undealtMutationCount.addAndGet(toAddCount); } // Now try and queue what needs to be queued. // 当前buffer大于writeBuffer上限,进行一次后台异步flush while (undealtMutationCount.get() != 0 && currentWriteBufferSize.get() > writeBufferSize) { backgroundFlushCommits(false); } } |
backgroundFlushCommits() 的参数表示“是否同步”,内部调用 ap.submit() ,然后经过一层层到action再到runnable的封装,在 sendMultiAction 函数中被最终run起来。无论当时是否选择同步,都会用这种方法执行,不同的是,同步方法会等待操作结束再返回。
ap.submit() 具体干了啥?留到最后我们再聊。
其他操作
其他操作诸如delete、get等都大同小异,不再赘述。
Region定位
我们话锋一转,先不谈client端代码。在上一篇中我们说过,Region是HBase数据管理的单位元,Region中只存某个特定Column Family的数据,client