频道栏目
首页 > 资讯 > 云计算 > 正文

java访问Hbase

17-08-03        来源:[db:作者]  
收藏   我要投稿
package com.db.hadoop.hbase01;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.Bytes;


public class HBaseUtil {
private static Configuration conf;
private static Connection con;
private static Admin admin;
private static ExecutorService pool;

static{
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "slave01,slave02,slave03");
//加载hbase.properties配置文件信息
/*ResourceBundle rb = ResourceBundles.getBundle("hbase");
Enumeration kvs = rb.getKeys();
while(kvs.hasMoreElements()){
String key = kvs.nextElement();
String value = rb.getString(key);
conf.set(key, value);
}*/
}

/*
* 连接数据库
*/
public static Connection getConn(){
try {
pool = Executors.newCachedThreadPool();
con = ConnectionFactory.createConnection(conf, pool);
return con;
} catch (IOException e) {
throw new RuntimeException("数据库连接失败....");
}
}


/*
* 关闭
*/
public static void close(){
try {
if(admin != null){
admin.close();
}

} catch (IOException e) {
throw new RuntimeException("关闭管理者失败....");
}
try {
if(con != null && con.isClosed()){
con.close();
}

} catch (IOException e) {
throw new RuntimeException("关闭数据库连接失败....");
}
try{
if(!pool.isShutdown() && pool != null){
pool.shutdown();
}

}catch (Exception e) {
throw new RuntimeException("关闭线程池失败....");
}
}


/*
* 得到admin
*/
public static Admin getAdmin(){
try {
getConn();
admin = con.getAdmin();
return admin;
} catch (IOException e) {
throw new RuntimeException("取到表的管理者失败....");
}
}


/*
* 创建表
* @param tableName:表名
* @param ...columnFamilies:列族
*/
public static boolean createTable(String tableName, String... columnFamilies){
getAdmin();
TableName tn = TableName.valueOf(tableName);
try {
if(!admin.tableExists(tn)){
HTableDescriptor htd = new HTableDescriptor(tn);
for(String cf : columnFamilies){
HColumnDescriptor hcd01 = new HColumnDescriptor(Bytes.toBytes(cf));
htd.addFamily(hcd01);
}
admin.createTable(htd);
return true;
}else{
throw new TableExistsException("表存在。。。。。。");
}
} catch (IOException e) {
throw new RuntimeException(e);
}finally{
close();
}
}


/*
* 删除表
* @param tableName:表名
*/
public static boolean delTable(String tableName){
try {
getAdmin();
TableName delTN = TableName.valueOf(tableName);
if(admin.tableExists(delTN)){
admin.disableTable(delTN);
admin.deleteTable(delTN);
return true;
}else{
throw new TableExistsException("表不存在。。。。。");
}
}catch (IOException e) {
throw new RuntimeException(e);
}finally{
close();
}
}


/*
* 对表进行增删改的操作,增加删除单条记录
* @param tableName:表名
* @param MutationType:对表的操作类型,只处理两种PUT和DELETE
* @param rowkey:行健值
* @param ...params:3个参数时是:列族名,单元修饰名,单元格的值。2个参数时是:列族名,单元修饰名。1个参数时是:列族名
*/
public static void doUpdate(String tableName, MutationType mt, String rowkey, String... params){
try{
getAdmin();
TableName tn = TableName.valueOf(tableName);
if(admin.tableExists(tn)){
Table t = con.getTable(tn, pool);
switch(mt){
case PUT:
Put put = null;
if(params.length == 3){
put = new Put(Bytes.toBytes(rowkey)).addColumn(Bytes.toBytes(params[0]),
Bytes.toBytes(params[1]), Bytes.toBytes(params[2]));
}else{
throw new RuntimeException("参数不为三个....");
}
t.put(put);
break;
case DELETE:
Delete del = new Delete(Bytes.toBytes(rowkey));
if(params != null && params.length != 0){
switch(params.length){
case 1:
del.addFamily((Bytes.toBytes(params[0])));
break;
case 2:
del.addColumn(Bytes.toBytes(params[0]), Bytes.toBytes(params[1]));
break;
default:
throw new RuntimeException("最多为两个参数。。。。。");
}
}
t.delete(del);
break;
default:
throw new RuntimeException("只能进行增删改操作。。。。。。。");
}
}
}catch(Exception e){
throw new RuntimeException("进行增删改操作失败。。。。。");
}finally{
close();
}
}


/*
* 对表进行增删改的操作,增加删除多条记录
* @param tableName:表名
* @param MutationType:对表的操作类型,只处理两种PUT和DELETE
* @param rowkey:行健值
* @param ...params:3个参数时是:列族名,单元修饰名,单元格的值。2个参数时是:列族名,单元修饰名。1个参数时是:列族名
*/
public static void doUpdate(String tableName, MutationType mt, String rowkey, String[]... params){
try{
getAdmin();
TableName tn = TableName.valueOf(tableName);
if(admin.tableExists(tn)){
Table t = con.getTable(tn, pool);
switch(mt){
case PUT:
Put put = null;
for(String[] ps : params){
if(ps.length == 3){
put = new Put(Bytes.toBytes(rowkey)).addColumn(Bytes.toBytes(ps[0]),
Bytes.toBytes(ps[1]), Bytes.toBytes(ps[2]));
}else{
throw new RuntimeException("参数不为三个....");
}
t.put(put);
}
break;
case DELETE:
Delete del = new Delete(Bytes.toBytes(rowkey));
for(String[] ps : params){
if(ps != null && ps.length != 0){
switch(ps.length){
case 1:
del.addFamily((Bytes.toBytes(ps[0])));
break;
case 2:
del.addColumn(Bytes.toBytes(ps[0]), Bytes.toBytes(ps[1]));
break;
default:
throw new RuntimeException("最多为两个参数。。。。。");
}
}
t.delete(del);
}
break;
default:
throw new RuntimeException("只能进行增删改操作。。。。。。。");
}
}
}catch(Exception e){
throw new RuntimeException("进行增删改操作失败。。。。。");
}finally{
close();
}
}


/*
* 对表进行增删改的操作,增加删除多条记录
* @param tableName:表名
* @param MutationType:对表的操作类型,只处理两种PUT和DELETE
* @param Map>
*/
public static void doUpdate(String tableName, MutationType mt, Map> params){
try{
getAdmin();
TableName tn = TableName.valueOf(tableName);
if(admin.tableExists(tn)){
Table t = con.getTable(tn, pool);
switch(mt){
case PUT:
List puts = new ArrayList();
for(Entry> entry : params.entrySet()){
Put put = new Put(Bytes.toBytes(entry.getKey()));
for(String[] ps : entry.getValue()){
if(ps.length == 3){
put.addColumn(Bytes.toBytes(ps[0]), Bytes.toBytes(ps[1]), Bytes.toBytes(ps[2]));
}else{
throw new RuntimeException("参数不为三个....");
}
}
puts.add(put);
}
t.put(puts);
break;
case DELETE:
List dels = new ArrayList();
for(Entry> entry : params.entrySet()){
Delete del = new Delete(Bytes.toBytes(entry.getKey()));
if(entry.getValue() != null){
for(String[] ps : entry.getValue()){
if(params != null && ps.length != 0){
switch(ps.length){
case 1:
del.addFamily((Bytes.toBytes(ps[0])));
break;
case 2:
del.addColumn(Bytes.toBytes(ps[0]), Bytes.toBytes(ps[1]));
break;
default:
throw new RuntimeException("最多为两个参数。。。。。");
}
}
}
}
dels.add(del);
}
t.delete(dels);
break;
default:
throw new RuntimeException("只能进行增删改操作。。。。。。。");
}
}
}catch(Exception e){
throw new RuntimeException("进行增删改操作失败。。。。。");
}finally{
close();
}
}


/*
* 得到单条表的数据
* @param tableName:表名
* @param rowKey:行健
* @param columnFamily:列族
* @param qualifier:列
*/
public static String get(String tableName, String rowKey, String columnFamily, String qualifier){
getAdmin();
try {
Table t = con.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
Result r = t.get(get);
List cells = r.listCells();
return Bytes.toString(CellUtil.cloneValue(cells.get(0)));
} catch (IOException e) {
throw new RuntimeException("获取表对象失败............");
}finally{
close();
}
}


/*
*
*/
public static Map get(String tableName, String rowKey, String columnFamily, String...qualifiers){
getAdmin();
try{
Table t = con.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
if(qualifiers != null && qualifiers.length != 0){
for(String qualifier : qualifiers){
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
}
}else if(columnFamily != null){
get.addFamily(Bytes.toBytes(columnFamily));
}
Result r = t.get(get);
List cells = r.listCells();
Map results = null;
if(cells != null && cells.size() != 0){
results = new HashMap();
for(Cell cell : cells){
results.put(Bytes.toString(CellUtil.cloneQualifier(cell)),
Bytes.toString(CellUtil.cloneValue(cell)));
}
}
return results;
}catch (IOException e) {
throw new RuntimeException("获取表对象失败............");
}finally{
close();
}
}


/*
*
*/
public static T get(String tableName, String rowKey, String columnFamily, Class clazz){
getAdmin();
try{
Table t = con.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
Field[] fs = clazz.getDeclaredFields();
for(Field f : fs){
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(f.getName()));
}

Result r = t.get(get);
List cells = r.listCells();
T tObj = clazz.newInstance();
if(cells != null && cells.size() != 0){
for(Cell cell : cells){
for(int i = 0; i String valueStr = Bytes.toString(CellUtil.cloneValue(cell));
if(Bytes.toString(CellUtil.cloneQualifier(cell)).intern() == fs[i].getName().intern()){
Object value= null;
if(fs[i].getType().getName().intern() == "int" || fs[i].getType().getName().intern() == "java.lang.Integer"){
value = Integer.parseInt(valueStr);
}else if(fs[i].getType().getName().intern() == "double" || fs[i].getType().getName().intern() == "java.lang.Double"){
value = Double.parseDouble(valueStr);
}
fs[i].setAccessible(true);
fs[i].set(tObj, value);
}
}
}
}
return tObj;
}catch(IOException e){
throw new RuntimeException("获取表对象失败............");
}catch (Exception e) {
throw new RuntimeException("创建【表对象失败............");
}finally{
close();
}
}

}

相关TAG标签
上一篇:Linux之文件与目录管理
下一篇:实验:minikuber上运行简单程序
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站