《MapReduce 4》--自定义分区、shuffle技术、环形缓冲区(MapOutputBuffer源码解析)、Maptask源码解析

    xiaoxiao2022-07-03  106

    如何自定义分区:

        原理:int getPartition(LongWritable key, Text value, int numPartitions)                此方法的返回值为分区的索引(分区就是由索引作为唯一标识符),                该方法确定当前key/value属于哪个索引对应的分区。                由于 if (partition < 0 || partition >= partitions) {                            throw new IOException("Illegal partition for " + key + " (" + partition + ")");                        }                上述代码,可见,自定义分区索引时的范围为:                 0 ~ partitions==job.getNumReduceTasks()-1;

    shuffle(混洗):

       

         说明:shuffle技术是MR的核心技术                 shuffle阶段:从map输出开始,到reduce输入之前这个阶段。             流程:          map输出-->      -->缓存区(buffer,默认100M)      -->partition(在buffer标记)      -->sort(将kv对的元数据进行排序,依然是在buffer内进行)      -->spill(条件为buffer中的数据达到80%,即0.8时,开始溢写)到本地磁盘,                  产生溢写文件,                  将同一分区的数据聚集(此时数据是排序状态)      -->n个溢写文件merge成一个文件      -->fetch(reduce端的线程通过Http协议复制当前reduceTask要处理的分区数据,                    先复制到缓存,                    再根据缓存大小来决定是否产生文件)      -->merge(reduce端会合并多个文件,                      最后一次合并不产生文件,直接在内存中输入reduce)

    环形缓冲区:【MapOutputBuffer】的源码解析

    属性:     private int partitions;      private Class<!-- <K> --> keyClass;  //在job中定义的map输出端的key类型     private Class<!-- <V> --> valClass;  //在job中定义的map输出端的value类型

        private Serializer<!-- <K> --> keySerializer; //用来序列号key值,存储到buffer里     private Serializer<!-- <V> --> valSerializer; //用来序列化value值,存储到buffer里

        // k/v accounting     //参考init方法内的赋值操作,实际上是对buffer进行包装成int类型的数组,提供存储元数据的界面     private IntBuffer kvmeta;      //用来标记元数据的起始位置 (第一次的位置应该是倒数第四个int元素位置)     int kvstart;      int kvend;              // 元数据在int数组的结束位置     int kvindex;            // 存储下一个元数据的起始位置

        int equator;            // 分隔点的位置     int bufstart;           // kv对的起始位置     int bufend;             // kv对的结束位置     int bufmark;            // kv对的结束位置的标记     int bufindex;           // 下一个kv对的开始位置     int bufvoid;            // 缓冲区的长度                            

        byte[] kvbuffer;        // 环形缓冲区     private final byte[] b0 = new byte[0];    //校验是否到边界

        //一个元数据由四部分组成[valstart,keystart,parition,vallen]     ,四部分统计的是kv对的信息     private static final int VALSTART = 0;  // valstart相对于kvindex的偏移量     private static final int KEYSTART = 1;  // keystart元数据相对于kvindex的偏移量     private static final int PARTITION = 2; // parition元数据相对于kvindex的偏移量     private static final int VALLEN = 3;    // vallen元数据相对于kvindex的偏移量     private static final int NMETA = 4;     // 一个元数据占4个int     private static final int METASIZE = NMETA * 4; // 统计一个元数据实际所占字节个数

        final SpillThread spillThread = new SpillThread();//溢写线程

        private FileSystem rfs;//溢写文件系统      

    //初始化函数  public void init( MapOutputCollector.Context context                     ) throws IOException, ClassNotFoundException {            mapOutputFile = mapTask.getMapOutputFile();  //此对象决定map输出文件的位置       sortPhase = mapTask.getSortPhase();  //设置排序阶段              partitions = job.getNumReduceTasks();  //获取分区个数             //溢写文件或最后的合并文件存储到本地       rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

          //获取溢写阈值        final float spillper =job.getFloat(                     JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);       //获取指定的缓冲区大小,如果没有指定,使用默认值100       final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);              //在排序阶段默认使用快排方式       sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",                                                                    QuickSort.class, IndexedSorter.class), job);            int maxMemUsage = sortmb << 20;   //  将缓冲区的单位转成MB       maxMemUsage -= maxMemUsage % METASIZE;   //保证maxMenUsage是16个倍数       kvbuffer = new byte[maxMemUsage];  // 创建环形缓冲区       bufvoid = kvbuffer.length;       //将字节数组buffer 转成int数组视图       kvmeta = ByteBuffer.wrap(kvbuffer)                .order(ByteOrder.nativeOrder())                .asIntBuffer();       setEquator(0);//  定义分隔点,同时给kvindex进行赋值       bufstart = bufend = bufindex = equator;       kvstart = kvend = kvindex;

               // k/v serialization       comparator = job.getOutputKeyComparator();       keyClass = (Class<K>)job.getMapOutputKeyClass();       valClass = (Class<V>)job.getMapOutputValueClass();       serializationFactory = new SerializationFactory(job);       keySerializer = serializationFactory.getSerializer(keyClass);       keySerializer.open(bb);       valSerializer = serializationFactory.getSerializer(valClass);       valSerializer.open(bb);

          spillInProgress = false;       minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);       spillThread.setDaemon(true);       spillThread.setName("SpillThread");       spillLock.lock();       try {         spillThread.start();    //启动溢写线程         while (!spillThreadRunning) {   //进行轮询的方式询问           spillDone.await();        }

    //收集函数:往环形缓冲区里存储kv对 public synchronized void collect(K key, V value, final int partition                                      ) throws IOException {       bufferRemaining -= METASIZE;// 80MB       if (bufferRemaining <= 0) {  如果不小于0,不用溢写          // start spill if the thread is not running and the soft limit has been         // reached         spillLock.lock();         try {           do {             if (!spillInProgress) {               final int kvbidx = 4 * kvindex;               final int kvbend = 4 * kvend;               // serialized, unspilled bytes always lie between kvindex and               // bufindex, crossing the equator. Note that any void space               // created by a reset must be included in "used" bytes               final int bUsed = distanceTo(kvbidx, bufindex);               final boolean bufsoftlimit = bUsed >= softLimit;               if ((kvbend + METASIZE) % kvbuffer.length !=                   equator - (equator % METASIZE)) {                 // spill finished, reclaim space                 resetSpill();                 bufferRemaining = Math.min(                     distanceTo(bufindex, kvbidx) - 2 * METASIZE,                     softLimit - bUsed) - METASIZE;                 continue;               } else if (bufsoftlimit && kvindex != kvend) {                 // spill records, if any collected; check latter, as it may                 // be possible for metadata alignment to hit spill pcnt                 startSpill();                 final int avgRec = (int)                   (mapOutputByteCounter.getCounter() /                   mapOutputRecordCounter.getCounter());                 // leave at least half the split buffer for serialization data                 // ensure that kvindex >= bufindex                 final int distkvi = distanceTo(bufindex, kvbidx);                 final int newPos = (bufindex +                   Math.max(2 * METASIZE - 1,                           Math.min(distkvi / 2,                                    distkvi / (METASIZE + avgRec) * METASIZE)))                   % kvbuffer.length;                 setEquator(newPos);                 bufmark = bufindex = newPos;                 final int serBound = 4 * kvend;                 // bytes remaining before the lock must be held and limits                 // checked is the minimum of three arcs: the metadata space, the                 // serialization space, and the soft limit                 bufferRemaining = Math.min(                     // metadata max                     distanceTo(bufend, newPos),                     Math.min(                       // serialization max                       distanceTo(newPos, serBound),                       // soft limit                       softLimit)) - 2 * METASIZE;               }             }           } while (false);         } finally {           spillLock.unlock();         }       }             //不溢写,收集kv对和kvmeta数据       try {         // serialize key bytes into buffer         int keystart = bufindex;     //序列化key值,写入buffer中,bufindex值改变     keySerializer.serialize(key);         if (bufindex < keystart) {           // wrapped the key; must make contiguous           bb.shiftBufferedKey();           keystart = 0;         }         // serialize value bytes into buffer         final int valstart = bufindex;     //序列化value值,写入buffer中,bufindex值改变     valSerializer.serialize(value);         // It's possible for records to have zero length, i.e. the serializer         // will perform no writes. To ensure that the boundary conditions are         // checked and that the kvindex invariant is maintained, perform a         // zero-length write into the buffer. The logic monitoring this could be         // moved into collect, but this is cleaner and inexpensive. For now, it         // is acceptable.         bb.write(b0, 0, 0);

            // the record must be marked after the preceding write, as the metadata         // for this record are not yet written         int valend = bb.markRecord();//对kvindex进行标记

            mapOutputRecordCounter.increment(1);//计数器         mapOutputByteCounter.increment(             distanceTo(keystart, valend, bufvoid));//计数:kv对的字节长度

            // 写kv对的元数据信息         kvmeta.put(kvindex + PARTITION, partition);         kvmeta.put(kvindex + KEYSTART, keystart);         kvmeta.put(kvindex + VALSTART, valstart);         kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));         // 移动kvindex,为下一个kvmeta做准备,移动了int数组的四个位置         kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();       } catch (MapBufferTooSmallException e) {         LOG.info("Record too large for in-memory buffer: " + e.getMessage());     //当前kv对较大(针对于80MB),会直接溢写到磁盘         spillSingleRecord(key, value, partition);         mapOutputRecordCounter.increment(1);         return;       }     }

    MapTask源码解析:

    MapTask.run()-->     -->if (conf.getNumReduceTasks() == 0)          说明:获取reduceTask的个数:                                                                                                                                                                    如果是0,将map阶段设置成100%                     如果有reduceTask,将mapPhase设置(67%),sortPhase设置(33%)     -->  boolean useNewApi = job.getUseNewMapper();            说明:是否使用了新的api     -->initialize(job, getJobID(), reporter, useNewApi);            说明:修改运行状态为running,获取输出格式(TextOutputFormat)                       获取输出路径     --> runNewMapper(job, splitMetaInfo, umbilical, reporter);     -->说明:获取任务上下文对象,获取实际Mapper对象,获取输入格式,                    构造当前任务的inputsplit以及记录阅读器                  (使用NewTrackingRecordReader对RecordReader)                    获取map输出收集器(实际上是一个RecordWriter,写到MapOutputBuffer对象)

                       将输入格式,记录写出器,分片信息等封装成MapContextImpl对象                    再将MapContextImpl包装成WrappedMapper类型     -->input.initialize(split, mapperContext);(略)     -->mapper.run(mapperContext);                    说明:运行Mapper里的run方法(实际上:参数对象为MapContextImpl)

     

    最新回复(0)