24

MapReduce之GroupingComparator分组(辅助排序、二次排序)

 3 years ago
source link: http://www.cnblogs.com/sunbr/p/13414290.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

指对Reduce阶段的数据根据某一个或几个字段进行分组。

案例

需求

有如下订单数据

7FZnYfb.png!web

现在需要找出每一个订单中最贵的商品,如图

eyyQfin.png!web

需求分析

  • 利用“订单id和成交金额”作为 key ,可以将Map阶段读取到的所有订单数据先按照订单id(升降序都可以),再按照acount(降序)排序,发送到Reduce。

  • 在Reduce端利用 groupingComparator 将订单id相同的 kv 聚合成组,然后取第一个成交金额即是最大值(若有多个成交金额并排第一,则都输出)。

  • Mapper 阶段主要做三件事:

    keyin-valuein
    map()

    keyout-valueout

  • 期待shuffle之后的数据:

    10000001 Pdt_02 222.8

    10000001 Pdt_01 222.8

    10000001 Pdt_05 25.8

    10000002 Pdt_06 722.4

    10000002 Pdt_03 522.8

    10000002 Pdt_04 122.4

    10000003 Pdt_01 232.8

    10000003 Pdt_01 33.8

  • Reducer 阶段主要做三件事:

    keyin-valuein
    reduce()

    keyout-valueout

  • 进入Reduce需要考虑的事

  1. 获取分组比较器,如果没设置默认使用MapTask排序时key的比较器
  2. 默认的比较器比较策略 不符合要求 ,它会将orderId一样且acount一样的记录才认为是一组的
  3. 自定义分组比较器,只按照orderId进行对比,只要OrderId一样,认为key相等,这样可以将orderId相同的分到一个组!
    在组内去第一个最大的即可

编写程序

利用“订单id和成交金额”作为 key ,所以把每一行记录封装为 bean 。由于需要比较ID,所以实现了 WritableComparable 接口

OrderBean.java

public class OrderBean implements WritableComparable<OrderBean>{
	
	private String orderId;
	private String pId;
	private Double acount;
	
	public String getOrderId() {
		return orderId;
	}
	public void setOrderId(String orderId) {
		this.orderId = orderId;
	}
	public String getpId() {
		return pId;
	}
	public void setpId(String pId) {
		this.pId = pId;
	}
	public Double getAcount() {
		return acount;
	}
	public void setAcount(Double acount) {
		this.acount = acount;
	}
	public OrderBean() {
		
	}
	
	@Override
	public String toString() {
		return orderId + "\t" + pId + "\t" + acount ;
	}
	
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(orderId);
		out.writeUTF(pId);
		out.writeDouble(acount);
	}
	
	@Override
	public void readFields(DataInput in) throws IOException {
		orderId=in.readUTF();
		pId=in.readUTF();
		acount=in.readDouble();
	}
	
	// 二次排序,先按照orderid排序(升降序都可以),再按照acount(降序)排序
	@Override
	public int compareTo(OrderBean o) {
		
		//先按照orderid排序升序排序
		int result=this.orderId.compareTo(o.getOrderId());
		
		if (result==0) {//订单ID相同,就比较成交金额的大小
			//再按照acount(降序)排序
			result=-this.acount.compareTo(o.getAcount());
			
		}

		return result;
	}
}

自定义比较器,可以通过两种方法:

WritableCompartor
RawComparator

MyGroupingComparator.java

//实现RawComparator
public class MyGroupingComparator implements RawComparator<OrderBean>{
	
	private OrderBean key1=new OrderBean();
	private OrderBean key2=new OrderBean();
	private  DataInputBuffer buffer=new DataInputBuffer();

	@Override
	public int compare(OrderBean o1, OrderBean o2) {
		return o1.getOrderId().compareTo(o2.getOrderId());
	}

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		
		try {
		      buffer.reset(b1, s1, l1);                   // parse key1
		      key1.readFields(buffer);
		      
		      buffer.reset(b2, s2, l2);                   // parse key2
		      key2.readFields(buffer);
		      
		      buffer.reset(null, 0, 0);                   // clean up reference
		    } catch (IOException e) {
		      throw new RuntimeException(e);
		    }
		
		return compare(key1, key2);
	}

}

MyGroupingComparator2.java

//继承WritableCompartor
public class MyGroupingComparator2 extends WritableComparator{
	
	public MyGroupingComparator2() {
		super(OrderBean.class,null,true);
	}
	
	public int compare(WritableComparable a, WritableComparable b) {
		OrderBean o1=(OrderBean) a;
		OrderBean o2=(OrderBean) b;
	    return o1.getOrderId().compareTo(o2.getOrderId());
	  }
}

OrderMapper.java

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
	
	private OrderBean out_key=new OrderBean();
	private NullWritable out_value=NullWritable.get();
	
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)
			throws IOException, InterruptedException {
		
		String[] words = value.toString().split("\t");
		
		out_key.setOrderId(words[0]);
		out_key.setpId(words[1]);
		out_key.setAcount(Double.parseDouble(words[2]));
		
		context.write(out_key, out_value);
		
	}
}

OrderReducer.java

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{

	/*
	 * OrderBean key-NullWritable nullWritable在reducer工作期间,
	 * 	只会实例化一个key-value的对象!
	 * 		每次调用迭代器迭代下个记录时,使用反序列化器从文件中或内存中读取下一个key-value数据的值,
	 * 		封装到之前OrderBean key-NullWritable nullWritable在reducer的属性中
	 */
	@Override
	protected void reduce(OrderBean key, Iterable<NullWritable> values,
			Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)
			throws IOException, InterruptedException {
		
		Double maxAcount = key.getAcount();
		
		for (NullWritable nullWritable : values) {
			
			if (!key.getAcount().equals(maxAcount)) {
				break;
			}
			//复合条件的记录
			context.write(key, nullWritable);
			
		}
		
	}	
}

OrderBeanDriver.java

public class OrderBeanDriver {
	
	public static void main(String[] args) throws Exception {
		
		Path inputPath=new Path("E:\\mrinput\\groupcomparator");
		Path outputPath=new Path("e:/mroutput/groupcomparator");
		
		//作为整个Job的配置
		Configuration conf = new Configuration();
		
		//保证输出目录不存在
		FileSystem fs=FileSystem.get(conf);
		
		if (fs.exists(outputPath)) {
			
			fs.delete(outputPath, true);
			
		}
		
		// ①创建Job
		Job job = Job.getInstance(conf);
		
		// ②设置Job
		// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
		job.setMapperClass(OrderMapper.class);
		job.setReducerClass(OrderReducer.class);
		
		// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
		// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型

		job.setOutputKeyClass(OrderBean.class);
		job.setOutputValueClass(NullWritable.class);
		
		// 设置输入目录和输出目录
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		// 设置自定义的分组比较器
		job.setGroupingComparatorClass(MyGroupingComparator2.class);
		
		// ③运行Job
		job.waitForCompletion(true);
		
	}
}

输出结果

MNbQV3N.png!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK