
Hadoop Join

16-08-16

There are three kinds of join in Hadoop:

- Reduce-side join: for two large tables.
- Map-side join: for one large table and one small one; the small table goes into the DistributedCache.
- Semi join: for when the join only touches a small part of one of the tables.

Reduce-side join

Read both large tables and tag each value with the file it came from. On the reduce side, collect the values belonging to different files into separate lists, then take the cartesian product of the lists that share the same key.

- Logger is used to record errors.
- Counters are used to count records of interest.
- The Configuration (read through the context) is used to pass parameters into the tasks.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

public class ReduceJoin {
    private static final String DELIMITER = "\\s+";
    private static final Logger LOG = Logger.getLogger(ReduceJoin.class);
    public static class JoinMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            String path = split.getPath().toString();
            Configuration conf = context.getConfiguration();
            String t1 = conf.get("t1FileName");
            String t2 = conf.get("t2FileName");
            String line = value.toString();
            if (line == null || line.trim().equals("")) {
                return;
            }
            String[] values = line.split(DELIMITER);
            if (path.contains(t1)) {
                if (values.length != 2) {
                    LOG.error("t1 Number of Fields Error");
                    return;
                }
                context.getCounter("MapStage", "t1 read records").increment(1);
                context.write(new Text(values[0]), new Text("u#" + values[1]));
            } else if (path.contains(t2)) {
                if (values.length != 4) {
                    LOG.error("t2 Number of Fields Error");
                    return;
                }
                context.getCounter("MapStage", "t2 read records").increment(1);
                context.write(new Text(values[0]), new Text("l#" + values[2] + "\t" + values[3]));
            } else {
                context.getCounter("MapStage", "map filtered records").increment(1);
            }
        }
    }
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {
            List<String> t1 = new ArrayList<>();
            List<String> t2 = new ArrayList<>();
            for (Text value : values) {
                String[] fields = value.toString().split("#");
                if (fields.length != 2) {
                    continue;
                }
                if (fields[0].equals("u")) {
                    t1.add(fields[1]);
                } else if (fields[0].equals("l")) {
                    t2.add(fields[1]);
                } else {
                    continue;
                }
            }
            for (String it1 : t1) {
                for (String it2 : t2) {
                    context.write(key, new Text(it1 + "\t" + it2));
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            System.err.println("Usage: ReduceJoin <input> <output> <t1FileName> <t2FileName>");
            return;
        }
        Configuration conf = new Configuration();
        conf.set("t1FileName", args[2]);
        conf.set("t2FileName", args[3]);
        Job job = new Job(conf, "join");
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}  
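To make the reduce-side logic concrete, here is a minimal, dependency-free sketch of what the reducer does for a single key. The sample key and records (1001, Alice, page1, etc.) are invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class ReduceJoinSketch {
    // Mimics JoinReducer.reduce for one key: split the tagged values into
    // per-table lists, then emit the cartesian product of the two lists.
    static List<String> joinOneKey(String key, List<String> taggedValues) {
        List<String> t1 = new ArrayList<>();
        List<String> t2 = new ArrayList<>();
        for (String v : taggedValues) {
            String[] fields = v.split("#", 2);
            if (fields.length != 2) continue;       // malformed value, skip
            if (fields[0].equals("u")) t1.add(fields[1]);
            else if (fields[0].equals("l")) t2.add(fields[1]);
        }
        List<String> joined = new ArrayList<>();
        for (String a : t1)
            for (String b : t2)
                joined.add(key + "\t" + a + "\t" + b);
        return joined;
    }

    public static void main(String[] args) {
        // One user record and two log records sharing the key 1001.
        List<String> values = List.of("u#Alice", "l#page1\t3", "l#page2\t7");
        for (String line : joinOneKey("1001", values)) {
            System.out.println(line);   // two joined lines, one per log record
        }
    }
}
```

Because the reducer multiplies the two lists, a key that is frequent in both inputs produces |t1| × |t2| output records, which is why skewed keys make reduce-side joins expensive.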

Map-side join

Suitable for one large table and one small one; the small table is loaded into the DistributedCache.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

public class MapJoin {
    private static final Logger LOG = Logger.getLogger(MapJoin.class);

    protected static class MapJoinMapper extends Mapper<Object, Text, Text, Text> {
        private Map<String, String> map = new HashMap<>();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // "t1" is the symlink created by the URI fragment (...cache/t1#t1) in main()
            BufferedReader br = new BufferedReader(new FileReader("t1"));
            String line = null;
            while ((line = br.readLine()) != null) {
                if (line.trim().equals("")) {
                    continue;
                }
                String[] fields = line.split("\\s+");
                if (fields.length != 2) {
                    context.getCounter("MapStage", "Input Record Fields Count Error").increment(1);
                    continue; // skip the malformed line instead of aborting the whole cache load
                }
                map.put(fields[0], fields[1]);
            }
            br.close();
        }
        @Override
        protected void map(Object key, Text value, Context context)throws IOException, InterruptedException {
            String line = value.toString();
            if(line == null || line.equals("")){
                return;
            }
            String[] fields = line.split("\\s+");
            if (fields.length != 4) {
                context.getCounter("MapStage", "Map Input Record Fields Count Error").increment(1);
                return; // without this, fields[2]/fields[3] below could be out of bounds
            }
            if(map.containsKey(fields[0])){
                context.write(new Text(fields[0]), new Text(map.get(fields[0])+"\t"+fields[2]+"\t"+fields[3]));
            }
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        DistributedCache.addCacheFile(new URI("hdfs://namenode/user/zhanghu/cache/t1#t1"), conf);
        Job job = new Job(conf,"MapJoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}  
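At its core, the map-side join is an in-memory hash lookup: build a HashMap from the small table once, then probe it for every record of the large table. A dependency-free sketch of that logic (the whitespace-separated sample rows are invented):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapJoinSketch {
    // smallLines: "userId userName" rows; largeLines: 4-field log rows.
    static List<String> hashJoin(List<String> smallLines, List<String> largeLines) {
        // Build phase: load the small table into memory (what setup() does).
        Map<String, String> lookup = new HashMap<>();
        for (String line : smallLines) {
            String[] f = line.split("\\s+");
            if (f.length == 2) lookup.put(f[0], f[1]);
        }
        // Probe phase: stream the large table and emit only matches (what map() does).
        List<String> joined = new ArrayList<>();
        for (String line : largeLines) {
            String[] f = line.split("\\s+");
            if (f.length != 4) continue;
            String name = lookup.get(f[0]);
            if (name != null) {
                joined.add(f[0] + "\t" + name + "\t" + f[2] + "\t" + f[3]);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> users = List.of("1 Alice");
        List<String> logs = List.of("1 x login 09:00", "2 x logout 09:05");
        hashJoin(users, logs).forEach(System.out::println); // only userId 1 matches
    }
}
```

The build phase runs once per map task, so the technique only pays off when the small table really does fit in each task's memory.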

Semi Join

Filter the data on the map side so that only records that actually participate in the join are transferred, reducing network traffic in the shuffle stage. Precondition: the set of UserIds that appear in Logs fits into the cache.
Implementation:
- First deduplicate the UserId column of the right table and save the result as UniqueUsers.
- Then use the DistributedCache to drop the records of the User table whose UserId does not appear in the right table.
/**
* Deduplication
**/
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RemoveDuplicates {
    public static class RemoveDuplicatesMapper extends Mapper<Object, Text, Text, NullWritable> {
        Set<Text> set = new HashSet<>();
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\\s+");
            if (fields.length != 4) {
                return;
            }
            set.add(new Text(fields[0]));
        }
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Text value : set) {
                context.write(value, NullWritable.get());
            }
        }
    }
    public static class RemoveDuplicatesReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException,
                InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "RemoveDuplicates");
        job.setJarByClass(RemoveDuplicates.class);
        job.setMapperClass(RemoveDuplicatesMapper.class);
        job.setReducerClass(RemoveDuplicatesReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
/**
* Join step: identical to ReduceJoin, except that users absent from the right table are dropped on the map side.
**/
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SemiJoin {
    public static class SemiJoinMapper extends Mapper<Object, Text, Text, Text> {
        Set<String> set = new HashSet<>();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(new FileReader("UniqueUsers"));
            String line = null;
            while((line = br.readLine()) != null){
                if(!line.trim().equals("")){
                    set.add(line.trim());
                }
            }
            br.close();
        }
        @Override
        protected void map(Object key, Text value, Context context)throws IOException, InterruptedException {
            FileSplit split = (FileSplit)context.getInputSplit();
            String path = split.getPath().toString();
            String line = value.toString();
            String[] fields = line.split("\\s+");
            if(path.contains("t1")){
                if(fields.length!=2){
                    return;
                }
                if(set.contains(fields[0])){
                    context.write(new Text(fields[0]), new Text("u#"+fields[1]));
                }
            }else if(path.contains("t2")){
                if(fields.length!=4){
                    return;
                }
                context.write(new Text(fields[0]), new Text("l#"+fields[2]+"\t"+fields[3]));
            }else{
                context.getCounter("MapStage","Invalid Records").increment(1);
            }
        }
    }

    public static class SemiJoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // The lists must be local to reduce(): as instance fields they would keep
            // values from previous keys and corrupt the cartesian product below.
            List<String> listT1 = new ArrayList<>();
            List<String> listT2 = new ArrayList<>();
            for (Text value : values) {
                String[] fields = value.toString().split("#");
                if (fields.length != 2) {
                    continue;
                }
                if ("u".equals(fields[0])) {
                    listT1.add(fields[1]);
                } else if ("l".equals(fields[0])) {
                    listT2.add(fields[1]);
                }
            }
            for (String t1 : listT1) {
                for (String t2 : listT2) {
                    context.write(key, new Text(t1 + "\t" + t2));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        DistributedCache.addCacheFile(new URI("/user/zhanghu/cache/UniqueUsers#UniqueUsers"),conf);
        Job job = new Job(conf,"SemiJoin");
        job.setJarByClass(SemiJoin.class);
        job.setMapperClass(SemiJoinMapper.class);
        job.setReducerClass(SemiJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

An improved scheme

The second step above still uses a reduce-side join, so a fair amount of data is still transferred. Precondition: the filtered user table fits entirely into the cache.
Implementation:
- Deduplicate the UserId column of the right table and save it as UniqueUsers.
- Using UniqueUsers as the cache, filter the Users table to obtain FilteredUsers.
- Using FilteredUsers as the cache, do a map-side join against the UserLog table.
Characteristics of the improved scheme:
- Pros: all three steps are map-only jobs with no shuffle stage, so they run fully in parallel.
- Cons: three jobs must be launched and the cache is loaded several times; if the cache is large, the cost outweighs the benefit.
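Assuming, as stated above, that the filtered user table fits in memory, the three map-only steps can be modelled as three pure functions. This is only a sketch of the data flow, not Hadoop code; the method names and sample rows are invented:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MapOnlySemiJoinSketch {
    // Step 1: deduplicate the UserId column of the log table (UniqueUsers).
    static Set<String> uniqueUsers(List<String> logLines) {
        Set<String> ids = new HashSet<>();
        for (String line : logLines) ids.add(line.split("\\s+")[0]);
        return ids;
    }

    // Step 2: keep only the users whose id occurs in the logs (FilteredUsers).
    static Map<String, String> filterUsers(Map<String, String> users, Set<String> ids) {
        Map<String, String> kept = new HashMap<>();
        for (Map.Entry<String, String> e : users.entrySet())
            if (ids.contains(e.getKey())) kept.put(e.getKey(), e.getValue());
        return kept;
    }

    // Step 3: map-side hash join of the logs against the filtered users.
    static List<String> join(List<String> logLines, Map<String, String> filteredUsers) {
        List<String> joined = new ArrayList<>();
        for (String line : logLines) {
            String[] f = line.split("\\s+");
            String name = filteredUsers.get(f[0]);
            if (name != null) joined.add(f[0] + "\t" + name + "\t" + f[2] + "\t" + f[3]);
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> users = Map.of("1", "Alice", "2", "Bob", "3", "Carol");
        List<String> logs = List.of("1 - login 09:00", "3 - logout 09:05");
        Set<String> ids = uniqueUsers(logs);                 // {1, 3}
        Map<String, String> kept = filterUsers(users, ids);  // Bob is dropped
        join(logs, kept).forEach(System.out::println);
    }
}
```

In the real pipeline, each function corresponds to one map-only job, with the previous job's output shipped to the next via the DistributedCache.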