原理:int getPartition(LongWritable key, Text value, int numPartitions) 的返回值为分区的索引(分区以索引作为唯一标识符),该方法决定当前 key/value 属于哪个索引对应的分区。框架在收集 kv 对时会做如下校验: if (partition < 0 || partition >= partitions) { throw new IOException("Illegal partition for " + key + " (" + partition + ")"); } 由上述代码可见,自定义分区时返回的索引必须在 0 ~ partitions-1 范围内,其中 partitions == job.getNumReduceTasks()(即 numPartitions);超出该范围会抛出 IOException。
说明:shuffle 是 MapReduce 的核心机制。shuffle 阶段:从 map 输出开始,到 reduce 输入之前的这段过程。流程: map 输出 --> 环形缓冲区(buffer,默认 100MB,由 JobContext.IO_SORT_MB 配置) --> partition(在 buffer 中为每条记录标记分区号) --> sort(对 kv 对的元数据进行排序,依然在 buffer 内进行) --> spill(当 buffer 中的数据达到阈值 80%,即 0.8 时开始溢写)到本地磁盘,产生溢写文件,同一分区的数据聚集在一起(此时数据已排序) --> 将 n 个溢写文件 merge 成一个文件 --> fetch(reduce 端的线程通过 HTTP 协议复制当前 reduceTask 要处理的分区数据,先复制到内存缓存,再根据缓存大小决定是否写成文件) --> merge(reduce 端合并多个文件,最后一次合并不产生文件,直接在内存中作为 reduce 的输入)
属性: private int partitions; private Class<!-- <K> --> keyClass; //在job中定义的map输出端的key类型 private Class<!-- <V> --> valClass; //在job中定义的map输出端的value类型
private Serializer<!-- <K> --> keySerializer; //用来序列号key值,存储到buffer里 private Serializer<!-- <V> --> valSerializer; //用来序列化value值,存储到buffer里
// k/v accounting //参考init方法内的赋值操作,实际上是对buffer进行包装成int类型的数组,提供存储元数据的界面 private IntBuffer kvmeta; //用来标记元数据的起始位置 (第一次的位置应该是倒数第四个int元素位置) int kvstart; int kvend; // 元数据在int数组的结束位置 int kvindex; // 存储下一个元数据的起始位置
int equator; // 分隔点的位置 int bufstart; // kv对的起始位置 int bufend; // kv对的结束位置 int bufmark; // kv对的结束位置的标记 int bufindex; // 下一个kv对的开始位置 int bufvoid; // 缓冲区的长度
byte[] kvbuffer; // 环形缓冲区 private final byte[] b0 = new byte[0]; //校验是否到边界
//一个元数据由四部分组成[valstart,keystart,parition,vallen] ,四部分统计的是kv对的信息 private static final int VALSTART = 0; // valstart相对于kvindex的偏移量 private static final int KEYSTART = 1; // keystart元数据相对于kvindex的偏移量 private static final int PARTITION = 2; // parition元数据相对于kvindex的偏移量 private static final int VALLEN = 3; // vallen元数据相对于kvindex的偏移量 private static final int NMETA = 4; // 一个元数据占4个int private static final int METASIZE = NMETA * 4; // 统计一个元数据实际所占字节个数
final SpillThread spillThread = new SpillThread();//溢写线程
private FileSystem rfs;//溢写文件系统
// Initializer: sets up the circular sort buffer, its int metadata view, the key/value
// serializers, and starts the spill thread.
public void init( MapOutputCollector.Context context ) throws IOException, ClassNotFoundException {
    mapOutputFile = mapTask.getMapOutputFile(); // determines where map output files are placed
    sortPhase = mapTask.getSortPhase();         // progress phase used for sorting
    partitions = job.getNumReduceTasks();       // one partition per reduce task
    // Spill files and the final merged file are stored on the raw local file system.
    rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

    // Spill threshold as a fraction of the buffer (JobContext.MAP_SORT_SPILL_PERCENT, default 0.8).
    final float spillper =job.getFloat( JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
    // Buffer size in MB (JobContext.IO_SORT_MB); defaults to 100 when not configured.
    final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
    // Sorter implementation from "map.sort.class"; QuickSort by default.
    sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
    int maxMemUsage = sortmb << 20;        // convert MB to bytes (sortmb * 2^20)
    maxMemUsage -= maxMemUsage % METASIZE; // round down to a multiple of METASIZE (16 bytes)
    kvbuffer = new byte[maxMemUsage];      // allocate the circular buffer
    bufvoid = kvbuffer.length;
    // Int view over the same bytes; record metadata is stored through it.
    kvmeta = ByteBuffer.wrap(kvbuffer)
        .order(ByteOrder.nativeOrder())
        .asIntBuffer();
    // Place the equator at 0. Per the original note this also assigns kvindex,
    // which is read on the next lines.
    setEquator(0);
    bufstart = bufend = bufindex = equator;
    kvstart = kvend = kvindex;

    // k/v serialization
    comparator = job.getOutputKeyComparator(); // key comparator configured on the job
    keyClass = (Class<K>)job.getMapOutputKeyClass();
    valClass = (Class<V>)job.getMapOutputValueClass();
    serializationFactory = new SerializationFactory(job);
    // Both serializers are opened on bb, so serialize() writes into the buffer.
    keySerializer = serializationFactory.getSerializer(keyClass);
    keySerializer.open(bb);
    valSerializer = serializationFactory.getSerializer(valClass);
    valSerializer.open(bb);

    spillInProgress = false;
    // JobContext.MAP_COMBINE_MIN_SPILLS, default 3.
    minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
    spillThread.setDaemon(true);
    spillThread.setName("SpillThread");
    spillLock.lock();
    try {
        spillThread.start(); // start the spill thread
        // Wait until the spill thread reports that it is running. This blocks in
        // spillDone.await() (presumably a Condition of spillLock) rather than busy
        // polling; the loop re-checks the flag after every wake-up.
        while (!spillThreadRunning) {
            spillDone.await();
        }
    // (Excerpt ends here: the rest of init() is not included in these notes.)
//收集函数:往环形缓冲区里存储kv对 public synchronized void collect(K key, V value, final int partition ) throws IOException { bufferRemaining -= METASIZE;// 80MB if (bufferRemaining <= 0) { 如果不小于0,不用溢写 // start spill if the thread is not running and the soft limit has been // reached spillLock.lock(); try { do { if (!spillInProgress) { final int kvbidx = 4 * kvindex; final int kvbend = 4 * kvend; // serialized, unspilled bytes always lie between kvindex and // bufindex, crossing the equator. Note that any void space // created by a reset must be included in "used" bytes final int bUsed = distanceTo(kvbidx, bufindex); final boolean bufsoftlimit = bUsed >= softLimit; if ((kvbend + METASIZE) % kvbuffer.length != equator - (equator % METASIZE)) { // spill finished, reclaim space resetSpill(); bufferRemaining = Math.min( distanceTo(bufindex, kvbidx) - 2 * METASIZE, softLimit - bUsed) - METASIZE; continue; } else if (bufsoftlimit && kvindex != kvend) { // spill records, if any collected; check latter, as it may // be possible for metadata alignment to hit spill pcnt startSpill(); final int avgRec = (int) (mapOutputByteCounter.getCounter() / mapOutputRecordCounter.getCounter()); // leave at least half the split buffer for serialization data // ensure that kvindex >= bufindex final int distkvi = distanceTo(bufindex, kvbidx); final int newPos = (bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE))) % kvbuffer.length; setEquator(newPos); bufmark = bufindex = newPos; final int serBound = 4 * kvend; // bytes remaining before the lock must be held and limits // checked is the minimum of three arcs: the metadata space, the // serialization space, and the soft limit bufferRemaining = Math.min( // metadata max distanceTo(bufend, newPos), Math.min( // serialization max distanceTo(newPos, serBound), // soft limit softLimit)) - 2 * METASIZE; } } } while (false); } finally { spillLock.unlock(); } } //不溢写,收集kv对和kvmeta数据 try { // serialize key bytes 
into buffer int keystart = bufindex; //序列化key值,写入buffer中,bufindex值改变 keySerializer.serialize(key); if (bufindex < keystart) { // wrapped the key; must make contiguous bb.shiftBufferedKey(); keystart = 0; } // serialize value bytes into buffer final int valstart = bufindex; //序列化value值,写入buffer中,bufindex值改变 valSerializer.serialize(value); // It's possible for records to have zero length, i.e. the serializer // will perform no writes. To ensure that the boundary conditions are // checked and that the kvindex invariant is maintained, perform a // zero-length write into the buffer. The logic monitoring this could be // moved into collect, but this is cleaner and inexpensive. For now, it // is acceptable. bb.write(b0, 0, 0);
// the record must be marked after the preceding write, as the metadata // for this record are not yet written int valend = bb.markRecord();//对kvindex进行标记
mapOutputRecordCounter.increment(1);//计数器 mapOutputByteCounter.increment( distanceTo(keystart, valend, bufvoid));//计数:kv对的字节长度
// 写kv对的元数据信息 kvmeta.put(kvindex + PARTITION, partition); kvmeta.put(kvindex + KEYSTART, keystart); kvmeta.put(kvindex + VALSTART, valstart); kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); // 移动kvindex,为下一个kvmeta做准备,移动了int数组的四个位置 kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity(); } catch (MapBufferTooSmallException e) { LOG.info("Record too large for in-memory buffer: " + e.getMessage()); //当前kv对较大(针对于80MB),会直接溢写到磁盘 spillSingleRecord(key, value, partition); mapOutputRecordCounter.increment(1); return; } }
MapTask.run() --> if (conf.getNumReduceTasks() == 0) 说明:获取 reduceTask 的个数:如果是 0,则没有 sort 阶段,map 阶段占整个任务进度的 100%;如果有 reduceTask,则 mapPhase 占进度的 67%,sortPhase 占 33%。 --> boolean useNewApi = job.getUseNewMapper(); 说明:判断是否使用了新 API。 --> initialize(job, getJobID(), reporter, useNewApi); 说明:将运行状态修改为 running,获取输出格式(如 TextOutputFormat)并获取输出路径。 --> runNewMapper(job, splitMetaInfo, umbilical, reporter); 说明:获取任务上下文对象,获取实际的 Mapper 对象和输入格式,构造当前任务的 InputSplit 以及记录读取器(用 NewTrackingRecordReader 对 RecordReader 进行包装),获取 map 输出收集器(实际上是一个 RecordWriter,数据写入 MapOutputBuffer 对象)。
将输入格式、记录写出器、分片信息等封装成 MapContextImpl 对象,再通过 WrappedMapper 把 MapContextImpl 包装成 Mapper 的 Context(即 mapperContext)。 --> input.initialize(split, mapperContext);(略) --> mapper.run(mapperContext); 说明:运行 Mapper 里的 run 方法(传入的参数是 WrappedMapper 包装后的 Context,实际操作都委托给其内部的 MapContextImpl)。