shuffle本意为混洗,MR将排完序的mapper输出作为reducer的输入的过程就称为shuffle,可以理解为mapper到reducer的中间过程,在这个过程中MR框架其实干了很多事。
map函数开始产生输出时(调用context.write()方法,底层是收集器的collector.collect()方法),它先写入到内存中的环形缓冲区(由MapOutputBuffer维护,默认大小为100M)中,缓冲区中存放着原始数据和元数据,考虑到效率会首先会根据分区、key值等对缓冲区内的元数据进行排序,默认是按照分区索引从小到大,key值从小到大排序,这是第一次排序。如果有一个combiner函数,它就会在排序后的输出上运行。
每当缓冲区达到了阈值80%,另一个SpillThread线程会将缓冲区内的数据溢写到磁盘,此时Mapper线程和Spill线程是并行的,如果在溢写的过程中,Mapper线程继续写入数据导致剩下的缓冲区会填满,Mapper线程会被阻塞直到Spill线程完成。写到磁盘即会创建一个溢出文件,因此mapper在写完所有数据后,当数据量大于80M时,可能会有多个小的溢出文件。因为缓冲区中是通过移动元数据实现原始数据的排序的,所以所谓排序其实是改变了原始数据溢写到磁盘的顺序。
在Mapper阶段的最后,多个小的溢写文件会被合并成一个大的溢写文件,并且该文件已经分区并排序了,此时是第二次排序。配置属性mapreduce.task.io.sort.factor控制着一次最多能合并多少流,默认为10。如果至少存在3个(由mapreduce.map.combine.minspills控制)小的溢写文件,那么combiner会在小的溢写文件写到大的溢写文件之前再次运行。通常会将mapper的输出进行压缩写到磁盘上,但默认是不进行压缩的,可以将mapreduce.map.output.cpmress设置为true,使用的压缩库有mapreduce.map.output.compress.codec指定。
一个reducer会通过http得到mapper阶段生成的最终的溢写文件中的一个分区的内容,所以说reducer的数量决定了分区数量。
Mapper中作为调用参数传给map()的context是在Mapper.run()受到调用的时候作为参数传下来的,而Mapper.run()是被MapTask.runNewMapper()调用的。
MapTask.run()中
//通过job对象获取是否使用的是新API boolean useNewApi = job.getUseNewMapper(); if (useNewApi) { runNewMapper(job, splitMetaInfo, umbilical, reporter); } else { runOldMapper(job, splitMetaInfo, umbilical, reporter); }MapTask.runNewMapper()中
//如果Reducer的数量为0,则创建用于直接输出的Collector if (job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { //不然则创建通往Reducer的Collector output = new NewOutputCollector(taskContext, job, umbilical, reporter); }在创建收集器NewOutputCollector时,同时创建了Mapper的输出RecordWriter,包括collector和partitioner。collector负责收集Mapper输出并将其交付给Reducer,partitioner负责决定应该将具体的输出交给哪一个Reducer。
NewOutputCollector()构造器中
//创建通向排序阶段的Collector collector = createSortingCollector(job, reporter); //Reducer的数量决定了分区数量 partitions = jobContext.getNumReduceTasks(); //如果分区数>1,加载默认的HashPartitioner分区器,或用户自定义的分区器,目的在于获取分区索引 if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { //分区数为1,新建一个分区索引统一返回0的分区器 partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; } }; }这里对createSortingCollector()的调用就是为了创建具体的收集器。MapTask中定义了一个界面MapOutputCollector<K,V>,凡是实现了此界面的类,除了init()和close()之外还需要提供collect()和flush()这两个方法,之所以称之为SortingCollector,因为它对收集到的kv对进行了排序。具体采用什么collector是通过mapreduce.job.map.output.collector.class决定的,默认为MapTask$MapOutputBuffer。createSortingCollector()中调用了 collector.init(context),而此collector即为MapOutputBuffer。
创建完收集器和分区器,接下来就是context对象了,继续回到MapTask.runNewMapper()中
//创建用于Mapper的Context org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output,committer, reporter, split); //将mapContext 再次封装,返回一个定义于WrappedMapper内部的mapContext,实际是MapContextImpl org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext); try { input.initialize(split, mapperContext); //Mapper中的run()再这里被调用 mapper.run(mapperContext); //此时map阶段已完成,进入sort阶段 mapPhase.complete(); setPhase(TaskStatus.Phase.SORT); }WrappedMapper与MapContextImpl不同的是,WrappedMapper内部定义了一个Context类,就是这个WrappedMapper.Context传递给了Mapper.run()。再WrappedMapper.Context中有getCurrentKey(),nextKeyValue(),write()等方法,其实都是通过RecordReader实现的。
可见Mapper的输出过程最关键的就是这个MapOutputBuffer收集器,关于MapOutputBuffer的概述可见:MapReduce:Mapper阶段的输出之MapOutputBuffer、环形缓冲区工作原理。这篇博客主要将了MapOutputBuffer.init()方法
关于Mapper的输出,先看Mapper.map()方法中的context.write(),也就是WrappedMapper.write(),最终落实到了MapOutputBuffer的collect()方法,以下为MapOutputBuffer输出,即spill的部分,其中重新计算下标,分隔点等的逻辑可参考环形缓冲区工作原理
//bufferRemaining初始为80M,METASIZE为16字节,此时写入一个kv对即产生一个元数据 bufferRemaining -= METASIZE; //剩余空间不足时,需要进行spill if (bufferRemaining <= 0) { if (!spillInProgress) { final int kvbidx = 4 * kvindex; final int kvbend = 4 * kvend; final int bUsed = distanceTo(kvbidx, bufindex); final boolean bufsoftlimit = bUsed >= softLimit; if ((kvbend + METASIZE) % kvbuffer.length != equator - (equator % METASIZE)) { // 溢写结束,进行回收内存空间,而上面的条件说明缓冲区中原来是数据的 resetSpill(); //重新计算环形缓冲区中的多个下标(kvindex,kvend等) } else if (bufsoftlimit && kvindex != kvend) { //启动Spill startSpill(); //重新计算下标 } } }接下来是写入到MapOutputBuffer的缓冲区的部分
try{ // 将key实例化并写入MapOutputBuffer中的kvbuffer中 int keystart = bufindex; keySerializer.serialize(key); // 序列化value并写入 final int valstart = bufindex; valSerializer.serialize(value); // 写出长度为0,为了检验边界条件(?) bb.write(b0, 0, 0); int valend = bb.markRecord(); // 写入元数据 kvmeta.put(kvindex + PARTITION, partition); kvmeta.put(kvindex + KEYSTART, keystart); kvmeta.put(kvindex + VALSTART, valstart); kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); // advance kvindex kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity(); }catch(MapBufferTooSmallException e) { //如果kv对太大,直接Spill出去 spillSingleRecord(key, value, partition); }也就是说写入到缓冲区kvbuffer是通过序列化器的serialize()方法实现的,而它的底层是WritableSerializer.serialize(Writable w),也就是说不同类型的数据,它们写入到缓冲区中的数据也是不同的,这关系到不同数据类型的write()方法。有趣的是Text.write()方法会先写入数据的长度,这就导致了本来key可能只有4B,但写入到kvbuffer中为5B。
接下来再关注Sort和Spill部分了,上面看到在bufferRemaining≤0时,会通过startSpill()调用spillReady.signal()以此唤醒SpillThread线程。 缓冲区中的数据是通过MapOutpurBuffer.flush()方法冲刷到溢写文件的(调用了sortAndSpill()),并在此方法中实现了小溢写文件合并成一个大的溢写文件的过程(调用了mergeParts())。当缓冲区中的数据达到了临界值或是缓存区已经写入完所有数据虽然不满足spill条件也会被最后冲刷出去。总之最后只要缓冲区非空,就要再来一次sortAndSpill()
MapOutpurBuffer.sortAndSpill()中
//计算环形缓冲区中数据的长度,并加上每一个分区的头长度150字节 final long size = distanceTo(bufstart, bufend, bufvoid) + partitions * APPROX_HEADER_LENGTH; //用于创建溢写文件的数据流 FSDataOutputStream out = null; try { //溢写索引块,多次Spill需要记录上一次溢写的相关索引 final SpillRecord spillRec = new SpillRecord(partitions) // 创建小的溢写文件 final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size); out = rfs.create(filename); //计算共有几项元数据 final int mstart = kvend / NMETA; //计算第一项元数据在整个缓冲区中的下标 final int mend = 1 + // kvend is a valid record(kvstart >= kvend? kvstart: kvmeta.capacity() + kvstart) / NMETA; //对元数据进行排序,底层为QuickSort.sort()。按照分区索引从小到大,key值从小到大排序 sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); for (int i = 0; i < partitions; ++i) { //接下来就是对每一个分区进行写入到溢写文件中的操作,其中会判断是否需要加密和是否运行combiner } }QuickSort这一类的排序算法的实现都是通用的,具体如何排序取决于两个回调函数compare()和swap()。这里sort()的第一个参数是MapOutputBuffer.this,也就是调用了MapOutputBuffer.compare()和MapOutputBuffer.swap():
public int compare(final int mi, final int mj) { final int kvi = offsetFor(mi % maxRec); final int kvj = offsetFor(mj % maxRec); final int kvip = kvmeta.get(kvi + PARTITION); final int kvjp = kvmeta.get(kvj + PARTITION); // 先比较分区索引,若两个Partition不同,则已经比出了先后 if (kvip != kvjp) { return kvip - kvjp; } // 若分区索引相同则比较key的内容 return comparator.compare(kvbuffer, kvmeta.get(kvi + KEYSTART), //key的起点 kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART), //key的长度 kvbuffer, //缓冲区的起点 kvmeta.get(kvj + KEYSTART), kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART)); }比较滞后,如果有需要交换两项元数据的位置,就调用swap()
此时排序和Spill已经完成。此外如果有需要有Combiner,还可以在逐项写出kv对时插入Combiner环节。最后在Spill过程中会产生一个索引块,如果索引块太大,还需要创建并写入Spill索引文件,否则索引信息留在内存中
if (totalIndexCacheMemory >= indexCacheMemoryLimit) { // 创建spill索引文件 Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { // 否则索引信息留在内存中 indexCacheList.add(spillRec); totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } ++numSpills; }只要Mapper还在运行即有数据输出,collect()和sortAndSpill()就会并存,知道Mapper处理完所有输入的数据
当Mapper中不再有数据产出时,Mapper.run()中的while循环会因为调用context.nextKeyValue()返回false而结束,于是又回到了runNewMapper()中。此时会关闭输入和输出通道。也就意味中会关闭Collector,也就是MapOutputBuffer,关闭前需要冲刷一下。当缓冲区已空,即所有数据都已在小的溢写文件中,接下来就需要把所有spill文件的内容合并到单个大的溢写文件中,以备分发给各个Reducer。
MapOutputBuffer.mergeParts():
//每次Spill都有个小的溢写文件,所以数组大小为spill的次数 final Path[] filename = new Path[numSpills]; //统计所有这些文件合并后的大小 for(int i = 0; i < numSpills; i++) { filename[i] = mapOutputFile.getSpillFile(i); finalOutFileSize += rfs.getFileStatus(filename[i]).getLen(); } //如果只有一次Spill if (numSpills == 1) { //直接更名,在原名上加上后缀file.out sameVolRename(filename[0], mapOutputFile.getOutputFileForWriteInVolume(filename[0])); //索引块缓存已空 if (indexCacheList.size() == 0) { //spill索引文件改名 sameVolRename(mapOutputFile.getSpillIndexFile(0), mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0])); //索引块缓存中还有索引记录,需要写到索引文件中 } else { indexCacheList.get(0).writeToFile( mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job); } //sort阶段结束 sortPhase.complete(); return; } //Spill文件数>1,需要进行合并 for (int i = indexCacheList.size(); i < numSpills; ++i) { Path indexFileName = mapOutputFile.getSpillIndexFile(i); //先将所有spill索引文件收集在一起 indexCacheList.add(new SpillRecord(indexFileName, job)); } //最终spill文件 Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize); //最终索引文件 Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); //创建通向最终溢写文件的输出流 FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); if (numSpills == 0) { //如果没有生成溢写文件,也要创建空文件 } //sort阶段按照分区数划分子阶段,所以最终的大的溢写文件也是按照分区和key值排完序的 sortPhase.addPhases(partitions); //对于每一个分区 for (int parts = 0; parts < partitions; parts++) { //创建片段列表存放每个spill文件的信息以便最终合并 List<Segment<K,V>> segmentList = new ArrayList<Segment<K, V>>(numSpills); //合并所有spill文件中的当前分区 for(int i = 0; i < numSpills; i++) { IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); Segment<K,V> s = new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset, indexRecord.partLength, codec, true); segmentList.add(i, s); } //同时可以对100个文件进行合并 int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100); //仅当存在中间合并时,才对段进行排序 boolean sortSegments = segmentList.size() > mergeFactor; //合并同一分区在所有spill文件中的内容,可能会需要进行sort,合并结果是序列kv对迭代器 RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, mergeFactor, new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, null, spilledRecordsCounter, sortPhase.phase(), TaskType.MAP); //接下来将kv迭代器中的内容写到磁盘上 //当无combiner或spill文件数<3(minSpillsForCombine)时,由属性MAP_COMBINE_MIN_SPILLS控制 if (combinerRunner == null || numSpills < minSpillsForCombine) { //将合并内容直接写入文件 Merger.writeFile(kvIter, writer, reporter, job); } else { //将合并内容经过combiner后写入文件 combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); } writer.close(); //进入下一个子阶段,也就是写入写一个分区 sortPhase.startNextPhase(); //最后写入到索引文件 spillRec.writeToFile(finalIndexFile, job); //删除所有小的spill文件 for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true); }最终的溢写文件和之前单个溢写文件的格局是一样的,按照分区分成若干区段,每个分区都有个头部,然后就是排序好的kv对。这个Merge操作结合原先为各个spill文件进行的Sort,构成了MergeSort,不过这一阶段的MergeSort是针对同一Mapper的多个spill文件。而在Reducer阶段的merge是针对多个Mapper的MergeSort。