MapReduce编程案例:分组topn的简单实现
需求:有如下一组数据:
```
order001,u001,小米6,1999.9,2
order001,u001,雀巢咖啡,99.0,2
order001,u001,安慕希,250.0,2
order001,u001,经典红双喜,200.0,4
order001,u001,防水电脑包,400.0,2
order002,u002,小米手环,199.0,3
order002,u002,榴莲,15.0,10
order002,u002,苹果,4.5,20
order002,u002,肥皂,10.0,40
order003,u001,小米6,1999.9,2
order003,u001,雀巢咖啡,99.0,2
order003,u001,安慕希,250.0,2
order003,u001,经典红双喜,200.0,4
order003,u001,防水电脑包,400.0,2
```
（每行字段依次为：订单号、用户号、商品名称、单价、数量）
需要得到如下数据:
```
order001,u001,小米6,1999.9,2,3999.8
order001,u001,防水电脑包,400.0,2,800.0
order001,u001,经典红双喜,200.0,4,800.0
order003,u001,小米6,1999.9,2,3999.8
order003,u001,经典红双喜,200.0,4,800.0
order003,u001,防水电脑包,400.0,2,800.0
order002,u002,小米手环,199.0,3,597.0
order002,u002,肥皂,10.0,40,400.0
order002,u002,榴莲,15.0,10,150.0
```
（最后一列为总金额 = 单价 × 数量。注：总金额相同的记录（如上面的 800.0）之间的先后顺序由 compareTo 的次级比较规则决定，实际输出可能与上面的顺序略有不同。）
按 orderId 对数据分组，并列出每个订单中总金额（单价 × 数量）最高的前三条记录。
实现代码如下:
编写一个 OrderBean 类，实现 WritableComparable 接口并重写 compareTo 方法：先按总金额降序比较，总金额相同时再按商品名称比较：
package cn.edu360.mr.order.topn; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable{ private String orderId; private String userId; private String pdtName; private float price; private int number; private float amountFee; public void set(String orderId, String userId, String pdtName, float price, int number) { this.orderId = orderId; this.userId = userId; this.pdtName = pdtName; this.price = price; this.number = number; this.amountFee = price * number; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } public String getPdtName() { return pdtName; } public void setPdtName(String pdtName) { this.pdtName = pdtName; } public float getPrice() { return price; } public void setPrice(float price) { this.price = price; } public int getNumber() { return number; } public void setNumber(int number) { this.number = number; } public float getAmountFee() { return amountFee; } public void setAmountFee(float amountFee) { this.amountFee = amountFee; } @Override public String toString() { return this.orderId + "," +this.userId + "," +this.pdtName +"," + this.price + "," +this.number + "," +this.amountFee; } public void readFields(DataInput in) throws IOException { this.orderId = in.readUTF(); this.userId = in.readUTF(); this.pdtName = in.readUTF(); this.price = in.readFloat(); this.number = in.readInt(); this.amountFee = this.price * this.number ; } public void write(DataOutput out) throws IOException { out.writeUTF(this.orderId); out.writeUTF(this.userId); out.writeUTF(this.pdtName); out.writeFloat(this.price); out.writeInt(this.number); } public int compareTo(OrderBean o) { return Float.compare(o.getAmountFee(),this.getAmountFee()) == 0 
this.pdtName.compareTo(o.getPdtName()) : Float.compare(o.getAmountFee(), this.getAmountFee()); } }
接着编写 MapReduce 主类：Mapper 以 orderId 作为 key 输出 OrderBean；Reducer 把同一订单的所有记录复制到列表中，用 Collections.sort() 排序后输出前 N 条（N 由配置项 order.top.n 指定）：
package cn.edu360.mr.order.topn; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderTopn { public static class OrderTopnMapper extends Mapper{ OrderBean orderBean = new OrderBean(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); orderBean.set(fields[0], fields[1], fields[2], Float.parseFloat(fields[3]), Integer.parseInt(fields[4])); k.set(fields[0]); //从这里交给maptask的kv对象,会被maptask序列化后存储,所以不用担心覆盖的问题 context.write(k, orderBean); } } public static class OrderTopnReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { int topn = context.getConfiguration().getInt("order.top.n", 3); ArrayList beanList = new ArrayList (); for (OrderBean orderBean : values) { // 构造一个新的对象,来存储本次迭代出来的值 OrderBean newBean = new OrderBean(); newBean.set(orderBean.getOrderId(), orderBean.getUserId(), orderBean.getPdtName(), orderBean.getPrice(), orderBean.getNumber()); beanList.add(newBean); } Collections.sort(beanList); for(int i =0;i < topn; i++) { context.write(beanList.get(i), NullWritable.get()); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 默认只加载core-default.xml core-site.xml conf.setInt("order.top.n", 2); Job job = Job.getInstance(conf); 
job.setJarByClass(OrderTopn.class); job.setMapperClass(OrderTopnMapper.class); job.setReducerClass(OrderTopnReducer.class); job.setNumReduceTasks(2); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(OrderBean.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\order\\input")); FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\order\\out1")); job.waitForCompletion(true); } }