频道栏目
首页 > 资讯 > 云计算 > 正文

HBase性能优化方法总结(三):读表操作

17-06-19        来源:[db:作者]  
收藏   我要投稿

读表操作

3.1 多 HTable 并发读

创建多个 HTable 客户端用于读操作,提高读数据的吞吐量,一个例子:

static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = “user_log”;
rTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
rTableLog[i] = new HTable(conf, table_log_name);
80
rTableLog[i].setScannerCaching(50);
}

3.2 HTable 参数设置

3.2.1 Scanner Caching

hbase.client.scanner.caching 配置项可以设置 HBase scanner 一次从服务端抓取的数据条数,
默认情况下一次一条。通过将其设置成一个合理的值,可以减少 scan 过程中 next()的时间开
销,代价是 scanner 需要通过客户端的内存来维持这些被 cache 的行记录。
有三个地方可以进行配置: 1)在 HBase 的 conf 配置文件中进行配置; 2)通过调用
HTable.setScannerCaching(int scannerCaching)进行配置; 3)通过调用
Scan.setCaching(int caching)进行配置。 三者的优先级越来越高。

3.2.2 Scan Attribute Selection

scan 时指定需要的 Column Family,可以减少网络传输数据量,否则默认 scan 操作会返回整行
所有 Column Family 的数据。

3.2.3 Close ResultScanner

通过 scan 取完数据后,记得要关闭 ResultScanner,否则 RegionServer 可能会出现问题(对
应的 Server 资源无法释放)。

3.3 批量读

通过调用 HTable.get(Get)方法可以根据一个指定的 row key 获取一行记录,同样 HBase 提供了另
一个方法:通过调用 HTable.get(List)方法可以根据一个指定的 row key 列表,批量获取多
行记录,这样做的好处是批量执行,只需要一次网络 I/O 开销,这对于对数据实时性要求高而且
网络传输 RTT 高的情景下可能带来明显的性能提升。

3.4 多线程并发读

在客户端开启多个 HTable 读线程,每个读线程负责通过 HTable 对象进行 get 操作。下面是一个
多线程并发读取 HBase,获取店铺一天内各分钟 PV 值的例子:

public class DataReaderServer {
//获取店铺一天内各分钟 PV 值的入口函数
public static ConcurrentHashMap getUnitMinutePV(long uid, long
startStamp, long endStamp){
long min = startStamp;
int count = (int)((endStamp - startStamp) / (60*1000));
List lst = new ArrayList();
for (int i = 0; i <= count; i++) {
min = startStamp + i * 60 * 1000;
lst.add(uid + "_" + min);
}
return parallelBatchMinutePV(lst);
}
//多线程并发查询,获取分钟 PV 值
private static ConcurrentHashMap parallelBatchMinutePV(List
lstKeys){
ConcurrentHashMap hashRet = new ConcurrentHashMap();
int parallel = 3;
List<>> lstBatchKeys = null;
if (lstKeys.size() < parallel ){
lstBatchKeys = new ArrayList<>>(1);
lstBatchKeys.add(lstKeys);
}
else{
lstBatchKeys = new ArrayList<>>(parallel);
for(int i = 0; i < parallel; i++ ){
List lst = new ArrayList();
lstBatchKeys.add(lst);
}
81
for(int i = 0 ; i < lstKeys.size() ; i ++ ){
lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
}
}
List >> futures = new
ArrayList >>(5);
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("ParallelBatchQuery");
ThreadFactory factory = builder.build();
ThreadPoolExecutor executor = (ThreadPoolExecutor)
Executors.newFixedThreadPool(lstBatchKeys.size(), factory);
for(List keys : lstBatchKeys){
Callable< ConcurrentHashMap > callable = new
BatchMinutePVCallable(keys);
FutureTask< ConcurrentHashMap > future = (FutureTask<
ConcurrentHashMap >) executor.submit(callable);
futures.add(future);
}
executor.shutdown();
// Wait for all the tasks to finish
try {
boolean stillRunning = !executor.awaitTermination(
5000000, TimeUnit.MILLISECONDS);
if (stillRunning) {
try {
executor.shutdownNow();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} catch (InterruptedException e) {
try {
Thread.currentThread().interrupt();
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
// Look for any exception
for (Future f : futures) {
try {
if(f.get() != null)
{
hashRet.putAll((ConcurrentHashMap)f.get());
}
} catch (InterruptedException e) {
try {
Thread.currentThread().interrupt();
} catch (Exception e1) {
// TODO Auto-generated catch block
82
e1.printStackTrace();
}
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return hashRet;
}
//一个线程批量查询,获取分钟 PV 值
protected static ConcurrentHashMap getBatchMinutePV(List
lstKeys){
ConcurrentHashMap hashRet = null;
List lstGet = new ArrayList();
String[] splitValue = null;
for (String s : lstKeys) {
splitValue = s.split("_");
long uid = Long.parseLong(splitValue[0]);
long min = Long.parseLong(splitValue[1]);
byte[] key = new byte[16];
Bytes.putLong(key, 0, uid);
Bytes.putLong(key, 8, min);
Get g = new Get(key);
g.addFamily(fp);
lstGet.add(g);
}
Result[] res = null;
try {
res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
} catch (IOException e1) {
logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
}
if (res != null && res.length > 0) {
hashRet = new ConcurrentHashMap(res.length);
for (Result re : res) {
if (re != null && !re.isEmpty()) {
try {
byte[] key = re.getRow();
byte[] value = re.getValue(fp, cp);
if (key != null && value != null) {
hashRet.put(String.valueOf(Bytes.toLong(key,
Bytes.SIZEOF_LONG)), String.valueOf(Bytes
.toLong(value)));
}
} catch (Exception e2) {
logger.error(e2.getStackTrace());
}
}
}
}
return hashRet;
}
}
//调用接口类,实现 Callable 接口
class BatchMinutePVCallable implements Callable<>>{
83
private List keys;
public BatchMinutePVCallable(List lstKeys ) {
this.keys = lstKeys;
}
public ConcurrentHashMap call() throws Exception {
return DataReadServer.getBatchMinutePV(keys);
}
}

3.5 缓存查询结果

对于频繁查询 HBase 的应用场景,可以考虑在应用程序中做缓存,当有新的查询请求时,首先在
缓存中查找,如果存在则直接返回,不再查询 HBase;否则对 HBase 发起读请求查询,然后在应用
程序中将查询结果缓存起来。至于缓存的替换策略,可以考虑 LRU 等常用的策略。
LRU 策略: 简单的说就是缓存一定量的数据,当超过设定的阈值时就把一些过期的数据删除掉。

3.6 Blockcache

HBase 上 Regionserver 的内存分为两个部分,一部分作为 Memstore,主要用来写;另外一部分作
为 BlockCache,主要用于读。
写请求会先写入 Memstore, Regionserver 会给每个 region 提供一个 Memstore,当 Memstore 满
64MB 以后,会启动 flush 刷新到磁盘。当 Memstore 的总大小超过限制时(heapsize *
hbase.regionserver.global.memstore.upperLimit * 0.9),会强行启动 flush 进程,从最大的
Memstore 开始 flush 直到低于限制。
读请求先到 Memstore 中查数据,查不到就到 BlockCache 中查,再查不到就会到磁盘上读,并把
读的结果放入 BlockCache。由于 BlockCache 采用的是 LRU 策略,因此 BlockCache 达到上限
(heapsize * hfile.block.cache.size * 0.85)后,会启动淘汰机制,淘汰掉最老的一批数据。
一个 Regionserver 上有一个 BlockCache 和 N 个 Memstore,它们的大小之和不能大于等于
heapsize * 0.8,否则 HBase 不能启动。默认 BlockCache 为 0.2,而 Memstore 为 0.4。 对于注重
读响应时间的系统,可以将 BlockCache 设大些,比如设置 BlockCache=0.4, Memstore=0.39,
以加大缓存的命中率。

相关TAG标签
上一篇:谈一谈Java8的函数式编程 (三) --几道关于流的练习题
下一篇:java画随机颜色同心圆
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站