//011990-99999SIHCCAJAVRI195005151800-11
//011990-99999SIHCCAJAVRI19500515120022
//011990-99999SIHCCAJAVRI1950051507000
//012650-99999TRNSET-HANSMOEN19490324180078
//012650-99999TRNSET-HANSMOEN194903241200111
/**
* 自定义分区方法,将气象站id相同的记录分到相同的reducer中
*
*/
publicstaticclassKeyPartitionerextendsPartitioner< TextPair,Text>{
publicintgetPartition(TextPair key,Text value,intnumPartitions){
//根据气象站id进行选择分区,而不是组合键的整体
return(key.getFirst().hashCode()&Integer.MAX_VALUE) % numPartitions;
}
}
/**
* 自定义分组,将气象站id相同的key放到同一个reducer中执行。然后再通过TextPair进行内部比较排序
*/
publicstaticclassGroupingComparatorextendsWritableComparator{
publicGroupingComparator() {
super(TextPair.class,true);
}
@SuppressWarnings("rawtypes")
@Override
publicintcompare(WritableComparable w1, WritableComparable w2) {
TextPair tp1=(TextPair)w1;
TextPair tp2=(TextPair)w2;
Text f1= tp1.getFirst();
Text f2= tp2.getFirst();
returnf1.compareTo(f2);
}
}
@Override
publicintrun(String[] args)throwsException{
Configuration conf =newConfiguration();// 读取配置文件
Job job = Job.getInstance();// 新建一个任务
job.setJarByClass(JoinRecordWithStationName.class);// 主类
Path recordInputPath =newPath(args[0]);//天气记录数据源
Path stationInputPath =newPath(args[1]);//气象站数据源
Path outputPath =newPath(args[2]);//输出路径
//如果输出路径存在就删除
FileSystem fs = outputPath.getFileSystem(conf);
if(fs.isDirectory(outputPath)){
fs.delete(outputPath,true);
}
MultipleInputs.addInputPath(job,recordInputPath,TextInputFormat.class,JoinRecordMapper.class);//读取天气记录Mapper
MultipleInputs.addInputPath(job,stationInputPath,TextInputFormat.class,JoinStationMapper.class);//读取气象站Mapper
FileOutputFormat.setOutputPath(job,outputPath);
job.setPartitionerClass(KeyPartitioner.class);//自定义分区
job.setGroupingComparatorClass(GroupingComparator.class);//自定义分组
job.setMapOutputKeyClass(TextPair.class);
job.setReducerClass(JoinReducer.class);// Reducer
job.setOutputKeyClass(Text.class);
returnjob.waitForCompletion(true)?0:1;
}
publicstaticvoidmain(String[] args)throwsException{
String args0[]={
"hdfs://yun-11:9000/join/records.txt",
"hdfs://yun-11:9000/join/station.txt",
"hdfs://yun-11:9000/join/out"};
intexitCode = ToolRunner.run(newJoinRecordWithStationName(),args0);
System.exit(exitCode);
}
}
自定义组合键:TextPair
我们使用 TextPair 类构建组合键,包括气象站 ID 和 “标记”。在这里,“标记” 是一个虚拟的字段,其唯一目的是对记录排序,使气象站记录比天气记录先到达。
一种简单的做法就是:对于气象站记录, “标记” 的值设为 0;对于天气记录,“标记” 的值设为1
------------------------------------------------------------------------------------------------------------------------------
packagecom.tan.join1;
importjava.io.*;
importorg.apache.hadoop.io.*;
publicclassTextPairimplementsWritableComparable {
privateTextfirst;//Text 类型的实例变量 first --气象站id
privateTextsecond;//Text 类型的实例变量 second--标记符号
publicTextPair() {
set(newText(),newText());
}
publicTextPair(String first, String second) {
set(newText(first),newText(second));
}
publicTextPair(Text first, Text second) {
set(first, second);
}
publicvoidset(Text first, Text second) {
this.first= first;
this.second= second;
}
publicText getFirst() {
returnfirst;
}
publicText getSecond() {
returnsecond;
}
//将对象转换为字节流并写入到输出流out中
@Override
publicvoidwrite(DataOutput out)throwsIOException {
first.write(out);
second.write(out);
}
//从输入流in中读取字节流反序列化为对象
@Override
publicvoidreadFields(DataInput in)throwsIOException {
first.readFields(in);
second.readFields(in);
}
@Override
publicinthashCode() {
returnfirst.hashCode() *163+second.hashCode();
}
@Override
publicbooleanequals(Object o) {
if(oinstanceofTextPair) {
TextPair tp = (TextPair) o;
returnfirst.equals(tp.first) &&second.equals(tp.second);
}
returnfalse;
}
@Override
publicString toString() {
returnfirst+"\t"+second;
}
//排序
@Override
publicintcompareTo(TextPair tp) {
//根据第一个字段进行对比,如果相等再根据第二个字段进行排序
if(!first.equals(tp.first)) {
returnfirst.compareTo(tp.first);
}elseif(!second.equals(tp.second)){
returnsecond.compareTo(tp.second);
}
return0;
}
}
Map端连接
---------------------------------------------------------------------------------------