HBase源码分析2—client和region定位原理
在上一篇文章HBase源码分析1—初试中简要介绍了HBase的整体组成。从这一篇开始逐渐从源码入手看下HBase内部究竟是如何工作的。
我们主要使用的代码是1.4.0版本的,同样的,Java使用的lib版本也是1.4.0,可以通过maven仓库下载到。
Java操作HBase的例子
假设我们已经创建了一个test表,并且有列族cf1, cf2
1 |
create 'test', {NAME=>'cf1'}, {NAME=>'cf2'} |
我们只进行一个简单的put操作,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; public class SimpleExample { public static void main(String[] args) { try { Configuration conf = new Configuration(); // 1.4版本推荐的初始化方式,new HTable方式已被废弃 Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf("test")); byte[] rowKey = Bytes.toBytes("key_yiz96"); byte[] columnFamily = Bytes.toBytes("cf1"); byte[] qualifier = Bytes.toBytes("name"); byte[] value = Bytes.toBytes("YiZheng"); Put put = new Put(rowKey); put.addColumn(columnFamily, qualifier, value); table.put(put); table.close(); } catch (IOException e) { e.printStackTrace(); } } } |
然后我们在hbase shell中使用 scan 'test' 命令就可以看到我们刚插入的数据了。
使Goto Impl可以看到源码
如果我们在上面代码的object上按 ⌘B (Intellij快捷键,查看代码实现),通常会显示一个.class的反编译的代码中(显示Decompiled .class file xxxxx),但是右上角还会有两个选项“download sources”和“choose sources”。如果已经下载过HBase的源码,可以直接点击choose sources,选择解压后的HBase根目录即可。这样我们就可以查看client的源代码了。
Client为我们做了什么
初始化和结束阶段
没啥好讲的,基本都是在构造各种对象,Conf、HTable等等,nothing special……
Put Object
我们跟踪下Put对象的实现,这个对象实现在hadoop-client项目中。它实现了7个构造函数和一些简单的操作函数比如添加列,有些函数已经在这个版本被打上deprecated的标记。其中addColumn函数族跟addImmutable函数族似乎没什么区别,我也不清楚具体是为啥留的接口……
我们跟一下addColumn:
1 2 3 4 5 6 7 8 9 |
public Put addColumn(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value) { if (ts < 0) { throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts); } List<Cell> list = getCellList(family); KeyValue kv = createPutKeyValue(family, qualifier, ts, value, null); list.add(kv); return this; } |
其实就是内部搞了一个familyMap对象( TreeMap<byte [], List<Cell>> ),将所有的put操作都丢进map里管理。这个familyMap是声明在Mutation类中的,而Put等类都继承自Mutation。
table.put(put)
put实现非常简单:
1 2 3 4 5 6 |
public void put(final Put put) throws IOException { getBufferedMutator().mutate(put); if (autoFlush) { flushCommits(); } } |
getBufferedMutator 会返回一个 BufferedMutator 对象,这个东西是一个异步的批量插入器,可以看到在他构造函数的最后一行搞了一个 ap = new AsyncProcess(xxx) 出来。我们看mutate做了什么事情:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
public void mutate(List<? extends Mutation> ms) throws InterruptedIOException, RetriesExhaustedWithDetailsException { if (closed) { throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); } // 计算这个操作会占用buffer多少空间 long toAddSize = 0; int toAddCount = 0; for (Mutation m : ms) { if (m instanceof Put) { validatePut((Put) m); } toAddSize += m.heapSize(); ++toAddCount; } // This behavior is highly non-intuitive... it does not protect us against // 94-incompatible behavior, which is a timing issue because hasError, the below code // and setter of hasError are not synchronized. Perhaps it should be removed. // 这段代码我追了一下没读懂,感觉真的should be removed,if {}里是一种兜底方案,全同步模式 if (ap.hasError()) { currentWriteBufferSize.addAndGet(toAddSize); writeAsyncBuffer.addAll(ms); undealtMutationCount.addAndGet(toAddCount); backgroundFlushCommits(true); } else { currentWriteBufferSize.addAndGet(toAddSize); writeAsyncBuffer.addAll(ms); undealtMutationCount.addAndGet(toAddCount); } // Now try and queue what needs to be queued. // 当前buffer大于writeBuffer上限,进行一次后台异步flush while (undealtMutationCount.get() != 0 && currentWriteBufferSize.get() > writeBufferSize) { backgroundFlushCommits(false); } } |
backgroundFlushCommits() 的参数表示“是否同步”,内部调用 ap.submit() ,然后经过一层层到action再到runnable的封装,在 sendMultiAction 函数中被最终run起来。无论当时是否选择同步,都会用这种方法执行,不同的是,同步方法会等待操作结束再返回。
ap.submit() 具体干了啥?留到最后我们再聊。
其他操作
其他操作诸如delete、get等都大同小异,不再赘述。
Region定位
我们话锋一转,先不谈client端代码。在上一篇中我们说过,Region是HBase数据管理的单位元,Region中只存某个特定Column Family的数据,client操作通常也是对特定的region进行的,需要与管理这个Region的Region Server进行通信。如何知道region的位置呢?这一节我们聊聊Region是如何定位的。
在网上很容易找到关于 -ROOT- 表和 .META. 表的说法,这个概念已经在0.96版本移除。在0.96版本前, 有两个全局用于索引的表,一个叫 .META. 用于保存Region的索引,一个叫 -ROOT- 用于保存 .META. 的索引(因为 .META. 表也是以Region形式存储在Region Server上,可能会被分裂混布), -ROOT- 地址存储在zookeeper上。而现在,一切都不用那么麻烦了……
在1.4.0版本中,这个索引表就是 hbase:meta 表,表的位置储存在zookeeper上,表不允许被分裂,所以只会存在一个Region上。
所以以前的“8步查找过程”(出自《hbase实战》)现在变成了4步:
- 问zookeeper hbase:meta 表在哪个Region Server
- 读 hbase:meta ,找哪个Region Server存有数据
- 向Region Server请求数据
- 返回数据
ap.submit()
我们接着上一部分留下的问题继续聊: ap.submit() 具体干了啥?(找不到的小伙伴,函数出现在BufferedMutatorImpl.java的backgroudFlushCommits中)
我们追进最内层 submit() 函数(有点长不贴了),前十几行先定义了一堆变量,之后进入到一个do-while大循环。大循环里,先调用 RegionLocations locs = connection.locateRegion() 取得Region的位置,然后将Region和Row包装成一个action,放到一个队列里去。最后调用 submitMultiActions() 里的 ars.sendMultiAction() 挨个起线程跑Runnable对象,这部分比较简单,不多说了,主要看下 locateRegion() 那一块。
locateRegion()的实现在ConnectionManager.java中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public RegionLocations locateRegion(final TableName tableName, final byte [] row, boolean useCache, boolean retry, int replicaId) throws IOException { if (this.closed) throw new DoNotRetryIOException(toString() + " closed"); if (tableName== null || tableName.getName().length == 0) { throw new IllegalArgumentException( "table name cannot be null or zero length"); } if (tableName.equals(TableName.META_TABLE_NAME)) { return locateMeta(tableName, useCache, replicaId); } else { // Region not in the cache - have to go to the meta RS return locateRegionInMeta(tableName, row, useCache, retry, replicaId); } } |
如果要找meta_table,进入 locateMeta 函数,否则到 locationRegionInMeta 函数。
locateMeta 函数比较简单,如果有cache使用cache,否则就到 zookeeper.register 里面去取。
locationRegionInMeta 函数也会使用cache,但是如果不命中的话会清除cache,进入到若干次的读meta尝试,每次尝试会有间隔,具体涉及一些细节(如为什么要设置间隔等)在之后的讨论中会提及。