package com.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ModelDemo implements Tool{ //第一个参数(key)必须为LongWritable或者IntWritable,因为他代表的是 行偏移量:每一行的第一个字母距离该文件的首位置的距离 //第二个参数 代表 map阶段输入value类型 //第三个参数 代表 map阶段输出key类型 //第四个参数 代表map阶段输出value类型 public static class MyMapper extends Mapper{ /** * value值代表输入的值 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1、从输入数据中获取每一个文件中的每一行的值 String line = value.toString(); // 2、对每一行的数据进行切分(看情况) String[] words = line.split(" "); // 3、循环处理 for (String word : words) { value.set(1 + ""); // map阶段的输出 context.write(new Text(word), value); } } } /** * * 第一个Text代表 Reduce阶段Key输入的值类型 需要和Map阶段输出Key的类型相同 * 第二个Text代表 Reduce阶段Value输入的值类型 需要和Map阶段输出Value的类型相同 * 第三个Text代表 Reduce阶段Key输出的值类型 按逻辑自己定义 * 第二个Text代表 Reduce阶段Value输出的值类型 按逻辑自己定义 * @author Administrator * */ public static class MyReduce extends Reducer { @Override protected void reduce(Text value, Iterable list, Context context) throws IOException, InterruptedException { int count = 0; for (Text i : list) { count += Integer.parseInt(i.toString()); } context.write(value, new Text(count+"")); } } /** * 设置conf类型 */ public void setConf(Configuration conf) { // TODO Auto-generated method stub conf.set("fs.defaultFS", "hdfs://zwj"); conf.set("dfs.nameservices", "zwj"); conf.set("dfs.ha.namenodes.zwj", "nn1,nn2"); conf.set("dfs.namenode.rpc-address.zwj.nn1", "hadoop01:9000"); conf.set("dfs.namenode.rpc-address.zwj.nn2", "hadoop02:9000"); conf.set("dfs.client.failover.proxy.provider.zwj", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); } public Configuration getConf() { // TODO Auto-generated method stub return new Configuration(); } public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = getConf(); Job job = Job.getInstance(conf, "job"); job.setJarByClass(ModelDemo.class); //设置自定义mapper的值 job.setMapperClass(MyMapper.class); //对Map阶段输出 的key value 的类型进行赋值 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置自定义Reduce的值 job.setReducerClass(MyReduce.class); //对Reduce阶段输出 的key value 的类型进行赋值 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置input output 的数值,通过args[进行赋值] setInputAndOutput(job, conf, args); return (job.waitForCompletion(true) 0 : 1); } private void setInputAndOutput(Job job, Configuration conf, String[] args) throws Exception { if (args.length != 2) { System.out.println("数据格式不正确"); return; } FileInputFormat.addInputPath(job, new Path(args[0])); FileSystem fs = FileSystem.get(conf); Path outPath = new Path(args[1]); if (fs.exists(outPath)) { fs.delete(outPath, true); } FileOutputFormat.setOutputPath(job, outPath); } /** * 主调用函数通过这个执行方法,并且传入参数 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int isok = ToolRunner.run( new ModelDemo(), args); // 退出整个job System.exit(isok); } }