频道栏目
首页 > 网络 > 云计算 > 正文

hadoop基础----hadoop实战(五)-----myeclipse开发MapReduce---WordCount例子---解析MapReduce的写法

2016-09-21 09:22:20           
收藏   我要投稿

目标

MapReduce主要的流程是 map----》reduce。

我们本章节来详细学习java代码中,是怎样配置实现MapReduce的。就以WordCount例子为例。

本章节的目的是 熟悉MapReduce的写法之后,我们能写出更多的业务处理,解决更多的其它问题。

 

MapReduce的结构

写一个MapReduce主要有三部分:

Mapper接口的实现,Reducer接口的实现,Job的配置。

Mapper接口和Reducer接口的实现就是要分别编写两个类(例如分别叫做Map类和Reduce类)。

在Map类中规定如何将输入的对转化为中间结果的对。

在Reduce类中规定如何将Map输出的中间结果进一步处理,转化为最终的结果输出对。

而对Job的配置是要在main函数中创建相关对象,调用其方法实现的。

 

 

完整代码

 

package org.apache.hadoop.examples; 

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {


//编写完成Map任务的静态内部类,类的名字就叫TokenizerMapper,继承Mapper类
  public static class TokenizerMapper 
       extends Mapper{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  
  //编写完成Reduce任务的静态内部类,类的名字就叫IntSumReducer,继承Reducer类
  public static class IntSumReducer 
       extends Reducer {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  //main函数中所要做的就是Job的配置和提交  
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount  ");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

文件中的内容

我们用来作测试的文件有2个,分别是file1.txt和file2.txt。

file1.txt中是

hello world

 

file2.txt中是

hello hadoop

 

Mapper接口的实现分析

 

  public static class TokenizerMapper 
       extends Mapper{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

 

 

这段代码实现了Map的功能,我们声明了一个类TokenizerMapper(类名随意,我们也可以起名WordCountMap但是必须继承Mapper接口)继承了Mapper接口---接口的参数是固定的,也就是写其它功能的MapReduce也继承这个接口,用这几个参数或者适当调整。

熟悉java的同学会看到出现了一些新的数据类型:比如Text,IntWritable,Context。

LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类实现了WritableComparable接口,它们都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为long,int,String 的替代品。

Context则是负责收集键值对的中间结果或者最终结果,有些版本可以用OutputCollectoroutput,但是用法都一样,都是用来收集结果。

 

 

 private final static IntWritable one = new IntWritable(1);
定义了一个int赋值1,作为计数器。

 

private Text word = new Text();
定义一个变量,用来保存key。这个key会用来作为map区分数据。

 

 public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
Mapper接口中的必须有map方法实现功能,传入参数一般也是固定的。

 

参数中context负责收集键值对的中间结果传递给reduce。

我们的file1.txt和file2.txt在hadoop中会经过TextInputFormat,每个文件(或其一部分)都会单独地作为map的输入,而这是继承自FileInputFormat的。之后,每行数据都会生成一条记录,每条记录则表示为形式:key值是每个数据的记录在数据分片中的字节偏移量,数据类型是LongWritable;value值是每行的内容,数据类型是Text。

 

也就是说 我们写在文本中的内容 就存在 Text value这个参数中。

 

 

   StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
StringTokenizer 是一个分词器。用来把句子拆分成一个个单词。value是我们文本中的内容,这里是把内容分成一个个单词。

 

然后通过while去遍历, 把单词放入word变量中。

然后把word变量和计数器1作为结果 存起来。

 

那么经过了map之后的context中的结果就是

这个结果会自动传给reduce。

 

 

 

 

Reducer接口的实现分析

 

  public static class IntSumReducer 
       extends Reducer {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

这段代码实现了Reduce的功能,我们声明了一个类IntSumReducer(类名随意,我们也可以起名WordCountReduce但是必须继承Reducer接口)继承了Reducer接口---接口的参数是固定的,也就是写其它功能的MapReduce也继承这个接口,用这几个参数或者适当调整参数类型。

 

 

 

 private IntWritable result = new IntWritable();

定义一个变量,用来装每一组的计数结果。

 

 

 

 public void reduce(Text key, Iterable values, 
                       Context context
                       ) throws IOException, InterruptedException {
Reducer接口中的必须有reduce方法实现功能,传入参数一般也是固定的。

 

 

参数中context负责收集键值对的最终结果。

 

key对应map传递过滤的key,values对应map传递过滤的value。

为什么这里是values呢。

因为进入reduce方法时会自动分组,只有key一样的数据才会同时进入一个reduce中。

map传递过来的结果中是

 


也就是 这个例子中会进入三次reduce,

第一次 key 是 hello, values是[1,1]

第二次key 是 world,values是[1]

第三次key是 hadoop,values是[1]

 

 

  int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
循环values列表,相加计数后放入最终结果容器context中。

 

 


所以最终的结果是

Job的配置--main方法

 

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount  ");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

mapreduce中需要一个main方法配置参数,向hadoop框架描述map-reduce执行的工作,并提交运行。

 

 

 Configuration conf = new Configuration();
创建一个配置实例。

 

 

 

 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount  ");
      System.exit(2);
    }
获取参数并判断是否合法。

 

 

Job job = new Job(conf, "word count");
新建一个job任务。

 

 

 job.setJarByClass(WordCount.class);
设置运行的jar类,也就是mapreduce的主类名。

 
 

job.setMapperClass(TokenizerMapper.class);
设置map类,也就是继承map接口的类名。


job.setCombinerClass(IntSumReducer.class);
设置Combiner类,其实map到reduce还有一道工序是Combiner,如果有特殊需求可以新建一个类,没有的话直接使用继承reduce接口的类即可。


  job.setReducerClass(IntSumReducer.class);
设置reduce类,也就是继承reduce接口的类名。



 job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
设置输出结果的 key 和value的数据类型。

 

 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
设置输入输入结果的路径,我们可以把路径写死,也可以通过参数传进来,我们这里就是用的接受的参数的值。

 
 

System.exit(job.waitForCompletion(true) ? 0 : 1);
提交运行,完成后退出程序。
上一篇:hadoop基础----hadoop实战(四)-----myeclipse开发MapReduce---myeclipse搭建hadoop开发环境并运行wordcount
下一篇:PGM:贝叶斯网的参数估计
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站