package cn.smart.bigdata.mr.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Map-only job that joins order records with a small product table.
 *
 * <p>The product table is loaded into memory by every mapper in {@code setup}, and each
 * order line is emitted with the matching product name appended. The job is configured
 * with zero reduce tasks, so the mapper output is the final output.
 */
public class MapSideJoin {

    /**
     * Mapper that appends the product name to each order line.
     *
     * <p>Input: byte offset ({@link LongWritable}) and order line ({@link Text}).
     * Output: the enriched line as key ({@link Text}) with a {@link NullWritable} value.
     */
    static class MapsideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        /** Location of the product table read in {@link #setup}. */
        private static final String PRODUCT_TABLE_PATH = "/D:/data/mapjoincache/pdts.txt";

        /** Product id (first CSV field) mapped to product name (second CSV field). */
        Map<String, String> pdInfoMap = new HashMap<>();

        /** Output key instance reused across {@code map} calls. */
        Text k = new Text();

        /**
         * Loads the product table into {@link #pdInfoMap} before any record is mapped.
         *
         * <p>Reading stops at end of file or at the first blank line. Each line is split
         * on {@code ","}; a line with fewer than two fields raises
         * {@link ArrayIndexOutOfBoundsException}.
         *
         * @param context the task context (not used here)
         * @throws IOException if the product table cannot be opened or read
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // try-with-resources closes the reader even when a read or parse step throws.
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(new FileInputStream(PRODUCT_TABLE_PATH)))) {
                String line;
                while (StringUtils.isNotBlank(line = br.readLine())) {
                    String[] fields = line.split(",");
                    pdInfoMap.put(fields[0], fields[1]);
                }
            }
        }

        /**
         * Emits the order line followed by a tab and the product name looked up by the
         * order's second tab-separated field.
         *
         * <p>When the product id is not present in {@link #pdInfoMap}, the lookup yields
         * {@code null} and the text {@code "null"} is appended. An order line with fewer
         * than two tab-separated fields raises {@link ArrayIndexOutOfBoundsException}.
         *
         * @param key     input key supplied by the framework (not used here)
         * @param value   one order line
         * @param context used to write the enriched line
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String orderLine = value.toString();
            String[] fields = orderLine.split("\t");
            String pdName = pdInfoMap.get(fields[1]);
            k.set(orderLine + "\t" + pdName);
            context.write(k, NullWritable.get());
        }
    }

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapSideJoin.class);
        job.setMapperClass(MapsideJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:/data/mapjoininput"));
        FileOutputFormat.setOutputPath(job, new Path("D:/data/mapjoinoutput"));

        // Register the product table as a cache file for the job.
        job.addCacheFile(new URI("file:/D:/data/mapjoincache/pdts.txt"));

        // Map-only job: no reduce phase is needed for the join.
        job.setNumReduceTasks(0);

        boolean res = job.waitForCompletion(true);
        // Conventional process status: 0 means success, non-zero means failure.
        System.exit(res ? 0 : 1);
    }
}
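/*
 * Sample input data for the job above.
 *
 * pdts.txt (product table; fields are comma-separated: product id,product name)
 * pd001,apple
 * pd002,banana
 * pd003,orange
 *
 * order.txt (the mapper splits each line on a tab character and uses the second field
 * as the product id, so the fields in the real file must be tab-separated)
 * 1001 pd001 300
 * 1001 pd002 20
 * 1002 pd003 40
 * 1003 pd002 50
 */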