This article is about 4,478 characters long; estimated reading time: 14 minutes.
By default, MapReduce sorts records lexicographically by key in every sort phase. To customize the ordering, have the key class implement the WritableComparable interface and override its compareTo method. For example:
```java
@Override
public int compareTo(FlowBean bean) {
    int result;
    // Sort by total flow, in descending order
    if (sumFlow > bean.getSumFlow()) {
        result = -1;
    } else if (sumFlow < bean.getSumFlow()) {
        result = 1;
    } else {
        result = 0;
    }
    return result;
}
```
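The three-branch comparison above can be exercised outside Hadoop with plain `Comparable`. A minimal sketch (the `SimpleFlow` class is hypothetical, introduced only to demonstrate that returning -1 when `this` is larger yields a descending sort):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for FlowBean: orders by sumFlow, descending.
class SimpleFlow implements Comparable<SimpleFlow> {
    final long sumFlow;

    SimpleFlow(long sumFlow) { this.sumFlow = sumFlow; }

    @Override
    public int compareTo(SimpleFlow other) {
        // Same logic as the if/else chain above: negative when this bean
        // is larger, so larger totals sort to the front.
        return -Long.compare(sumFlow, other.sumFlow);
    }
}

public class CompareDemo {
    public static void main(String[] args) {
        List<SimpleFlow> list = new ArrayList<>();
        list.add(new SimpleFlow(100));
        list.add(new SimpleFlow(300));
        list.add(new SimpleFlow(200));
        Collections.sort(list);
        // Largest total first
        System.out.println(list.get(0).sumFlow); // prints 300
    }
}
```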
An example: a full sort of phone numbers by total traffic.
![example data](https://s1.ax1x.com/2020/10/25/BewHc6.png)
FlowBean
```java
package com.mr.sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
        super(); // no-arg constructor so Hadoop can instantiate the bean via reflection
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }

    @Override
    public int compareTo(FlowBean bean) {
        int res;
        // Sort by total flow, in descending order
        if (sumFlow > bean.getSumFlow()) {
            res = -1;
        } else if (sumFlow < bean.getSumFlow()) {
            res = 1;
        } else {
            res = 0;
        }
        return res;
    }

    // Serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: read the fields in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
```
Mapper
```java
package com.mr.sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class sortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    FlowBean k = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line: phone \t upFlow \t downFlow \t sumFlow
        String line = value.toString();
        String[] fields = line.split("\t");
        k.setUpFlow(Long.parseLong(fields[1]));
        k.setDownFlow(Long.parseLong(fields[2]));
        k.setSumFlow(Long.parseLong(fields[3]));
        v.set(fields[0]);
        // Emit the bean as the key so the framework sorts on it
        context.write(k, v);
    }
}
```
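The mapper's parsing step can be tried in isolation. A sketch of the same tab-splitting logic (the sample record and the field layout — phone, upFlow, downFlow, sumFlow — are assumptions inferred from how the Mapper indexes `fields`):

```java
import java.util.Arrays;

public class ParseDemo {
    // Parse one tab-separated record into {upFlow, downFlow, sumFlow},
    // mirroring the indexing used in sortMapper.map().
    static long[] parseFlows(String line) {
        String[] fields = line.split("\t");
        return new long[] {
            Long.parseLong(fields[1]),  // upFlow
            Long.parseLong(fields[2]),  // downFlow
            Long.parseLong(fields[3])   // sumFlow
        };
    }

    public static void main(String[] args) {
        // Hypothetical input record
        long[] flows = parseFlows("13736230513\t2481\t24681\t27162");
        System.out.println(Arrays.toString(flows)); // prints [2481, 24681, 27162]
    }
}
```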
Reducer
```java
package com.mr.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class sortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Beans that compare equal arrive grouped; write each phone number
        // back out with its flow bean, restoring "phone -> flow" order
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
```
Driver
```java
package com.mr.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class sortDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"d:/mapreduceinput/input3", "d:/mapreduceoutput/output3"};

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(sortDriver.class);
        job.setMapperClass(sortMapper.class);
        job.setReducerClass(sortReducer.class);

        job.setPartitionerClass(sortPartitioner.class); // set the custom partitioner
        job.setNumReduceTasks(5); // the partitioner class defines 5 partitions

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
```
Partitioning is applied to the map output: each partition is assigned its own reduce task, and the records within each partition are sorted on the way in. So, building on the full-sort example above, we only need to add a Partitioner class to get sorting within each partition.
sortPartitioner
```java
package com.mr.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class sortPartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean bean, Text text, int numPartitions) {
        // In this job the key is the FlowBean (flow info)
        // and the value is the phone number
        String presum = text.toString().substring(0, 3);
        int partition = 4; // default partition for all other prefixes
        if ("136".equals(presum)) {
            partition = 0;
        } else if ("137".equals(presum)) {
            partition = 1;
        } else if ("138".equals(presum)) {
            partition = 2;
        } else if ("139".equals(presum)) {
            partition = 3;
        }
        return partition;
    }
}
```
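The routing rule in `getPartition` is a pure function of the phone-number prefix, so it can be sketched and checked without Hadoop (the `partitionFor` helper is hypothetical, mirroring the if/else chain above):

```java
public class PartitionDemo {
    // Mirrors sortPartitioner.getPartition: route by the first three digits
    // of the phone number; every other prefix falls into partition 4.
    static int partitionFor(String phone) {
        String prefix = phone.substring(0, 3);
        switch (prefix) {
            case "136": return 0;
            case "137": return 1;
            case "138": return 2;
            case "139": return 3;
            default:    return 4;
        }
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("13736230513")); // prints 1
        System.out.println(partitionFor("15013685858")); // prints 4
    }
}
```

Note that the driver must set `job.setNumReduceTasks(5)` to match: with fewer reduce tasks than partitions, records routed to a missing partition would make the job fail.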
Reposted from: http://vicki.baihongyu.com/