频道栏目
首页 > 资讯 > 云计算 > 正文

Flume+Hadoop+Hive的离线分析系统基本架构

16-05-31        来源:[db:作者]  
收藏   我要投稿

最近在学习大数据的离线分析技术,所以在这里通过做一个简单的网站点击流数据分析离线系统来和大家一起梳理一下离线分析系统的架构模型。当然这个架构模型只能是离线分析技术的一个简单的入门级架构,实际生产环境中的大数据离线分析技术还涉及到很多细节的处理和高可用的架构。这篇文章的目的只是带大家入个门,让大家对离线分析技术有一个简单的认识,并和大家一起做学习交流。
 
离线分析系统的结构图


整个离线分析的总体架构就是使用Flume从FTP服务器上采集日志文件,并存储在Hadoop HDFS文件系统上,再接着用Hadoop的mapreduce清洗日志文件,最后使用HIVE构建数据仓库做离线分析。任务的调度使用Shell脚本完成,当然大家也可以尝试一些自动化的任务调度工具,比如说AZKABAN或者OOZIE等。
分析所使用的点击流日志文件主要来自Nginx的access.log日志文件,需要注意的是在这里并不是用Flume直接去生产环境上拉取nginx的日志文件,而是多设置了一层FTP服务器来缓冲所有的日志文件,然后再用Flume监听FTP服务器上指定的目录并拉取目录里的日志文件到HDFS服务器上(具体原因下面分析)。从生产环境推送日志文件到FTP服务器的操作可以通过Shell脚本配合Crontab定时器来实现。

网站点击流数据

 
一般在WEB系统中,用户对站点的页面的访问浏览,点击行为等一系列的数据都会记录在日志中,每一条日志记录就代表着上图中的一个数据点;而点击流数据关注的就是所有这些点连起来后的一个完整的网站浏览行为记录,可以认为是一个用户对网站的浏览session。比如说用户从哪一个外站进入到当前的网站,用户接下来浏览了当前网站的哪些页面,点击了哪些图片链接按钮等一系列的行为记录,这一个整体的信息就称为是该用户的点击流记录。这篇文章中设计的离线分析系统就是收集WEB系统中产生的这些数据日志,并清洗日志内容存储分布式的HDFS文件存储系统上,接着使用离线分析工具HIVE去统计所有用户的点击流信息。
 
本系统中我们采用Nginx的access.log来做点击流分析的日志文件。access.log日志文件的格式如下:
样例数据格式:
124.42.13.230 - - [18/Sep/2013:06:57:50 +0000] "GET /shoppingMall?ver=1.2.1 HTTP/1.1" 200 7200 "http://www.baidu.com.cn" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)"
格式分析:
1、 访客ip地址:124.42.13.230
2、访客用户信息:- -
3、请求时间:[18/Sep/2013:06:57:50 +0000]
4、请求方式:GET
5、请求的url:/shoppingMall?ver=1.10.2
6、请求所用协议:HTTP/1.1
7、响应码:200
8、返回的数据流量:7200
9、访客的来源url:http://www.baidu.com.cn
10、访客所用浏览器:Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)

收集用户数据
网站会通过前端JS代码或服务器端的后台代码收集用户浏览数据并存储在网站服务器中。一般运维人员会在离线分析系统和真实生产环境之间部署FTP服务器,并将生产环境上的用户数据每天定时发送到FTP服务器上,离线分析系统就会从FTP服务上采集数据而不会影响到生产环境。
采集数据的方式有多种,一种是通过自己编写shell脚本或Java编程采集数据,但是工作量大,不方便维护,另一种就是直接使用第三方框架去进行日志的采集,一般第三方框架的健壮性,容错性和易用性都做得很好也易于维护。本文采用第三方框架Flume进行日志采集,Flume是一个分布式的高效的日志采集系统,它能把分布在不同服务器上的海量日志文件数据统一收集到一个集中的存储资源中,Flume是Apache的一个顶级项目,与Hadoop也有很好的兼容性。不过需要注意的是Flume并不是一个高可用的框架,这方面的优化得用户自己去维护。
Flume的agent是运行在JVM上的,所以各个服务器上的JVM环境必不可少。每一个Flume agent部署在一台服务器上,Flume会收集web server 产生的日志数据,并封装成一个个的事件发送给Flume Agent的Source,Flume Agent Source会消费这些收集来的数据事件并放在Flume Agent Channel,Flume Agent Sink会从Channel中收集这些采集过来的数据,要么存储在本地的文件系统中要么作为一个消费资源分发给下一个装在分布式系统中其它服务器上的Flume进行处理。Flume提供了点对点的高可用的保障,某个服务器上的Flume Agent Channel中的数据只有确保传输到了另一个服务器上的Flume Agent Channel里或者正确保存到了本地的文件存储系统中,才会被移除。
本系统中每一个FTP服务器以及Hadoop的name node服务器上都要部署一个Flume Agent;FTP的Flume Agent采集Web Server的日志并汇总到name node服务器上的Flume Agent,最后由hadoop name node服务器将所有的日志数据下沉到分布式的文件存储系统HDFS上面。
需要注意的是Flume的Source在本文的系统中选择的是Spooling Directory Source,而没有选择Exec Source,因为当Flume服务down掉的时候Spooling Directory Source能记录上一次读取到的位置,而Exec Source则没有,需要用户自己去处理,当重启Flume服务器的时候如果处理不好就会有重复数据的问题。当然Spooling Directory Source也是有缺点的,会对读取过的文件重命名,所以多架一层FTP服务器也是为了避免Flume“污染”生产环境。Spooling Directory Source另外一个比较大的缺点就是无法做到灵活监听某个文件夹底下所有子文件夹里的所有文件里新追加的内容。关于这些问题的解决方案也有很多,比如选择其它的日志采集工具,像logstash等。
FTP服务器上的Flume配置文件如下:
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = spooldir
agent.sources.origin.spoolDir = /export/data/trivial/weblogs
agent.sources.origin.channels = memorychannel
agent.sources.origin.deserializer.maxLineLength = 2048

agent.sources.origin.interceptors = i2
agent.sources.origin.interceptors.i2.type = host
agent.sources.origin.interceptors.i2.hostHeader = hostname

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000

agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545
这里有几个参数需要说明,Flume Agent Source可以通过配置deserializer.maxLineLength这个属性来指定每个Event的大小,默认是每个Event是2048个byte。Flume Agent Channel的大小默认等于于本地服务器上JVM所获取到的内存的80%,用户可以通过byteCapacityBufferPercentage和byteCapacity两个参数去进行优化。
需要特别注意的是FTP上放入Flume监听的文件夹中的日志文件不能同名,不然Flume会报错并停止工作,最好的解决方案就是为每份日志文件拼上时间戳。

在Hadoop服务器上的配置文件如下:
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545

#agent.sources.origin.interceptors = i1 i2
#agent.sources.origin.interceptors.i1.type = timestamp
#agent.sources.origin.interceptors.i2.type = host
#agent.sources.origin.interceptors.i2.hostHeader = hostname

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000

agent.sinks.target.type = hdfs
agent.sinks.target.channel = memorychannel
agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S
agent.sinks.target.hdfs.filePrefix = data-%{hostname}
agent.sinks.target.hdfs.rollInterval = 60
agent.sinks.target.hdfs.rollSize = 1073741824
agent.sinks.target.hdfs.rollCount = 1000000
agent.sinks.target.hdfs.round = true
agent.sinks.target.hdfs.roundValue = 10
agent.sinks.target.hdfs.roundUnit = minute
agent.sinks.target.hdfs.useLocalTimeStamp = true
agent.sinks.target.hdfs.minBlockReplicas=1
agent.sinks.target.hdfs.writeFormat=Text
agent.sinks.target.hdfs.fileType=DataStream

round, roundValue,roundUnit三个参数是用来配置每10分钟在hdfs里生成一个文件夹保存从FTP服务器上拉取下来的数据。

Troubleshooting
使用Flume拉取文件到HDFS中会遇到将文件分散成多个1KB-5KB的小文件的问题
需要注意的是如果遇到Flume会将拉取过来的文件分成很多份1KB-5KB的小文件存储到HDFS上,那么很可能是HDFS Sink的配置不正确,导致系统使用了默认配置。spooldir类型的source是将指定目录中的文件的每一行封装成一个event放入到channel中,默认每一行最大读取1024个字符。在HDFS Sink端主要是通过rollInterval(默认30秒), rollSize(默认1KB), rollCount(默认10个event)3个属性来决定写进HDFS的分片文件的大小。rollInterval表示经过多少秒后就将当前.tmp文件(写入的是从channel中过来的events)下沉到HDFS文件系统中,rollSize表示一旦.tmp文件达到一定的size后,就下沉到HDFS文件系统中,rollCount表示.tmp文件一旦写入了指定数量的events就下沉到HDFS文件系统中。
 
使用Flume拉取到HDFS中的文件格式错乱
这是因为HDFS Sink的配置中,hdfs.writeFormat属性默认为“Writable”会将原先的文件的内容序列化成HDFS的格式,应该手动设置成hdfs.writeFormat=“text”; 并且hdfs.fileType默认是“SequenceFile”类型的,是将所有event拼成一行,应该该手动设置成hdfs.fileType=“DataStream”,这样就可以是一行一个event,与原文件格式保持一致

使用Mapreduce清洗日志文件
当把日志文件中的数据拉取到HDFS文件系统后,使用Mapreduce程序去进行日志清洗
第一步,先用Mapreduce过滤掉无效的数据
package com.guludada.clickstream;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.dataparser.WebLogParser;


public class logClean  {
	
	public static class cleanMap extends Mapper {
		
		private NullWritable v = NullWritable.get();
		private Text word = new Text();
		WebLogParser webLogParser = new WebLogParser();
		
		public void map(Object key,Text value,Context context) {
					
			//将一行内容转成string
			 String line = value.toString();
			
			 String cleanContent = webLogParser.parser(line);
			
			 if(cleanContent != "") { 		
				word.set(cleanContent);
				try {
					context.write(word,v);
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}			
		}
	}
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
	
		conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");
		
		Job job = Job.getInstance(conf);
				
		job.setJarByClass(logClean.class);
		
		//指定本业务job要使用的mapper/Reducer业务类
		job.setMapperClass(cleanMap.class);
				
		//指定mapper输出数据的kv类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
			
		//指定job的输入原始文件所在目录
		Date curDate = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
		String dateStr = sdf.format(curDate);
		FileInputFormat.setInputPaths(job, new Path("/flume/events/" + dateStr + "/*/*"));

		//指定job的输出结果所在目录
		FileOutputFormat.setOutputPath(job, new Path("/clickstream/cleandata/"+dateStr+"/"));
		
		//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
		
	 }

}

package com.guludada.dataparser;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.guludada.javabean.WebLogBean;
/**
*   用正则表达式匹配出合法的日志记录
*
*
*/
public class WebLogParser {
	
   public String parser(String weblog_origin) {
		
	 WebLogBean weblogbean = new WebLogBean();
		
        // 获取IP地址
	Pattern IPPattern = Pattern.compile("\\d+.\\d+.\\d+.\\d+");
        Matcher IPMatcher = IPPattern.matcher(weblog_origin);
	if(IPMatcher.find()) {
	   String IPAddr = IPMatcher.group(0);
	   weblogbean.setIP_addr(IPAddr);
        } else {
	   return ""
        }
	 // 获取时间信息
	 Pattern TimePattern = Pattern.compile("\\[(.+)\\]");
	 Matcher TimeMatcher = TimePattern.matcher(weblog_origin);
	 if(TimeMatcher.find()) {
	   String time = TimeMatcher.group(1);
	   String[] cleanTime = time.split(" ");
	   weblogbean.setTime(cleanTime[0]);
	 } else {
	   return "";
	 }
		
        //获取其余请求信息
	 Pattern InfoPattern = Pattern.compile(
		 "(\\\"[POST|GET].+?\\\") (\\d+) (\\d+).+?(\\\".+?\\\") (\\\".+?\\\")");

	 Matcher InfoMatcher = InfoPattern.matcher(weblog_origin);
	 if(InfoMatcher.find()) {
			
	   String requestInfo = InfoMatcher.group(1).replace('\"',' ').trim();
	   String[] requestInfoArry = requestInfo.split(" ");
	   weblogbean.setMethod(requestInfoArry[0]);
	   weblogbean.setRequest_URL(requestInfoArry[1]);
	   weblogbean.setRequest_protocol(requestInfoArry[2]);
	   String status_code = InfoMatcher.group(2);
	   weblogbean.setRespond_code(status_code);
			
	   String respond_data = InfoMatcher.group(3);
	   weblogbean.setRespond_data(respond_data);
			
	   String request_come_from = InfoMatcher.group(4).replace('\"',' ').trim();
	   weblogbean.setRequst_come_from(request_come_from);
			
	   String browserInfo = InfoMatcher.group(5).replace('\"',' ').trim();
	   weblogbean.setBrowser(browserInfo);
	} else {
	   return "";
	}
		
        return weblogbean.toString();
   }
		
}
package com.guludada.javabean;

public class WebLogBean {
	
	String IP_addr;
	String time;
	String method;
	String request_URL;
	String request_protocol;
	String respond_code;
	String respond_data;
	String requst_come_from;
	String browser;
	public String getIP_addr() {
		return IP_addr;
	}
	public void setIP_addr(String iP_addr) {
		IP_addr = iP_addr;
	}
	public String getTime() {
		return time;
	}
	public void setTime(String time) {
		this.time = time;
	}
	public String getMethod() {
		return method;
	}
	public void setMethod(String method) {
		this.method = method;
	}
	public String getRequest_URL() {
		return request_URL;
	}
	public void setRequest_URL(String request_URL) {
		this.request_URL = request_URL;
	}
	public String getRequest_protocol() {
		return request_protocol;
	}
	public void setRequest_protocol(String request_protocol) {
		this.request_protocol = request_protocol;
	}
	public String getRespond_code() {
		return respond_code;
	}
	public void setRespond_code(String respond_code) {
		this.respond_code = respond_code;
	}
	public String getRespond_data() {
		return respond_data;
	}
	public void setRespond_data(String respond_data) {
		this.respond_data = respond_data;
	}
	public String getRequst_come_from() {
		return requst_come_from;
	}
	public void setRequst_come_from(String requst_come_from) {
		this.requst_come_from = requst_come_from;
	}
	public String getBrowser() {
		return browser;
	}
	public void setBrowser(String browser) {
		this.browser = browser;
	}
	@Override
	public String toString() {
		return IP_addr + " " + time + " " + method + " "
				+ request_URL + " " + request_protocol + " " + respond_code
				+ " " + respond_data + " " + requst_come_from + " " + browser;
	}
	
	
}

第一次日记清洗后的记录如下图:


 
步,根据访问记录生成相应的Session信息记录,假设Session的过期时间是30分钟

package com.guludada.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.dataparser.SessionParser;
import com.guludada.dataparser.WebLogParser;
import com.guludada.javabean.WebLogSessionBean;

public class logSession {
	
	public static class sessionMapper extends Mapper {
		
		private Text IPAddr = new Text();
		private Text content = new Text();
		private NullWritable v = NullWritable.get();
		WebLogParser webLogParser = new WebLogParser();
		
		public void map(Object key,Text value,Context context) {
						
			//将一行内容转成string
			String line = value.toString();
			
			String[] weblogArry = line.split(" ");
				
			IPAddr.set(weblogArry[0]);
			content.set(line);
			try {
				context.write(IPAddr,content);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}			
		}
	}

	static class sessionReducer extends Reducer{
		
		private Text IPAddr = new Text();
		private Text content = new Text();
		private NullWritable v = NullWritable.get();
		WebLogParser webLogParser = new WebLogParser();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		SessionParser sessionParser = new SessionParser();
		
		@Override
		protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
			
			Date sessionStartTime = null;
			String sessionID = UUID.randomUUID().toString();
			
						
			//将IP地址所对应的用户的所有浏览记录按时间排序
			ArrayList sessionBeanGroup  = new ArrayList();
			for(Text browseHistory : values) {
				WebLogSessionBean sessionBean = sessionParser.loadBean(browseHistory.toString());
				sessionBeanGroup.add(sessionBean);
			}
			Collections.sort(sessionBeanGroup,new Comparator() {

				public int compare(WebLogSessionBean sessionBean1, WebLogSessionBean sessionBean2) {					
					Date date1 = sessionBean1.getTimeWithDateFormat();
					Date date2 = sessionBean2.getTimeWithDateFormat();
					if(date1 == null && date2 == null) return 0;
					return date1.compareTo(date2);
				}
			});
			
			for(WebLogSessionBean sessionBean : sessionBeanGroup) {
				
				if(sessionStartTime == null) {
					//当天日志中某用户第一次访问网站的时间
					sessionStartTime = timeTransform(sessionBean.getTime());
					content.set(sessionParser.parser(sessionBean, sessionID));
					try {
						context.write(content,v);
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
				} else {
					
					Date sessionEndTime = timeTransform(sessionBean.getTime());
					long sessionStayTime = timeDiffer(sessionStartTime,sessionEndTime);
					if(sessionStayTime > 30 * 60 * 1000) {						
						//将当前浏览记录的时间设为下一个session的开始时间
						sessionStartTime = timeTransform(sessionBean.getTime());
						sessionID = UUID.randomUUID().toString();												
						continue;
					} 
					content.set(sessionParser.parser(sessionBean, sessionID));						
					try {
						context.write(content,v);
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}				
			}
		}
		
		private Date timeTransform(String time) {
			
			Date standard_time = null;
			try {
				standard_time = sdf.parse(time);
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			return standard_time;
		}
		
		private long timeDiffer(Date start_time,Date end_time) {
			
			long diffTime = 0;
			diffTime = end_time.getTime() - start_time.getTime();
			
			return diffTime;
		}
		
	}
	
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
	
		conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");
						
		Job job = Job.getInstance(conf);
				
		job.setJarByClass(logClean.class);
		
		//指定本业务job要使用的mapper/Reducer业务类
		job.setMapperClass(sessionMapper.class);
		job.setReducerClass(sessionReducer.class);
				
		//指定mapper输出数据的kv类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		//指定最终输出的数据的kv类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		Date curDate = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
		String dateStr = sdf.format(curDate);
		
		//指定job的输入原始文件所在目录
		FileInputFormat.setInputPaths(job, new Path("/clickstream/cleandata/"+dateStr+"/*"));
		//指定job的输出结果所在目录
		FileOutputFormat.setOutputPath(job, new Path("/clickstream/sessiondata/"+dateStr+"/"));
		
		//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
		
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
		
	}
}
package com.guludada.dataparser;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import com.guludada.javabean.WebLogSessionBean;

public class SessionParser {
	
	SimpleDateFormat sdf_origin = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.ENGLISH);
	SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	
	public String parser(WebLogSessionBean sessionBean,String sessionID) {
				
		sessionBean.setSession(sessionID);		
		return sessionBean.toString();
	}
	
	public WebLogSessionBean loadBean(String sessionContent) {
		
		WebLogSessionBean weblogSession = new WebLogSessionBean();
			
		String[] contents = sessionContent.split(" ");
		weblogSession.setTime(timeTransform(contents[1]));
		weblogSession.setIP_addr(contents[0]);
		weblogSession.setRequest_URL(contents[3]);
		weblogSession.setReferal(contents[7]);
			
		return weblogSession;
	}
	
	private String timeTransform(String time) {
				
		Date standard_time = null;
		try {
			standard_time = sdf_origin.parse(time);
		} catch (ParseException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return sdf_final.format(standard_time);
	}
}
package com.guludada.javabean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class WebLogSessionBean {
	
	String time;
	String IP_addr;
	String session;
	String request_URL;
	String referal;
	
	
	public String getTime() {
		return time;
	}
	public void setTime(String time) {
		this.time = time;
	}
	public String getIP_addr() {
		return IP_addr;
	}
	public void setIP_addr(String iP_addr) {
		IP_addr = iP_addr;
	}
	public String getSession() {
		return session;
	}
	public void setSession(String session) {
		this.session = session;
	}
	public String getRequest_URL() {
		return request_URL;
	}
	public void setRequest_URL(String request_URL) {
		this.request_URL = request_URL;
	}
	public String getReferal() {
		return referal;
	}
	public void setReferal(String referal) {
		this.referal = referal;
	}
	
	public Date getTimeWithDateFormat() {
		
		SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		if(this.time != null && this.time != "") {
			try {
				return sdf_final.parse(this.time);
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return null;
	}
	
	@Override
	public String toString() {
		return time + " " + IP_addr + " " + session + " "
				+ request_URL + " " + referal;
	}
	
	
	
}

第二次清理出来的Session信息结构如下:
时间 IP SessionID 请求页面URL Referal URL
2015-05-30 19:38:00 192.168.12.130 Session1 /blog/me www.baidu.com
2015-05-30 19:39:00 192.168.12.130 Session1 /blog/me/details www.mysite.com/blog/me
2015-05-30 19:38:00 192.168.12.40 Session2 /blog/me www.baidu.com



第三步,清洗第二步生成的Session信息,生成PageViews信息表
package com.guludada.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.clickstream.logSession.sessionMapper;
import com.guludada.clickstream.logSession.sessionReducer;
import com.guludada.dataparser.PageViewsParser;
import com.guludada.dataparser.SessionParser;
import com.guludada.dataparser.WebLogParser;
import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.WebLogSessionBean;

public class PageViews {
	
	public static class pageMapper extends Mapper {
			
			private Text word = new Text();
			
			public void map(Object key,Text value,Context context) {
								
				String line = value.toString();
				String[] webLogContents = line.split(" ");
				
				//根据session来分组
				word.set(webLogContents[2]);
					try {
						context.write(word,value);
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}						
			}
		}

	public static class pageReducer extends Reducer{
	
		private Text session = new Text();
		private Text content = new Text();
		private NullWritable v = NullWritable.get();
		PageViewsParser pageViewsParser = new PageViewsParser();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		//上一条记录的访问信息
		PageViewsBean lastStayPageBean = null;
		Date lastVisitTime = null;
	
		@Override
		protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
			
			//将session所对应的所有浏览记录按时间排序
			ArrayList pageViewsBeanGroup  = new ArrayList();
			for(Text pageView : values) {
				PageViewsBean pageViewsBean = pageViewsParser.loadBean(pageView.toString());
				pageViewsBeanGroup.add(pageViewsBean);
			}
			Collections.sort(pageViewsBeanGroup,new Comparator() {

				public int compare(PageViewsBean pageViewsBean1, PageViewsBean pageViewsBean2) {					
					Date date1 = pageViewsBean1.getTimeWithDateFormat();
					Date date2 = pageViewsBean2.getTimeWithDateFormat();
					if(date1 == null && date2 == null) return 0;
					return date1.compareTo(date2);
				}
			});
			
			//计算每个页面的停留时间
			int step = 0;
			for(PageViewsBean pageViewsBean : pageViewsBeanGroup) {
				
				Date curVisitTime = pageViewsBean.getTimeWithDateFormat();
				
				if(lastStayPageBean != null) {	
					//计算前后两次访问记录相差的时间,单位是秒
					Integer timeDiff = (int) ((curVisitTime.getTime() - lastVisitTime.getTime())/1000);						
					//根据当前记录的访问信息更新上一条访问记录中访问的页面的停留时间
					lastStayPageBean.setStayTime(timeDiff.toString());
				}
				
				//更新访问记录的步数
				step++;
				pageViewsBean.setStep(step+"");
				//更新上一条访问记录的停留时间后,将当前访问记录设定为上一条访问信息记录
				lastStayPageBean = pageViewsBean;
				lastVisitTime = curVisitTime;	
				
				//输出pageViews信息
				content.set(pageViewsParser.parser(pageViewsBean));						
				try {
					context.write(content,v);
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}			
	}	
}
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		
		conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");
						
		Job job = Job.getInstance(conf);
				
		job.setJarByClass(PageViews.class);
		
		//指定本业务job要使用的mapper/Reducer业务类
		job.setMapperClass(pageMapper.class);
		job.setReducerClass(pageReducer.class);
				
		//指定mapper输出数据的kv类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		//指定最终输出的数据的kv类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		Date curDate = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
		String dateStr = sdf.format(curDate);
		
		//指定job的输入原始文件所在目录
		FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/"+dateStr+"/*"));
		//指定job的输出结果所在目录
		FileOutputFormat.setOutputPath(job, new Path("/clickstream/pageviews/"+dateStr+"/"));
		
		//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
		
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
		
	}
}
package com.guludada.dataparser;

import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.WebLogSessionBean;

public class PageViewsParser {
	/**
	 * 根据logSession的输出数据加载PageViewsBean
	 * 
	 * */
	public PageViewsBean loadBean(String sessionContent) {
		
		PageViewsBean pageViewsBean = new PageViewsBean();
			
		String[] contents = sessionContent.split(" ");
		pageViewsBean.setTime(contents[0] + " " + contents[1]);
		pageViewsBean.setIP_addr(contents[2]);
		pageViewsBean.setSession(contents[3]);
		pageViewsBean.setVisit_URL(contents[4]);
		pageViewsBean.setStayTime("0");
		pageViewsBean.setStep("0");
					
		return pageViewsBean;
	}
	
	public String parser(PageViewsBean pageBean) {
		
		return pageBean.toString();
	}

}
package com.guludada.javabean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class PageViewsBean {
	
	String session;
	String IP_addr;
	String time;
	String visit_URL;
	String stayTime;
	String step;
	public String getSession() {
		return session;
	}
	public void setSession(String session) {
		this.session = session;
	}
	public String getIP_addr() {
		return IP_addr;
	}
	public void setIP_addr(String iP_addr) {
		IP_addr = iP_addr;
	}
	public String getTime() {
		return time;
	}
	public void setTime(String time) {
		this.time = time;
	}
	public String getVisit_URL() {
		return visit_URL;
	}
	public void setVisit_URL(String visit_URL) {
		this.visit_URL = visit_URL;
	}
	public String getStayTime() {
		return stayTime;
	}
	public void setStayTime(String stayTime) {
		this.stayTime = stayTime;
	}
	public String getStep() {
		return step;
	}
	public void setStep(String step) {
		this.step = step;
	}
	
	public Date getTimeWithDateFormat() {
		
		SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		if(this.time != null && this.time != "") {
			try {
				return sdf_final.parse(this.time);
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return null;
	}
	
	@Override
	public String toString() {
		return session + " " + IP_addr + " " + time + " "
				+ visit_URL + " " + stayTime + " " + step;
	}
		
}
第三次日志清洗产生的PageViews数据结构如下图:
SessionID IP 访问时间 访问页面 停留时间 第几步
Session1 192.168.12.130 2016-05-30 15:17:30 /blog/me 30000 1
Session1 192.168.12.130 2016-05-30 15:18:00 /blog/me/admin 30000 2
Session1 192.168.12.130 2016-05-30 15:18:30 /home 30000 3
Session2 192.168.12.150 2016-05-30 15:16:30 /products 30000 1
Session2 192.168.12.150 2016-05-30 15:17:00 /products/details 30000 2



第四步,再次清洗Session日志,并生成Visits信息表
package com.guludada.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.clickstream.PageViews.pageMapper;
import com.guludada.clickstream.PageViews.pageReducer;
import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.dataparser.PageViewsParser;
import com.guludada.dataparser.VisitsInfoParser;
import com.guludada.javabean.PageViewsBean;

public class VisitsInfo {
	
	public static class visitMapper extends Mapper {
		
		private Text word = new Text();
		
		public void map(Object key,Text value,Context context) {
							
			String line = value.toString();
			String[] webLogContents = line.split(" ");
			
			//根据session来分组
			word.set(webLogContents[2]);
				try {
					context.write(word,value);
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}						
		}
	}

	public static class visitReducer extends Reducer{
	
		private Text content = new Text();
		private NullWritable v = NullWritable.get();
		VisitsInfoParser visitsParser = new VisitsInfoParser();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		PageViewsParser pageViewsParser = new PageViewsParser();
		Map viewedPagesMap = new HashMap();
		
		String entry_URL = "";
		String leave_URL = "";
		int total_visit_pages = 0;
	
		@Override
		protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
			
			//将session所对应的所有浏览记录按时间排序
			ArrayList browseInfoGroup  = new ArrayList();
			for(Text browseInfo : values) {
				browseInfoGroup.add(browseInfo.toString());
			}
			Collections.sort(browseInfoGroup,new Comparator() {
				
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
				public int compare(String browseInfo1, String browseInfo2) {
					String dateStr1 = browseInfo1.split(" ")[0] + " " + browseInfo1.split(" ")[1];
					String dateStr2 = browseInfo2.split(" ")[0] + " " + browseInfo2.split(" ")[1];
					Date date1;
					Date date2;
					try {
						date1 = sdf.parse(dateStr1);
						date2 = sdf.parse(dateStr2);					 
						if(date1 == null && date2 == null) return 0;
						return date1.compareTo(date2);
					} catch (ParseException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
						return 0;
					}
				}
			});
			
			//统计该session访问的总页面数,第一次进入的页面,跳出的页面			
			for(String browseInfo : browseInfoGroup) {
				
				String[] browseInfoStrArr = browseInfo.split(" ");				
				String curVisitURL = browseInfoStrArr[3];
				Integer curVisitURLInteger = viewedPagesMap.get(curVisitURL);
				if(curVisitURLInteger == null) {
					viewedPagesMap.put(curVisitURL, 1);
				}	
			}
			total_visit_pages = viewedPagesMap.size();
			String visitsInfo = visitsParser.parser(browseInfoGroup, total_visit_pages+"");	
			content.set(visitsInfo);
			try {
				context.write(content,v);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}	
	}
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		
		conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");
						
		Job job = Job.getInstance(conf);
						
		job.setJarByClass(VisitsInfo.class);
		
		//指定本业务job要使用的mapper/Reducer业务类
		job.setMapperClass(visitMapper.class);
		job.setReducerClass(visitReducer.class);
				
		//指定mapper输出数据的kv类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		//指定最终输出的数据的kv类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		Date curDate = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
		String dateStr = sdf.format(curDate);
		
		//指定job的输入原始文件所在目录
		FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/"+dateStr+"/*"));
		//指定job的输出结果所在目录
		FileOutputFormat.setOutputPath(job, new Path("/clickstream/visitsinfo"+dateStr+"/"));
		
		//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
		
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
		
	}
}

package com.guludada.dataparser;

import java.util.ArrayList;

import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.VisitsInfoBean;
import com.guludada.javabean.WebLogSessionBean;

public class VisitsInfoParser {
	
	public String parser(ArrayList pageViewsGroup,String totalVisitNum) {
		
		VisitsInfoBean visitsBean = new VisitsInfoBean();
		String entryPage = pageViewsGroup.get(0).split(" ")[4];
		String leavePage = pageViewsGroup.get(pageViewsGroup.size()-1).split(" ")[4];
		String startTime = pageViewsGroup.get(0).split(" ")[0] + " " + pageViewsGroup.get(0).split(" ")[1];
		String endTime = pageViewsGroup.get(pageViewsGroup.size()-1).split(" ")[0] + 
					" " +pageViewsGroup.get(pageViewsGroup.size()-1).split(" ")[1];
		String session = pageViewsGroup.get(0).split(" ")[3];
		String IP = pageViewsGroup.get(0).split(" ")[2];
		String referal = pageViewsGroup.get(0).split(" ")[5];
		
		visitsBean.setSession(session);
		visitsBean.setStart_time(startTime);
		visitsBean.setEnd_time(endTime);
		visitsBean.setEntry_page(entryPage);
		visitsBean.setLeave_page(leavePage);
		visitsBean.setVisit_page_num(totalVisitNum);
		visitsBean.setIP_addr(IP);
		visitsBean.setReferal(referal);
		
		return visitsBean.toString();
	}
}

package com.guludada.javabean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class VisitsInfoBean {
	
	String session;
	String start_time;
	String end_time;
	String entry_page;
	String leave_page;
	String visit_page_num;
	String IP_addr;
	String referal;
	
	public String getSession() {
		return session;
	}
	public void setSession(String session) {
		this.session = session;
	}
	public String getStart_time() {
		return start_time;
	}
	public void setStart_time(String start_time) {
		this.start_time = start_time;
	}
	public String getEnd_time() {
		return end_time;
	}
	public void setEnd_time(String end_time) {
		this.end_time = end_time;
	}
	public String getEntry_page() {
		return entry_page;
	}
	public void setEntry_page(String entry_page) {
		this.entry_page = entry_page;
	}
	public String getLeave_page() {
		return leave_page;
	}
	public void setLeave_page(String leave_page) {
		this.leave_page = leave_page;
	}
	public String getVisit_page_num() {
		return visit_page_num;
	}
	public void setVisit_page_num(String visit_page_num) {
		this.visit_page_num = visit_page_num;
	}
	public String getIP_addr() {
		return IP_addr;
	}
	public void setIP_addr(String iP_addr) {
		IP_addr = iP_addr;
	}
	public String getReferal() {
		return referal;
	}
	public void setReferal(String referal) {
		this.referal = referal;
	}
	
	@Override
	public String toString() {
		return session + " " + start_time + " " + end_time
				+ " " + entry_page + " " + leave_page + " " + visit_page_num
				+ " " + IP_addr + " " + referal;
	}
	
	
		
}

第四次清洗日志产生的访问记录表结构如下图:
SessionID 访问时间 离开时间 第一次访问页面 最后一次访问的页面

访问的页面总数
IP

Referal
Session1 2016-05-30 15:17:00 2016-05-30 15:19:00 /blog/me /blog/others 5 192.168.12.130 www.baidu.com
Session2 2016-05-30 14:17:00 2016-05-30 15:19:38 /home /profile 10 192.168.12.140 www.178.com
Session3 2016-05-30 12:17:00 2016-05-30 15:40:00 /products /detail 6 192.168.12.150 www.78dm.net


 
以上就是要进行日志清洗的所有MapReduce程序,因为只是一个简单的演示,方法并没有做很好的抽象。

MapReduce Troubleshooting
指定某个文件夹路径下所有文件作为mapreduce的输入参数的解决方案。
1.hdfs的文件系统中的路径是支持正则表达式的
2.使用.setInputDirRecursive(job,true)方法,然后指定文件夹路径

在分布式环境下如何设置每个用户的SessionID
可以使用UUID,UUID是分布式环境下唯一的元素识别码,它由日期和时间,时钟序列,机器识别码(一般为网卡MAC地址)三部分组成。这样就保证了每个用户的SessionID的唯一性。


HIVE建立数据仓库
使用MapReduce清洗完日志文件后,我们就开始使用Hive去构建对应的数据仓库并使用HiveSql对数据进行分析。而在本系统里,我们将使用星型模型来构建数据仓库的ODS(OperationalData Store)层。下面的命令我们可以通过启动Hive的hiveserver2服务器并使用beeline客户端进行操作或者直接写脚本去定时调度。

PageViews数据分析

PageViews的事实表和维度表结构


使用HIVE在数据仓库中创建PageViews的贴源数据表:
>> create table pageviews(session string,ip string,requestdate string,requesttime string,visitpage string, staytime string,step string) comment ‘this is the table for pageviews’ partitioned by(inputDate string) clustered by(session) sorted by(requestdate,requesttime) into 4 buckets row format delimited fields terminated by ‘ ’;

将HDFS中的数据导入到HIVE的PageViews贴源数据表中
>> load data inpath ‘/clickstream/pageviews’ overwrite into table pageviews partition(inputDate=‘2016-05-17’);
如果没有标示是在’Local‘本地文件系统中,则会去HDFS中加载数据

根据具体的业务分析逻辑创建ODS层的PageViews事实表,并从PageViews的贴源表中导入数据

这里根据请求的页面URL来分组(clustered)是为了方便统计每个页面的PV
>> create table ods_pageviews(session string,ip string,viewtime string,visitpage string, staytime string,step string) partitioned by(inputDate string) clustered by(visitpage) sorted by(viewtime) into 4 buckets row format delimited fields terminated by ‘ ’;

>> insert into table ods_pageviews partition(inputDate='2016-05-17') select pv.session,pv.ip,concat(pv.requestdate,"-",pv.requesttime),pv.visitpage,pv.staytime,pv.step from pageviews as pv where pv.inputDate='2016-05-17';

创建PageViews事实表的时间维度表并从当天的事实表里导入数据

>>create table ods_dim_pageviews_time(time string,year string,month string,day string,hour string,minutes string,seconds string) partitioned by(inputDate String) clustered by(year,month,day) sorted by(time) into 4 buckets row format delimited fields terminated by ' ';

>> insert overwrite table ods_dim_pageviews_time partition(inputDate='2016-05-17') select distinct pv.viewtime, substring(pv.viewtime,0,4),substring(pv.viewtime,6,2),substring(pv.viewtime,9,2),substring(pv.viewtime,12,2),substring(pv.viewtime,15,2),substring(pv.viewtime,18,2) from ods_pageviews as pv;

创建PageViews事实表的URL维度表并从当天的事实表里导入数据
>> create table ods_dim_pageviews_url(visitpage string,host string,path string,query string) partitioned by(inputDate string) clustered by(visitpage) sorted by(visitpage) into 4 buckets row format delimited fields terminated by ' ';

>> insert into table ods_dim_pageviews_url partition(inputDate='2016-05-17') select distinct pv.visitpage,b.host,b.path,b.query from pageviews pv lateral view parse_url_tuple(concat('https://localhost',pv.visitpage),'HOST','PATH','QUERY') b as host,path,query;

查询每天PV总数前20的页面
>> select op.visitpage as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' group by op.visitpage sort by num desc limit 20;

运行结果:


Visits数据分析
页面具体访问记录Visits的事实表和维度表结构

使用HIVE在数据仓库中创建Visits信息的贴源数据表:
>> create table visitsinfo(session string,startdate string,starttime string,enddate string,endtime string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(startdate,starttime) into 4 buckets row format delimited fields terminated by ' ';

将HDFS中的数据导入到HIVE的Visits信息贴源数据表中
>> load data inpath '/clickstream/visitsinfo' overwrite into table visitsinfo partition(inputDate='2016-05-18');



根据具体的业务分析逻辑创建ODS层的Visits事实表,并从visitsinfo的贴源表中导入数据
>> create table ods_visits(session string,entrytime string,leavetime string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(entrytime) into 4 buckets row format delimited fields terminated by ' ';

>> insert into table ods_visits partition(inputDate='2016-05-18') select vi.session,concat(vi.startdate,"-",vi.starttime),concat(vi.enddate,"-",vi.endtime),vi.entrypage,vi.leavepage,vi.viewpagenum,vi.ip,vi.referal from visitsinfo as vi where vi.inputDate='2016-05-18';

创建Visits事实表的时间维度表并从当天的事实表里导入数据

>>create table ods_dim_visits_time(time string,year string,month string,day string,hour string,minutes string,seconds string) partitioned by(inputDate String) clustered by(year,month,day) sorted by(time) into 4 buckets row format delimited fields terminated by ' ';

将“访问时间”和“离开时间”两列的值合并后再放入时间维度表中,减少数据的冗余
>>insert overwrite table ods_dim_visits_time partition(inputDate='2016-05-18') select distinct ov.timeparam, substring(ov.timeparam,0,4),substring(ov.timeparam,6,2),substring(ov.timeparam,9,2),substring(ov.timeparam,12,2),substring(ov.timeparam,15,2),substring(ov.timeparam,18,2) from (select ov1.entrytime as timeparam from ods_visits as ov1 union select ov2.leavetime as timeparam from ods_visits as ov2) as ov;



创建visits事实表的URL维度表并从当天的事实表里导入数据
>> create table ods_dim_visits_url(pageurl string,host string,path string,query string) partitioned by(inputDate string) clustered by(pageurl) sorted by(pageurl) into 4 buckets row format delimited fields terminated by ' ';

将每个session的进入页面和离开页面的URL合并后存入到URL维度表中
>>insert into table ods_dim_visits_url partition(inputDate='2016-05-18') select distinct ov.pageurl,b.host,b.path,b.query from (select ov1.entrypage as pageurl from ods_visits as ov1 union select ov2.leavepage as pageurl from ods_visits as ov2 ) as ov lateral view parse_url_tuple(concat('https://localhost',ov.pageurl),'HOST','PATH','QUERY') b as host,path,query;

将每个session从哪个外站进入当前网站的信息存入到URL维度表中
>>insert into table ods_dim_visits_url partition(inputDate='2016-05-18') select distinct ov.referal,b.host,b.path,b.query from ods_visits as ov lateral view parse_url_tuple(ov.referal,'HOST','PATH','QUERY') b as host,path,query;


统计每个页面的跳出人数(事实上真正有价值的统计应该是统计页面的跳出率,但为了简单示范,作者在这里简化成统计跳出人数)
>> select ov.leavepage as jumpPage, count(*) as jumpNum from ods_visits as ov group by ov.leavepage order by jumpNum desc;



业务页面转换率分析(漏斗模型)
Hive在创建表的时候无法实现某个字段自增长的关键字,得使用自定义函数(user-defined function)UDF来实现相应的功能。在查询的时候可以使用row_number()来显示行数,不过必须要在complete mode下才能使用,所以可以使用row_number() 函数配合开窗函数over(),具体示例如下。 为简单起见,这里我们创建一个临时表,并手动在里面插入要查看的业务页面链接以及该页面的PV总数,通过这几个参数来计算业务页面之间的转换率,也就是所谓的漏斗模型。
假设我们有“/index” -> “/detail” -> “/createOrder” ->”/confirmOrder” 这一业务页面转化流程

首先我们要创建业务页面的PV的临时信息表,临时表和里面的数据会在session结束的时候清理掉
>> create temporary table transactionpageviews(url string,views int) row format delimited fields terminated by ' ';

先统计业务页面的总PV然后按转换步骤顺序插入每个页面的PV信息到transactionpageviews表中
>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/index' group by opurl.path;

>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/detail' group by opurl.path;

>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/createOrder' group by opurl.path;

>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/confirmOrder' group by opurl.path;

计算业务页面之间的转换率
>> select row_number() over() as rownum,a.url as url, a.views as pageViews,b.views as lastPageViews,a.views/b.views as transferRation from (select row_number() over() as rownum,views,url from transactionpageviews) as a left join (select row_number() over() as rownum,views,url from transactionpageviews) as b on (a.rownum = b.rownum-1 );




Shell脚本+Crontab定时器执行任务调度
执行initialEnv.sh脚本初始化系统环境,为了简单测试,作者只启动了单台服务器,下面的脚本是建立在Hadoop的standalone单节点模式,并且Hive也装在Hadoop服务器上
#!/bin/bash

export HADOOP_HOME=/home/ymh/apps/hadoop-2.6.4

#start hdfs
/home/ymh/apps/hadoop-2.6.4/sbin/start-dfs.sh

#start yarn
if [[ 0 == $? ]]
then
/home/ymh/apps/hadoop-2.6.4/sbin/start-yarn.sh
fi

#start flume
#if [[ 0 == $? ]]
#then
#start flume
#$nohup ~/apache-flume-1.6.0-bin/bin/flume-ng agent -n agent -c conf -f ~/apache-flume-1.6.0-bin/conf/flume-conf.properties &
#fi

#start mysql
if [ 0 = $? ]
then
service mysqld start
fi

#start HIVE SERVER
if [ 0 = $? ]
then
$nohup /apps/apache-hive-1.2.1-bin/bin/hiveserver2 &
fi

执行dataAnalyseTask.sh脚本,先启动MapReduce程序去清洗当日的日志信息,随后使用Hive去构建当日的ODS数据。需要注意的是,本脚本是建立在ODS层中事实表和维度表已经创建完毕的基础上去执行,所以脚本中不会有创建事实表和维度表的HIVE语句(创建语句见上一个章节的内容),并且为了节省篇幅,只列出了PageViews数据分析的脚本部分。
#!/bin/bash

CURDATE=$(date +%y-%m-%d)
CURDATEHIVE=$(date +%Y-%m-%d)

/home/ymh/apps/hadoop-2.6.4/bin/hdfs dfs -df /flume/events/$CURDATE

if [[ 1 -ne $? ]]
then
/home/ymh/apps/hadoop-2.6.4/bin/hadoop jar /export/data/mydata/clickstream.jar com.guludada.clickstream.logClean
fi

if [[ 1 -ne $? ]]
then
/home/ymh/apps/hadoop-2.6.4/bin/hadoop jar /export/data/mydata/clickstream.jar com.guludada.clickstream.logSession
fi

if [[ 1 -ne $? ]]
then
/home/ymh/apps/hadoop-2.6.4/bin/hadoop jar /export/data/mydata/clickstream.jar com.guludada.clickstream.PageViews
fi

#Load today's data
if [[ 1 -ne $? ]]
then
/home/ymh/apps/hadoop-2.6.4/bin/hdfs dfs -chmod 777 /clickstream/pageviews/$CURDATE/ 
echo "load data inpath '/clickstream/pageviews/$CURDATE/' into table pageviews partition(inputDate='$CURDATEHIVE');" | /apps/apache-hive-1.2.1-bin/bin/beeline -u jdbc:hive2://localhost:10000
fi

#Create fact table and its dimension tables
if [[ 1 -ne $? ]]
then
echo "insert into table ods_pageviews partition(inputDate='$CURDATEHIVE') select pv.session,pv.ip,concat(pv.requestdate,'-',pv.requesttime) as viewtime,pv.visitpage,pv.staytime,pv.step from pageviews as pv where pv.inputDate='$CURDATEHIVE';" | /apps/apache-hive-1.2.1-bin/bin/beeline -u jdbc:hive2://localhost:10000
fi

if [[ 1 -ne $? ]]
then
echo "insert into table ods_dim_pageviews_time partition(inputDate='$CURDATEHIVE') select distinct pv.viewtime, substring(pv.viewtime,0,4),substring(pv.viewtime,6,2),substring(pv.viewtime,9,2),substring(pv.viewtime,12,2),substring(pv.viewtime,15,2),substring(pv.viewtime,18,2) from ods_pageviews as pv;" | /apps/apache-hive-1.2.1-bin/bin/beeline -u jdbc:hive2://localhost:10000
fi

if [[ 1 -ne $? ]]
then
echo "insert into table ods_dim_pageviews_url partition(inputDate='$CURDATEHIVE') select distinct pv.visitpage,b.host,b.path,b.query from pageviews pv lateral view parse_url_tuple(concat('https://localhost',pv.visitpage),'HOST','PATH','QUERY') b as host,path,query;" | /apps/apache-hive-1.2.1-bin/bin/beeline -u jdbc:hive2://localhost:10000
fi

创建crontab文件,指定每天的凌晨01点整执行dataAnalyseTask.sh脚本,该脚本执行“使用MapReduce清理日志文件”和“使用HiveSql构建分析ODS层数据”两项任务,并将用户自定义的crontab文件加入到定时器中
$vi root_crontab_hadoop
$echo "0 1 * * * /myShells/dataAnalyseTask.sh" >> root_crontab_hadoop
$crontab root_crontab_hadoop

至此,使用Hadoop进行离线计算的简单架构和示例已经全部阐述完毕,而关于如何使用Sqoop将Hive中的数据导入Mysql中,因为篇幅有限,这里就不展开了。

相关TAG标签
上一篇:申通快递某重要站点SQL注入/9000多员工信息/可登陆邮箱
下一篇:flume学习一:flume基础知识
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站