频道栏目
首页 > 资讯 > Java > 正文

Java Map 端实现 join 的代码实例

18-04-26        来源:[db:作者]  
收藏   我要投稿
package cn.smart.bigdata.mr.mapsidejoin;


import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;


import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * Map-only MapReduce job that joins order records with a small product table
 * on the map side.
 *
 * <p>The product table ({@code productId,productName} per line) is loaded into
 * an in-memory map once per map task in {@code setup}. Each order line
 * (tab-separated, product id in the second column) is then written out with
 * the matching product name appended. No reducer is used.
 */
public class MapSideJoin {

	/**
	 * Local path of the product table. The same location is registered as a
	 * cache file in {@link #main(String[])} and read directly in the mapper's
	 * setup, so both usages are derived from this single constant.
	 */
	private static final String PRODUCT_FILE_PATH = "/D:/data/mapjoincache/pdts.txt";

	/** Counter group used to report input lines that could not be joined. */
	private static final String COUNTER_GROUP = "MapSideJoin";

	/**
	 * Mapper that appends the product name to every order line.
	 *
	 * <p>Input: byte offset / order line. Output: joined line / {@link NullWritable}.
	 */
	static class MapsideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

		/** Product id -> product name, filled once in {@link #setup(Context)}. */
		private final Map<String, String> pdInfoMap = new HashMap<>();

		/** Reused output key to avoid allocating a new Text per record. */
		private final Text k = new Text();

		/**
		 * Loads the product table into memory before any record is mapped.
		 *
		 * @param context the task context (not used for the load itself)
		 * @throws IOException if the product file cannot be read or contains a
		 *         non-blank line without at least two comma-separated fields
		 */
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {

			// try-with-resources closes the reader even when a read or parse fails.
			// NOTE: the file is decoded with the platform default charset.
			try (BufferedReader br = new BufferedReader(
					new InputStreamReader(new FileInputStream(PRODUCT_FILE_PATH)))) {
				String line;
				// Read to end of file; a blank line must be skipped, not treated
				// as the end of the table, or every later product would be lost.
				while ((line = br.readLine()) != null) {
					if (StringUtils.isBlank(line)) {
						continue;
					}
					String[] fields = line.split(",");
					if (fields.length < 2) {
						throw new IOException("Malformed product line in "
								+ PRODUCT_FILE_PATH + ": " + line);
					}
					pdInfoMap.put(fields[0], fields[1]);
				}
			}
		}

		/**
		 * Emits the order line followed by a tab and the product name looked up
		 * by the order's second column.
		 *
		 * <p>Lines with fewer than two tab-separated fields (including blank
		 * lines) are counted and skipped. If the product id is unknown, the
		 * literal text {@code null} is appended, as string concatenation of a
		 * {@code null} reference produces.
		 *
		 * @param key byte offset of the line (unused)
		 * @param value one order line
		 * @param context sink for the joined record
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String orderLine = value.toString();
			String[] fields = orderLine.split("\t");
			if (fields.length < 2) {
				// No product id column: report through a job counter rather
				// than failing the whole task on one bad line.
				context.getCounter(COUNTER_GROUP, "MALFORMED_ORDER_LINES").increment(1);
				return;
			}
			String pdName = pdInfoMap.get(fields[1]);
			k.set(orderLine + "\t" + pdName);
			context.write(k, NullWritable.get());
		}

	}

	/**
	 * Configures and runs the map-only join job, then exits with status 0 on
	 * success and 1 on failure.
	 *
	 * @param args command-line arguments (unused; paths are hard-coded)
	 * @throws Exception if job configuration or submission fails
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(MapSideJoin.class);
		job.setMapperClass(MapsideJoinMapper.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		FileInputFormat.setInputPaths(job, new Path("D:/data/mapjoininput"));
		FileOutputFormat.setOutputPath(job, new Path("D:/data/mapjoinoutput"));

		// Resolves to file:/D:/data/mapjoincache/pdts.txt
		job.addCacheFile(new URI("file:" + PRODUCT_FILE_PATH));
		// Map-only job: mapper output is written directly as the final output.
		job.setNumReduceTasks(0);

		boolean res = job.waitForCompletion(true);
		// Process exit status convention: 0 means success.
		System.exit(res ? 0 : 1);
	}
}

 

pdts.txt

pd001,apple
pd002,banana
pd003,orange

 

order.txt

1001 pd001 300
1001 pd002 20
1002 pd003 40
1003 pd002 50

相关TAG标签
上一篇:java (bitmap bitvector)的解析与运用
下一篇:使用Dockerfile Build镜像时常出现的问题
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站