首页 > 网络 > 云计算 > 正文
HBase的Scan实现源码分析
2016-09-24       个评论      
收藏    我要投稿
我们从接口InternalScanner开始分析,实现该接口的类表示其是使用于HBase内部的scanner,不暴露给客户端使用。实现了这个接口的类如下所示:
KeyValueHeap、StoreScanner、RegionScanner
 
下面再看KeyValueScanner,KeyValueScanner也是一个接口,它是一个可以向外迭代出KeyValue的scanner。它定义的主要方法包括了peek()、next()、seek(KeyValue key)等等,其中next和peek都能获取scanner中的下一个KeyValue,但是next会移动iterator,而peek不会,而seek就是将iterator定位到指定的KeyValue,如果不存在该KeyValue则定位到其后面的那个KeyValue,在scanner初始化的时候会调用下seek接口。
 
需要注意的是KeyValueScanner是可以排序的,其大小由peek()获取到KeyValue的大小决定,即如果KeyValueScanner1.peek() < KeyValueScanner2.peek(),则KeyValueScanner1 < KeyValueScanner2。KeyValue的大小比较依字典序进行,比较的优先级依次是RowKey cf+cq timestamp type,当比较两个keyValue时,先比较RowKey的大小('row1' < 'row2'),相同的情况下比较cf+cq的大小('cf1:q1' < 'cf2:q1' < 'cf2:q2'),还相同则比较时间戳,时间戳值越大则数据越新,在队列中的位置越靠前。最后比较TYPE('DeleteFamily' < 'DeleteColumn' < 'Delete' < 'Put')。
 
再看第三个关键的类KeyValueHeap,该类实现了上述两个接口,并且包含了三个主要的成员变量,分别是由KeyValueScanner组成的堆heap(优先级队列,内部实现就是堆),heap的堆顶元素current(注意该元素是个KeyValueScanner),以及用于比较KeyValueScanner元素的comparator。
 
接着是KeyValueHeap中的几个重要方法,首先是peek和next,他们都是返回堆顶元素(Cell,也就是KeyValue),不同在于next会将堆顶元素出堆,并重新调整堆,对外来说就是迭代器向前移动了,而peek()不会将堆顶出堆,堆顶不变。实现时peek方法实际上进一步调用了current元素的peek方法,也就是KeyValueScanner的peek方法。
 
public Cell peek() {
   if (this.current == null) {
      return null;
   }
   return this.current.peek();
}

讲完了上述三个重要的类,回归到hbase的结构,HBase的表数据分为多个层次,分别是HRegion->HStore->[HFile,HFile,....,MemStoreFile]。一个表首先会水平分片形成多个HRegion,一个HRegion内不同的Column Family对应着不同的HStore,一个HStore下包含多个HFile和一个Memstore,数据写入时先写入MemstoreFile,MemStoreFile会不断刷新形成新的HFile。
 
复杂的数据结构形成了复杂的Scanner,在一个scan流程中,会形成如下描述的scanner对象:每个region的数据读取由一个RegionScanner对象负责,RegionScanner有一个scanner的优先队列,里面放的是storeScanner(这个优先队列在实现上是由多个StoreScanner组成的堆,使用RegionScanner的成员变量KeyValueHeap storeHeap来表示)。
 
每个StoreScanner对象对应着一个Column Family内的数据读取,其也有一个KeyValueHeap类型的成员变量heap,保存的是隶属于该store的MemStoreScanner和StoreFileScanner。
 
storeHeap&StoreScanner的构造代码如下所示,在RegionScannerImpl中,会遍历该Region下的所有store,并针对每个store建立对应的StoreScanner。
 
for (Map.Entry> entry :
          scan.getFamilyMap().entrySet()) {         //遍历该region下的各store
        Store store = stores.get(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);  //new一个该store的StoreScanner
        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
          || this.filter.isFamilyEssential(entry.getKey())) {
          scanners.add(scanner);        //将不同的StoreScanner归入不同的scanner list中
        } else {
          joinedScanners.add(scanner);
        }
}
initializeKVHeap(scanners, joinedScanners, region);   //然后用这些StoreScanner初始化一个KeyValueHeap
store.getScanner就是针对每个store创建一个StoreScanner,最后一步的initializeKVHeap则将这些StoreScanner构建成一个堆保存在成员变量storeHeap中。store.getScanner调用了StoreScanner的构造函数,构造函数中的关键两步如下所示:
 
// Pass columns to try to filter out unnecessary StoreFiles.
    List scanners = getScannersNoCompaction();  //返回该Store下对应的MemStore/StoreFile Scanner

    // Seek all scanners to the start of the Row (or if the exact matching row
    // key does not exist, then to the start of the next matching Row).
    // Always check bloom filter to optimize the top row seek for delete
    // family marker.
    seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
        && lazySeekEnabledGlobally, isParallelSeekEnabled);     //对这些StoreFileScanner和MemStoreScanner分别进行seek
                                                                //seekKey是matcher.getStartKey()
getScannersNoCompaction()返回这个Store下包含的HFileScanner和memstoreScanner,保存在scanners中。
 
seekScanners就是在memstore或者hfile中定位到指定的keyValue(通常是scan时startKey&endKey指定的keyValue),如果指定的keyValue不存在,则seek到指定keyValue的下一个元素。实际实现时这里采用了lazy seek优化,优化的目的是为了不需要对所有的HFile进行seek寻找目标keyValue,而只需对keyValue真实存在的HFile进行seek。
 
典型的客户端发起scan请求的代码如下所示:
 
Scan scan = new Scan();
scan.setStartRow(........);
scan.setStopRow(........);
Result result;
try (ResultScanner rs = table.getScanner(scan)) {
   while ((result=rs.next()) != null) {
       //your code here
   }
}
进入上述代码的getScanner方法,会发现其new一个ClientScanner对象,该对象包含了用户传入的Scan对象以及缓存、连接、重试次数、表名、region信息等参数。ClientScanner构造函数的最后调用initializeScannerInConstruction(),这个函数实际上包装了一个如下的调用:
nextScanner(this.caching, false);
其中,this.caching是一个int型变量,表示一次scan的rpc请求返回的结果数量,返回结果保存在客户端的cache中。
 
进入nextScanner函数,其首先检查是否已scan至表尾,如果已scan至表尾则关闭scan并返回false给客户端,否则更新localStartKey作为本次scan的开始位置,并将输入参数this.caching赋值给nbRows以表示本次scan返回的数据量,以上述两个变量作为参数调用getScannerCallable方法,该方法会返回一个ScannerCallableWithReplicas类型的对象callable,接着调用callable对象中的call方法向服务端发起一次rpc调用,调用路径如下:
ScannerCallableWithReplicas.call->ScannerCallable.call
服务调用是通过构造一个ScanRequest类型的对象request,并将其发往服务端来实现的,核心代码如下:
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
controller = controllerFactory.newController();
response = getStub().scan(controller, request);
这里需要注意的是构造request时包装了一个long型的变量nextCallSeq。
 
至此客户端发起scan请求的流程结束,下面介绍服务端是如何处理这些scan请求,并与前面的知识建立起联系。
 
客户端调用的getStub().scan向服务端发起了一次scan的rpc请求,服务端scan的实现在RSRpcServices中,首先其申请一个租约Lease.lease,用于客户端和服务端之间的心跳连接,然后对照客户端和服务端的nextCallSeq字段(目的是保证客户端顺序得到所有数据而不漏),代码如下:
 
if (request.hasNextCallSeq()) {
          if (rsh == null) {
            rsh = scanners.get(scannerName);
          }
          if (rsh != null) {
            if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
              throw new OutOfOrderScannerNextException(
                "Expected nextCallSeq: " + rsh.getNextCallSeq()
                + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
                "; request=" + TextFormat.shortDebugString(request));
            }
            // Increment the nextCallSeq value which is the next expected from client.
            rsh.incNextCallSeq();  //保证客户端顺序得到所有数据不漏,Client和RS都维护一个nextCallSeq字段
          }
}

RSRpcServices中包含一个ConcurrentHashMap类型的变量scanners,String是region的名字,也就是说每一个region都租用一个RegionScanner。回到scan函数,参数request中可以获得scannerName,凭借scannerName从scanners中获取对应的RegionScanner对象scanner。
 
接着从request中提取本次scan的信息,如是否是small scan、reverse scan等等,根据这些信息构造ScannerContext类型的对象scannerContext,以此为参数调用RegionScanner的nextRaw方法,这样就与前面介绍的RegionScanner建立起联系,返回结果存放在List类型的变量values中:
moreRows = scanner.nextRaw(values, scannerContext)

newRaw方法的调用路径如下:
nextRaw() -> RegionScanner.nextInternal() -> populateResult()
其中,在RegionScanner.nextInternal()会进行一些对stopRow/filterRow的检查,populateResult函数开始迭代取数据,调用语句如下:
populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
this.storeHeap是前面我们说的RegionScanner中维护的一个由StoreScanner组成的堆。populateResult的主要逻辑简化如下:
do{
      heap.next(results, scannerContext);
                    
      nextKv= heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
} while (moreCellsInRow)
populateResult中真正返回数据调用的是heap的next方法,这里还记得前面说的heap是由storescanner组成的堆,在next中用current变量记住当前正在处理的storescanner,然后调用next()函数返回了该storescanner中可能存在的结果。
 
到StoreScanner的next方法,StoreScanner维护着一个由StoreFileScanner/memstoreScanner构造的堆,next实际是从它的scanner堆中peek出一个StoreFileScanner或者是MemStoreScanner,然后调用next()取得数据,再将该scanner添加回队列中。
点击复制链接 与好友分享!回本站首页
上一篇:PGM:部分观测数据
下一篇:PGM:贝叶斯网的参数估计2
相关文章
图文推荐
文章
推荐
热门新闻

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训
版权所有: 红黑联盟--致力于做实用的IT技术学习网站