MapReduce: Sorting, In-Partition Sorting, and Examples
Published: 2019-05-24


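MapReduce always sorts records by key, and for Text keys the default order is lexicographic (dictionary) order. To sort in a custom order, use a custom object as the key, have it implement the WritableComparable interface, and override its compareTo method.

For example: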

@Override
public int compareTo(FlowBean bean) {
    int result;
    // Sort by total flow in descending order
    if (sumFlow > bean.getSumFlow()) {
        result = -1;
    } else if (sumFlow < bean.getSumFlow()) {
        result = 1;
    } else {
        result = 0;
    }
    return result;
}
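To see what this comparison does without running a Hadoop job, here is a minimal sketch in plain Java. The `Flow` class is a hypothetical stand-in for FlowBean that holds only the total flow, and `Long.compare` with swapped arguments replaces the if/else chain; both are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for FlowBean; it needs no Hadoop classes.
class DescendingSortSketch {

    static class Flow implements Comparable<Flow> {
        final long sumFlow;

        Flow(long sumFlow) {
            this.sumFlow = sumFlow;
        }

        @Override
        public int compareTo(Flow other) {
            // Arguments are swapped on purpose so that larger totals sort first.
            return Long.compare(other.sumFlow, this.sumFlow);
        }
    }

    // Sorts the given totals with Flow.compareTo and returns them in that order.
    static List<Long> sortDescending(long... totals) {
        List<Flow> flows = new ArrayList<>();
        for (long total : totals) {
            flows.add(new Flow(total));
        }
        Collections.sort(flows);
        List<Long> sorted = new ArrayList<>();
        for (Flow flow : flows) {
            sorted.add(flow.sumFlow);
        }
        return sorted;
    }

    public static void main(String[] args) {
        // prints [1200, 1200, 300, 50]
        System.out.println(sortDescending(300L, 1200L, 50L, 1200L));
    }
}
```

Returning a negative number when the current object is larger is what turns the natural ascending order into a descending one.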

An example that performs a total sort of phone numbers by total flow:

[Image: https://s1.ax1x.com/2020/10/25/BewHc6.png]

Example code:

FlowBean

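package com.mr.sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // No-arg constructor, required because Hadoop creates the bean by reflection
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }

    @Override
    public int compareTo(FlowBean bean) {
        int res;
        // Sort by total flow, largest first
        if (sumFlow > bean.getSumFlow()) {
            res = -1;
        } else if (sumFlow < bean.getSumFlow()) {
            res = 1;
        } else {
            res = 0;
        }
        return res;
    }

    // ... (the listing is cut off inside compareTo; write(), readFields(), and the
    // getters and setters used by the other classes are not shown)
}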
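Besides compareTo, a WritableComparable key has to serialize itself through write(DataOutput) and readFields(DataInput). Those methods are not visible in the FlowBean listing above, so the following is only a sketch of what they might look like. The class name `FlowRecord` and the field order (upFlow, downFlow, sumFlow) are assumptions. It uses only java.io so that it runs without Hadoop; in the real job these methods would sit inside FlowBean.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class FlowSerializationSketch {

    // Hypothetical stand-in for FlowBean's serialization half.
    static class FlowRecord {
        long upFlow;
        long downFlow;
        long sumFlow;

        FlowRecord() {
        }

        FlowRecord(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }

        // Writes the fields in a fixed order (assumed: up, down, sum).
        void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        // Reads the fields back in exactly the order write() produced them.
        void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    }

    // Serializes a record to bytes and deserializes it into a fresh instance.
    static FlowRecord roundTrip(FlowRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }
            FlowRecord copy = new FlowRecord();
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowRecord copy = roundTrip(new FlowRecord(100L, 250L));
        // prints 100, 250 and 350 separated by tabs
        System.out.println(copy.upFlow + "\t" + copy.downFlow + "\t" + copy.sumFlow);
    }
}
```

The one rule that matters here is that readFields must consume the fields in the same order and with the same types that write emitted them.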

Mapper

package com.mr.sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class sortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    // Reused for every record: the key carries the flow data, the value the phone number
    FlowBean k = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line: phone number, upstream flow, downstream flow, total flow (tab-separated)
        String line = value.toString();
        String[] fields = line.split("\t");

        k.setUpFlow(Long.parseLong(fields[1]));
        k.setDownFlow(Long.parseLong(fields[2]));
        k.setSumFlow(Long.parseLong(fields[3]));
        v.set(fields[0]);

        // Emit the bean as the key so that the framework sorts by it
        context.write(k, v);
    }
}

Reducer

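package com.mr.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class sortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Phone numbers whose beans compare as equal arrive in one call; write each one,
        // swapping key and value back so the phone number comes first
        for (Text value : values) {
            context.write(value, key);
        }
    }
}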
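The flow through the Mapper and Reducer above (emit the flow data as the key, let the framework sort by key, then swap key and value back) can be imitated in plain Java. This is only a sketch: the input lines are made-up sample data, `Pair` is a hypothetical helper, and an in-memory `List.sort` stands in for Hadoop's shuffle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TotalSortSketch {

    // One map output record: total flow as the key, phone number as the value.
    static class Pair {
        final long sumFlow;
        final String phone;

        Pair(long sumFlow, String phone) {
            this.sumFlow = sumFlow;
            this.phone = phone;
        }
    }

    static List<String> run(List<String> lines) {
        // "Map": parse phone \t up \t down \t sum and emit (sum, phone).
        List<Pair> mapOutput = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            mapOutput.add(new Pair(Long.parseLong(fields[3]), fields[0]));
        }

        // "Shuffle": sort by key, largest total first (stand-in for Hadoop's sort).
        mapOutput.sort((a, b) -> Long.compare(b.sumFlow, a.sumFlow));

        // "Reduce": swap key and value back so the phone number comes first.
        List<String> output = new ArrayList<>();
        for (Pair pair : mapOutput) {
            output.add(pair.phone + "\t" + pair.sumFlow);
        }
        return output;
    }

    public static void main(String[] args) {
        // Made-up sample input, not taken from the original data set.
        List<String> lines = Arrays.asList(
            "13600000001\t100\t200\t300",
            "13700000002\t500\t700\t1200",
            "13800000003\t20\t30\t50");
        for (String row : run(lines)) {
            System.out.println(row);
        }
    }
}
```

With the sample input, the phone number with total 1200 is printed first, then 300, then 50.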

Driver

package com.mr.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class sortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hard-coded local input and output paths for testing
        args = new String[]{"d:/mapreduceinput/input3", "d:/mapreduceoutput/output3"};

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(sortDriver.class);
        job.setMapperClass(sortMapper.class);
        job.setReducerClass(sortReducer.class);

        // These two lines enable in-partition sorting (see the sortPartitioner class);
        // omit them to get a single, globally sorted output
        job.setPartitionerClass(sortPartitioner.class); // set the custom partitioner
        job.setNumReduceTasks(5); // the partitioner class defines 5 partitions

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

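In-Partition Sorting

Partitioning is applied to the output of the map tasks. Each partition is assigned to one ReduceTask, and the records inside each partition are sorted by key along the way. So to sort within partitions, we can take the total-sort job above and simply add a Partitioner class.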

sortPartitioner

package com.mr.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class sortPartitioner extends Partitioner<FlowBean, Text> {
    @Override
    public int getPartition(FlowBean bean, Text text, int i) {
        // The key (bean) holds the flow data; the value (text) is the phone number
        String presum = text.toString().substring(0, 3);

        // Prefixes 136 to 139 get partitions 0 to 3; every other prefix goes to partition 4
        int partition = 4;
        if ("136".equals(presum)) {
            partition = 0;
        } else if ("137".equals(presum)) {
            partition = 1;
        } else if ("138".equals(presum)) {
            partition = 2;
        } else if ("139".equals(presum)) {
            partition = 3;
        }
        return partition;
    }
}
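The combined effect of the partitioner and the key sort can be sketched in plain Java: records are first routed to one of five buckets by phone prefix, and each bucket is then sorted by total flow on its own. The prefix rule mirrors sortPartitioner; the `Record` helper, the in-memory lists standing in for reduce tasks, and the sample lines are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PartitionSortSketch {

    // Hypothetical helper holding the two fields the sketch needs.
    static class Record {
        final String phone;
        final long sumFlow;

        Record(String phone, long sumFlow) {
            this.phone = phone;
            this.sumFlow = sumFlow;
        }
    }

    // Same prefix rule as sortPartitioner: 136..139 map to 0..3, everything else to 4.
    static int partitionFor(String phone) {
        switch (phone.substring(0, 3)) {
            case "136": return 0;
            case "137": return 1;
            case "138": return 2;
            case "139": return 3;
            default: return 4;
        }
    }

    // Returns one list of "phone \t sum" rows per partition, each sorted by sum descending.
    static List<List<String>> run(List<String> lines) {
        List<List<Record>> partitions = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String line : lines) {
            String[] fields = line.split("\t");
            Record record = new Record(fields[0], Long.parseLong(fields[3]));
            partitions.get(partitionFor(record.phone)).add(record);
        }

        List<List<String>> output = new ArrayList<>();
        for (List<Record> partition : partitions) {
            // Each partition is sorted independently of the others.
            partition.sort((a, b) -> Long.compare(b.sumFlow, a.sumFlow));
            List<String> rows = new ArrayList<>();
            for (Record record : partition) {
                rows.add(record.phone + "\t" + record.sumFlow);
            }
            output.add(rows);
        }
        return output;
    }

    public static void main(String[] args) {
        // Made-up sample input, not taken from the original data set.
        List<List<String>> result = run(Arrays.asList(
            "13600000001\t100\t200\t300",
            "13600000002\t400\t500\t900",
            "13700000003\t10\t20\t30",
            "15000000004\t1\t2\t3"));
        for (int i = 0; i < result.size(); i++) {
            System.out.println("partition " + i + ": " + result.get(i));
        }
    }
}
```

The order is only guaranteed inside each partition. Across partitions there is no ordering, which is what distinguishes this from the total sort.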

Reposted from: http://vicki.baihongyu.com/
