频道栏目
首页 > 资讯 > 云计算 > 正文

MapReduce中的join算法-reduce端join

16-08-16        来源:[db:作者]  
收藏   我要投稿
在海量数据的环境下,不可避免的会碰到join需求, 例如在数据分析时需要连接从不同的数据源中获取到数据。
假设有两个数据集:气象站数据库和天气记录数据库,并考虑如何合二为一。
一个典型的查询是:输出气象站的历史信息,同时各行记录也包含气象站的元数据信息。
 
气象站和天气记录的示例数据分别如下所示:
StationID StationName
011990-99999SIHCCAJAVRI
012650-99999TRNSET-HANSMOEN
 
StationID TimestampTemperature
012650-99999194903241200111
012650-9999919490324180078
011990-999991950051507000
011990-9999919500515120022
011990-99999195005151800-11
 
 
气象站和天气记录合并之后的示意图如下所示。
StationIDStationNameTimestampTemperature
011990-99999SIHCCAJAVRI1950051507000
011990-99999SIHCCAJAVRI19500515120022
011990-99999SIHCCAJAVRI195005151800-11
012650-99999TYNSET-HANSMOEN194903241200111
012650-99999TYNSET-HANSMOEN19490324180078
 
Reducer端连接:
基本思路是 mapper 为各个记录标记源,并且使用连接键作为 map 输出键,使键相同的记录放在同一 reducer 中。

我们通过下面两种技术实现 reduce 端连接。

1、多输入

数据集的输入源往往有多种格式,因此可以使用 MultipleInputs 类来方便地解析和标注各个数据源。

2、二次排序

 
reducer 将两个数据源中选出键相同的记录并不介意这些记录是否已排好序。此外,为了更好地执行连接操作,先将某一个数据源传输到 reducer 会非常重要。
以上面的天气数据连接为例,当天气记录发送到 reducer 的时候,与这些记录有相同键的气象站信息最好也已经放在 reducer ,使得 reducer 能够将气象站名称填到天气记录之中就马上输出。
虽然也可以不指定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这种情况,因为其中任何一组的记录数量可能非常庞大,远远超出 reducer 的可用内存容量。
因此我们用到二次排序技术,对 map 阶段输出的每个键的值进行排序,实现这一效果。

---------------------------------------------------------------------------------------
 
 
程序示例:
[主类]
publicclassJoinRecordWithStationNameextendsConfiguredimplementsTool{
/**
* 气象站mapper 标记为“0”,先到达reducer
*/
publicstaticclassJoinStationMapperextendsMapper< LongWritable,Text,TextPair,Text>{
protectedvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{
String line = value.toString();
String[] arr = line.split("\\s+");//解析气象站数据 012650-99999194903241200111
intlength = arr.length;
if(length==2){//满足这种数据格式
//key=气象站id value=气象站名称
context.write(newTextPair(arr[0],"0"),newText(arr[1]));
}
}
}
/**
* 天气记录mapper标记为“1”,后到达reducer
*/
publicstaticclassJoinRecordMapperextendsMapper< LongWritable,Text,TextPair,Text>{
protectedvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{
String line = value.toString();
String[] arr = line.split("\\s+",2);//解析天气记录数据
intlength = arr.length;
if(length==2){
//key=气象站id value=天气记录数据
context.write(newTextPair(arr[0],"1"),newText(arr[1]));
}
}
}
 
/**
*通过上面的分组,将相同的气象站id分到同一个reducer中进行输出
*
*由于TextPair经过了二次排序,所以reducer会先接收到气象站数据。
*因此从中抽取气象站名称,并将其作为后续每条输出记录的一部分写到输出文件
 
*/
publicstaticclassJoinReducerextendsReducer{
protectedvoidreduce(TextPair key, Iterable< Text> values,Context context)throwsIOException,InterruptedException{
Iterator< Text> iter = values.iterator();
Text stationName =newText(iter.next());//气象站名称 SIHCCAJAVRI
 
while(iter.hasNext()){
Text record = iter.next();//天气记录的每条数据 19500515120022
Text outValue =newText(stationName.toString()+"\t"+record.toString());
context.write(key.getFirst(),outValue);
}
}
}
 
//输出结果:
 
//011990-99999SIHCCAJAVRI195005151800-11
//011990-99999SIHCCAJAVRI19500515120022
//011990-99999SIHCCAJAVRI1950051507000
//012650-99999TRNSET-HANSMOEN19490324180078
//012650-99999TRNSET-HANSMOEN194903241200111
 
/**
* 自定义分区方法,将气象站id相同的记录分到相同的reducer中
*
*/
publicstaticclassKeyPartitionerextendsPartitioner< TextPair,Text>{
publicintgetPartition(TextPair key,Text value,intnumPartitions){
//根据气象站id进行选择分区,而不是组合键的整体
return(key.getFirst().hashCode()&Integer.MAX_VALUE) % numPartitions;
}
}
 
 
 
/**
* 自定义分组,将气象站id相同的key放到同一个reducer中执行。然后再通过TextPair进行内部比较排序
*/
publicstaticclassGroupingComparatorextendsWritableComparator{
publicGroupingComparator() {
super(TextPair.class,true);
}
 
@SuppressWarnings("rawtypes")
@Override
publicintcompare(WritableComparable w1, WritableComparable w2) {
TextPair tp1=(TextPair)w1;
TextPair tp2=(TextPair)w2;
Text f1= tp1.getFirst();
Text f2= tp2.getFirst();
returnf1.compareTo(f2);
}
}
 
 
@Override
publicintrun(String[] args)throwsException{
Configuration conf =newConfiguration();// 读取配置文件
 
Job job = Job.getInstance();// 新建一个任务
job.setJarByClass(JoinRecordWithStationName.class);// 主类
 
Path recordInputPath =newPath(args[0]);//天气记录数据源
Path stationInputPath =newPath(args[1]);//气象站数据源
 
Path outputPath =newPath(args[2]);//输出路径
//如果输出路径存在就删除
FileSystem fs = outputPath.getFileSystem(conf);
if(fs.isDirectory(outputPath)){
fs.delete(outputPath,true);
}
 
MultipleInputs.addInputPath(job,recordInputPath,TextInputFormat.class,JoinRecordMapper.class);//读取天气记录Mapper
MultipleInputs.addInputPath(job,stationInputPath,TextInputFormat.class,JoinStationMapper.class);//读取气象站Mapper
 
FileOutputFormat.setOutputPath(job,outputPath);
 
job.setPartitionerClass(KeyPartitioner.class);//自定义分区
job.setGroupingComparatorClass(GroupingComparator.class);//自定义分组
 
job.setMapOutputKeyClass(TextPair.class);
 
job.setReducerClass(JoinReducer.class);// Reducer
 
job.setOutputKeyClass(Text.class);
 
returnjob.waitForCompletion(true)?0:1;
 
}
 
publicstaticvoidmain(String[] args)throwsException{
String args0[]={
"hdfs://yun-11:9000/join/records.txt",
"hdfs://yun-11:9000/join/station.txt",
"hdfs://yun-11:9000/join/out"};
intexitCode = ToolRunner.run(newJoinRecordWithStationName(),args0);
System.exit(exitCode);
}
 
}
 
自定义组合键:TextPair
我们使用 TextPair 类构建组合键,包括气象站 ID 和 “标记”。在这里,“标记” 是一个虚拟的字段,其唯一目的是对记录排序,使气象站记录比天气记录先到达。
一种简单的做法就是:对于气象站记录, “标记” 的值设为 0;对于天气记录,“标记” 的值设为1
------------------------------------------------------------------------------------------------------------------------------
packagecom.tan.join1;
importjava.io.*;
importorg.apache.hadoop.io.*;
publicclassTextPairimplementsWritableComparable {
 
privateTextfirst;//Text 类型的实例变量 first --气象站id
privateTextsecond;//Text 类型的实例变量 second--标记符号
 
publicTextPair() {
set(newText(),newText());
}
 
publicTextPair(String first, String second) {
set(newText(first),newText(second));
}
 
publicTextPair(Text first, Text second) {
set(first, second);
}
 
publicvoidset(Text first, Text second) {
this.first= first;
this.second= second;
}
 
publicText getFirst() {
returnfirst;
}
 
publicText getSecond() {
returnsecond;
}
 
//将对象转换为字节流并写入到输出流out中
@Override
publicvoidwrite(DataOutput out)throwsIOException {
first.write(out);
second.write(out);
}
 
//从输入流in中读取字节流反序列化为对象
@Override
publicvoidreadFields(DataInput in)throwsIOException {
first.readFields(in);
second.readFields(in);
}
 
@Override
publicinthashCode() {
returnfirst.hashCode() *163+second.hashCode();
}
 
 
@Override
publicbooleanequals(Object o) {
if(oinstanceofTextPair) {
TextPair tp = (TextPair) o;
returnfirst.equals(tp.first) &&second.equals(tp.second);
}
returnfalse;
}
 
@Override
publicString toString() {
returnfirst+"\t"+second;
}
 
//排序
@Override
publicintcompareTo(TextPair tp) {
//根据第一个字段进行对比,如果相等再根据第二个字段进行排序
if(!first.equals(tp.first)) {
returnfirst.compareTo(tp.first);
}elseif(!second.equals(tp.second)){
returnsecond.compareTo(tp.second);
}
return0;
}
}
 
 
 
 
Map端连接
---------------------------------------------------------------------------------------

 

在两个大规模输入数据集之间的 map 端连接会在数据达到 map 函数之前就执行连接操作。为达到该目的,各 map 的输入数据必须先分区并且以特定方式排序。

各个输入数据集被划分成相同数量的分区,并且均按相同的键(连接键)排序。同一键的所有记录均会放在同一分区之中。

map 端连接操作可以连接多个作业的输出,只要这些作业的 reducer 数量相同、键相同并且输出文件是不可切分的(例如,小于一个 HDFS 块,或 gzip 压缩)。

 

在上面讲的天气例子中,如果气象站文件以气象站ID部分排序,天气记录文件也以气象站 ID 部分排序,而且 reducer 的数量相同,则就满足了执行 map 端连接的前提条件。

利用 org.apache.hadoop.mapreduce.join 包中的 CompositeInputFormat 类来运行一个 map 端连接。CompositeInputFormat类的输入源和连接类型(内连接或外连接)可以通过一个连接表达式进行配置, 连接表达式的语法简单。

相关TAG标签
上一篇:HadoopJoin
下一篇:Java中Comparable和Comparator的辨析
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站