使用背景:
按照mapreduce框架,每个Reducer输入的(key, value)按照key排序,输出结果也按照key排序。然而,对于多个Reducer情况,这些Reducer之间的输出结果并不是全局有序(各Reducer输出结果之间没有排序,仅是每个Reducer内输出结果有序)。这导致mapreduce结果无法直接合并处理。比如,对于2个Reducer输出结果:
– Reducer1 output: (a, 1), (d, 4), (w, 9)
– Reducer2 output: (b, 2), (c, 5), (e, 7)
这两个Reducer输出结果没有按照key排序。为了实现Reducer输出结果全局有序,可以使用如下方式:
– 人工指定(key, value)送给哪个partition。对于上面例子,可以指定如下Partitioner,
public class CustomPartitioner extends Partitioner<CompositeKey, DonationWritable> { @Override public int getPartition(CompositeKey key, DonationWritable value, int numPartitions) { if (key.compareTo("d") < 0) { return 0; } else if (key.state.compareTo("d") >= 0) { return 1; } } }
人工指定partition,好处在于实现简单,可以快速达到全局有序要求;但是可能存在严重数据倾斜问题(data skew)。同时这种方法需要完全了解数据分布,如果数据发生改变,需要按照数据内容更改代码,适用性不强。
– 使用Total Order Partitioner:使用Total Order Partitioner,实现输出结果在多个Reducer之间全局有序。
思考:为什么需要使用Total Order Partitioner?如果按照mapreduce默认设置,使用1个Reducer,可能导致大量数据由这个Reducer处理,任务执行时间过长,达不到分布式处理的效果。
Total Order Partitioner工作流
使用Doantion数据集说明Total Order Partitioner工作流。
1、在开始Map之前,Mapreduce首先执行InputSampler对样本抽样,并生成partition file写入HDFS。InputSampler对输入split进行抽样,并使用sortComparator对抽样结果进行排序。常用抽样方法有:
– RandomSampler:按照给定频次,进行随机抽样。
– IntervalSampler:按照给定间隔,进行定间隔抽样。
– SplitSampler:取每个split的前n个样本进行抽样。
2、InputSampler在HDFS上写入一个partition file(sequence file),决定不同分区的key边界。对于n个Reducer,partition file有n-1个边界数据。Map的output按照partition file的边界不同,分别写入对应的分区。
3、Mapper使用TotalOrderPartitioner类读取partition file,获得每个Mapper使用TotalOrderPartitioner类。这个类读取partition file,确定每个分区的边界。
4、在shuffle阶段,每个Reducer会拉取对应分区中已排序的(key, value)。由于每个分区已按照partition file设置边界,这样分区1中的数据都比分区2小,分区2数据都比分区3小(假设升序排列)。
5、Reducer处理对应分区数据并写入HDFS后,输出数据也保持全局有序。
Total Order Partitioner简单实现
下面是最简单的Total Order Partitioner实现,程序仅有main(),没有自定义Mapper、Reducer。说明如下:
1)使用RandomSampler进行样本抽样。
– freq=0.01。抽样器采用0.01%(1%)从样本记录中进行抽样。
– numSamplers=1000。抽样器最多取1000个样本。如果抽样样本数达到上限,则新样本会覆盖已有样本。
– maxSplitsSampled=100。抽样器最多抽样100个splts。
2)InputSamer.writePartitionFile()写入HDFS。需要提前完成如下配置,
– 任务输入路径:通过FileInputFormat.SetInputPaths(),设定任务的输入数据路径,供抽样器进行抽样。
– Reduce任务数量:确定写入partition file的边界数量(即Reduce任务数量减1)。
– Map的OutputKey类型:Map InputKey类型需要与Map OutputKey类型保持一致!
public class TotalOrderPartitionerExample { public static void main(String[] args) throws Exception { // Create job and parse CLI parameters Job job = Job.getInstance(new Configuration(), "Total Order Sorting example"); job.setJarByClass(TotalOrderPartitionerExample.class); Path inputPath = new Path(args[0]); Path partitionOutputPath = new Path(args[1]); Path outputPath = new Path(args[2]); // The following instructions should be executed before writing the partition file job.setNumReduceTasks(3); FileInputFormat.setInputPaths(job, inputPath); <strong>TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionOutputPath);</strong> job.setInputFormatClass(KeyValueTextInputFormat.class); job.setMapOutputKeyClass(Text.class); // Write partition file with random sampler <strong>InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100); InputSampler.writePartitionFile(job, sampler);</strong> // Use TotalOrderPartitioner and default identity mapper and reducer job.setPartitionerClass(TotalOrderPartitioner.class); job.setMapperClass(Mapper.class); job.setReducerClass(Reducer.class); FileOutputFormat.setOutputPath(job, outputPath); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Total Order partitioner注意事项
1、InputSampler需要Map InputKey与Map OutputKey保持一致。
2、InputSampler需要在setMapOutputKeyClass()之后,再设置InputSampler.writePartitionFile()。
否则,程序会出错误”java.io.IOException: wrong key class”。
The InputSampler expects us to keep the same keys between the input and the output of the map function. This way the sampled data is guaranteed to reflect the map output data, by having the same data type and distribution.
二次排序的Total Order Partitioner实现
对于二次排序,由于自定义了Map OutputKey进行排序,这可能导致Map InputKey与OutputKey不同,无法使用Total Order partitioner。为了在二次排序中使用Total Order partitiner,需要使用自定义InputFormat,在InputFormat中实现自定义key,并确保在Map中不进行类别修改。具体如下:
1、自定义custom InputFormat。按照二次排序的key,生成Map InputKey。
public class Doninputformat extends FileInputFormat<Donator, NullWritable> { public static class Donrecordreader extends RecordReader<Donator, NullWritable> { long start,pos,end; Text line=new Text(); LineReader in; Donator outputKey=new Donator(); NullWritable outputValue=NullWritable.get(); @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { FileSplit filesplit=(FileSplit)split; Configuration conf=context.getConfiguration(); Path filepath=filesplit.getPath(); FSDataInputStream fs=filepath.getFileSystem(conf).open(filepath); in=new LineReader(fs,conf); start=filesplit.getStart(); end=start+filesplit.getLength(); fs.seek(start); if(start!=0) start+=in.readLine(new Text(), 0, (int)Math.min(Integer.MIN_VALUE, end-start)); pos=start; } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // TODO Auto-generated method stub if(pos>end) return false; pos+=in.readLine(line); if(line.getLength()==0 && pos==end) return false; String []words=line.toString().split("\",\""); if(words[0].equals("_donationid") ||words[4].isEmpty() ||words[5].isEmpty() ||words[11].isEmpty()) pos+=in.readLine(line); if(line.getLength()==0 && pos==end) return false; String []words2=line.toString().split("\",\""); outputKey.setCity(words2[4]); outputKey.setState(words2[5]); outputKey.setDonationid(words2[0].replace("\"", "")); outputKey.setTotal(Double.parseDouble(words2[11])); return true; } @Override public Donator getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return outputKey; } @Override public NullWritable getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return outputValue; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return 0; } @Override public void close() throws IOException { // TODO Auto-generated method stub in.close(); } } @Override public RecordReader<Donator, NullWritable> createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub return new Donrecordreader(); } }
2、设置Mapper
由于Mapper的主要功能已经在inputFormat实现,在mapreduce中使用默认Mapper即可。它将会把每个map (inputKey, inputValue)分区写入shuffle。
while(context.nextKeyValue()){ //context.nextKeyValue()返回下一个key/value是否存在。如果存在,返回true,并设置好下一个key/value map(context.getCurrentKey(), context.getCurrentValue(), context); //把下一个key/value传递给map() } cleanup(context); //map()结束 }
2024最新激活全家桶教程,稳定运行到2099年,请移步至置顶文章:https://sigusoft.com/99576.html
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。 文章由激活谷谷主-小谷整理,转载请注明出处:https://sigusoft.com/117126.html