Map的结果,会通过partition分发到Reducer上,reducer操作过后会进行输出。输出的文件格式后缀000001就代表1分区。
Mapper处理过后的键值对,是需要送到Reducer那边进行合并,具有相同的key的键值对会送到同一个Reducer上面。哪个key到哪个Reducer的分配过程,是由Partition决定的
里面只有一个方法getPartition()
@Public@Stablepublic abstract class Partitioner{ public Partitioner() { } public abstract int getPartition(KEY var1, VALUE var2, int var3);}
输入(形参)是Map的结果对<key, value>和reducerTask的数目,输出(返回值)则是分配的Reducer(整数编号)。
就是指定某个Mapper输出的键值对到哪一个reducer上去。
系统缺省的Partitioner是HashPartitioner,它的实现是以key的hashcode对reducer的数值取模,得到对应的Reducer。这样就保证了相同的key值,分配到了同一个Reducer上。编号不大于指定的reducerTasks,0,1,2······(n-1)。
job.setPartitionerClass(JournalDataPartitioner.class);
job.setNumReduceTasks(CollectionUtils.isEmpty(branchIds) ? 3 : branchIds.size() + 1);
partition类
private static class JournalDataPartitioner extends Partitioner{ @Override public int getPartition(Text key, JournalTrxDataSet value, int arg2) { if (!CollectionUtils.isEmpty(branchIds)){ for (int i = 0; i < branchIds.size(); i++) { if (branchIds.get(i).equals(value.getBranchId())){ log.info(">>>>>> i = {}", i); return i + 1; } } return 0; }else { if ("706010101".equals(value.getBranchId())) { return 1; } else if ("706010106".equals(value.getBranchId())) { return 2; } return 0; } } }