频道栏目
首页 > 资讯 > 云计算 > 正文

自定义flume导入hbase代码

18-06-23        来源:[db:作者]  
收藏   我要投稿

自定义flume导入hbase代码。

1 拷贝hbase1.2.6下的lib目录的jar文件到flume1.8的lib目录下

2 在eclipse编写解析日志文件的自定义代码

2.1 pom.xml文件内容

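<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>demo</groupId>
  <artifactId>avdemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.0.36.Final</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume.flume-ng-sinks</groupId>
      <artifactId>flume-ng-hbase-sink</artifactId>
      <version>1.8.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.3</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.6</version>
    </dependency>

    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
  </dependencies>

</project>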

2.2 封装日志信息实体类

package demo;

/**
 * Mutable value holder for the fields of one web-server access-log line.
 *
 * <p>Every field is kept as the raw text taken from the log line; no
 * conversion or validation is performed by this class. All fields start
 * out as {@code null} until their setter is called.
 */
public class AccessLog {

    /** Address of the client that issued the request. */
    private String clientIp;

    public String getClientIp() {
        return this.clientIp;
    }

    public void setClientIp(String clientIp) {
        this.clientIp = clientIp;
    }

    /** Client identity column of the log line (spelling kept: it is part of the public API). */
    private String clientIndentity;

    public String getClientIndentity() {
        return this.clientIndentity;
    }

    public void setClientIndentity(String clientIndentity) {
        this.clientIndentity = clientIndentity;
    }

    /** Remote user column of the log line. */
    private String remoteUser;

    public String getRemoteUser() {
        return this.remoteUser;
    }

    public void setRemoteUser(String remoteUser) {
        this.remoteUser = remoteUser;
    }

    /** Request timestamp, as text. */
    private String dateTime;

    public String getDateTime() {
        return this.dateTime;
    }

    public void setDateTime(String dateTime) {
        this.dateTime = dateTime;
    }

    /** Request line, e.g. {@code GET /webapp HTTP/1.1}. */
    private String request;

    public String getRequest() {
        return this.request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    /** HTTP status code, as text. */
    private String httpStatusCode;

    public String getHttpStatusCode() {
        return this.httpStatusCode;
    }

    public void setHttpStatusCode(String httpStatusCode) {
        this.httpStatusCode = httpStatusCode;
    }

    /** Number of bytes sent, as text. */
    private String bytesSent;

    public String getBytesSent() {
        return this.bytesSent;
    }

    public void setBytesSent(String bytesSent) {
        this.bytesSent = bytesSent;
    }

    /** Referer header value. */
    private String referer;

    public String getReferer() {
        return this.referer;
    }

    public void setReferer(String referer) {
        this.referer = referer;
    }

    /** User-Agent header value. */
    private String userAgent;

    public String getUserAgent() {
        return this.userAgent;
    }

    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }
}

2.3 生成UUID自定义类

package demo;

import java.util.UUID;

/**
 * Helper that produces random UUID strings with the dashes removed
 * (32 hexadecimal characters each).
 */
public class UUIDGenerator {

    /** Kept public for backward compatibility; all functionality is static. */
    public UUIDGenerator() {
    }

    /**
     * Returns one random UUID without the "-" separators.
     *
     * @return a 32-character hexadecimal string
     */
    public static String getUUID() {
        // A UUID string contains "-" only as group separators, so removing
        // every dash is equivalent to splicing the five groups together.
        return UUID.randomUUID().toString().replace("-", "");
    }

    /**
     * Returns the requested number of random UUIDs.
     *
     * @param number how many UUIDs to generate
     * @return an array of {@code number} dash-less UUID strings, or
     *         {@code null} when {@code number} is less than 1
     */
    public static String[] getUUID(int number) {
        if (number < 1) {
            return null;
        }
        String[] ids = new String[number];
        for (int i = 0; i < number; i++) {
            ids[i] = getUUID();
        }
        return ids;
    }
}

2.4 自定义日志字符串解析封装类

package demo;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

/**
 * Parses one access-log line into an {@link AccessLog}.
 *
 * <p>Expected line format, for example:
 * <pre>
 * 11.52.10.49 - - [17/May/2018:11:35:21 +0800] "GET /webapp HTTP/1.1" 302 - "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36"
 * </pre>
 */
public class AccessLogParser {

    /**
     * Capture groups, in order: client ip, client identity, remote user,
     * timestamp including the zone offset, request line, status code,
     * bytes sent (digits or "-"), referer, user agent.
     */
    private static final String LOG_LINE_REGEX = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\"";

    /** Compiled once; the pattern never changes. */
    private static final Pattern LOG_LINE = Pattern.compile(LOG_LINE_REGEX);

    /**
     * Parses a single log line.
     *
     * @param line the raw log line; may be {@code null}
     * @return the populated {@link AccessLog}, or {@code null} when
     *         {@code line} is {@code null} or does not match the expected
     *         format as a whole
     */
    public static AccessLog parse(String line) {
        // Pattern.matcher(null) would throw; treat null like any other
        // unparseable input and report it the same way (null result).
        if (line == null) {
            return null;
        }

        Matcher matcher = LOG_LINE.matcher(line);
        // matches() requires the entire line to fit the pattern.
        if (!matcher.matches()) {
            return null;
        }

        AccessLog accessLog = new AccessLog();
        accessLog.setClientIp(matcher.group(1));        // e.g. 11.52.10.49
        accessLog.setClientIndentity(matcher.group(2)); // e.g. -
        accessLog.setRemoteUser(matcher.group(3));      // e.g. -
        accessLog.setDateTime(matcher.group(4));        // e.g. 17/May/2018:11:35:21 +0800 (offset included)
        accessLog.setRequest(matcher.group(5));         // e.g. GET /webapp HTTP/1.1
        accessLog.setHttpStatusCode(matcher.group(6));  // e.g. 302
        accessLog.setBytesSent(matcher.group(7));       // e.g. -
        accessLog.setReferer(matcher.group(8));         // e.g. -
        accessLog.setUserAgent(matcher.group(9));       // e.g. Mozilla/5.0 (Windows NT 6.1; WOW64) ...
        return accessLog;
    }
}

2.5 自定义解析类

package demo;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.List;

import java.util.Locale;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.conf.ComponentConfiguration;

import org.apache.flume.sink.hbase.HbaseEventSerializer;

import org.apache.hadoop.hbase.client.Increment;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Row;

import org.apache.hadoop.hbase.util.Bytes;

public class AsyncHbaseLogEventSerializer implements HbaseEventSerializer {

private byte[] colFam = "cf".getBytes();

private Event currentEvent;

public void initialize(Event event, byte[] colFam) {

// byte[]字节型数组

this.currentEvent = event;

this.colFam = colFam;

}

public void configure(Context context) {

}

public void configure(ComponentConfiguration conf) {

}

public List getActions() {

// Split the event body and get the values for the columns

String eventStr = new String(currentEvent.getBody());

// eventStr=11.52.10.49 - - [17/May/2018:11:35:21 +0800] "GET /webapp HTTP/1.1"

// 302 - "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML,

// like Gecko) Chrome/37.0.2062.120 Safari/537.36"

AccessLog cols = AccessLogParser.parse(eventStr);

// cols=com.tcloud.flume.AccessLog@b52dc3

String req = cols.getRequest();

// req=GET /webapp HTTP/1.1

String reqPath = req.split(" ")[1];

// reqPath=/webapp

int pos = reqPath.indexOf("");

// pos=-1

if (pos > 0) {

reqPath = reqPath.substring(0, pos);

}

// trim()方法返回调用字符串对象的一个副本,但是所有起始和结尾的空格都被删除了,例子如下:String s=" Hello World

// ".trim();就是把"Hello World"放入s中。

if (reqPath.length() > 1 && reqPath.trim().endsWith("/")) {

reqPath = reqPath.substring(0, reqPath.length() - 1);

}

String req_ts_str = cols.getDateTime();

// GetDateTime 得到系统日期和时间

Long currTime = System.currentTimeMillis();

// System.currentTimeMillis() 获得的是自1970-1-01 00:00:00.000 到当前时刻的时间距离,类型为long

String currTimeStr = null;

if (req_ts_str != null && !req_ts_str.equals("")) {

SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);

SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

try {

currTimeStr = df2.format(df.parse(req_ts_str));

currTime = df.parse(req_ts_str).getTime();

} catch (ParseException e) {

System.out.println("parse req time error,using system.current time.");

}

}

long revTs = Long.MAX_VALUE - currTime;

byte[] currentRowKey = (UUIDGenerator.getUUID() + Long.toString(revTs) + reqPath).getBytes();

List puts = new ArrayList();

Put putReq = new Put(currentRowKey);

// putReq={"totalColumns":0,"families":{},"row":"d934e9adf3c540c8b58af1077fe7a0a39223370594393854807/webapp"}

putReq.add(colFam, "clientip".getBytes(), Bytes.toBytes(cols.getClientIp()));

putReq.add(colFam, "clientindentity".getBytes(), Bytes.toBytes(cols.getClientIndentity()));

putReq.add(colFam, "remoteuser".getBytes(), Bytes.toBytes(cols.getRemoteUser()));

putReq.add(colFam, "httpstatuscode".getBytes(), Bytes.toBytes(cols.getHttpStatusCode()));

putReq.add(colFam, "bytessent".getBytes(), Bytes.toBytes(cols.getBytesSent()));

putReq.add(colFam, "request".getBytes(), Bytes.toBytes(cols.getRequest()));

putReq.add(colFam, "referer".getBytes(), Bytes.toBytes(cols.getReferer()));

putReq.add(colFam, "datetime".getBytes(), Bytes.toBytes(currTimeStr));

putReq.add(colFam, "useragent".getBytes(), Bytes.toBytes(cols.getUserAgent()));

puts.add(putReq);

return puts;

}

public List getIncrements() {

List incs = new ArrayList();

return incs;

}

public void close() {

colFam = null;

currentEvent = null;

}

}

3 将代码导出为jar,并将该jar拷贝到flume的lib目录下

4 在flume的conf目录下创建 v2 配置文件

# Agent a1: one exec source (r1), one memory channel (c1), one HBase sink (k1).
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source: follow the access-log file as it grows.
# (No host/port here: those keys are not exec-source settings.)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data.txt

# Describe the sink: write each event to HBase table access_log, column family cf,
# using the custom serializer class from the exported jar.
a1.sinks.k1.type = hbase
a1.sinks.k1.table = access_log
a1.sinks.k1.columnFamily = cf
a1.sinks.k1.serializer = demo.AsyncHbaseLogEventSerializer

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel (c1 is the only declared channel)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5 在conf目录下启动flume命令

flume-ng agent -c . -f v2 -n a1 -Dflume.root.logger=INFO,console

6 在hbase中创建表

hbase>create 'access_log','cf'

7 在当前节点的/home/hadoop/目录上创建输入文件

echo "11.52.10.80 - - [17/Sep/2018:11:35:21 +0800] \"GET /webapp HTTP/1.1\" 302 - \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36\"" >> data.txt

相关TAG标签
上一篇:Centos7下Yum安装PHP5.5,5.6,7.0的步骤教程
下一篇:Linux系统下虚拟机的封装教程
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站