《MapReduce 5》--Map输出过程的跟踪、sortAndSpill()、mergeparts()

    xiaoxiao2022-07-04  102

    1、Map输出跟踪:

      以年份温度的mr程序为示例,进行分析:                                                                                                                                        【MyMapper.class】context.write(yearT, temperature);//将数据写出                    -->【WrappedMapper.class】write(KEYOUT key, VALUEOUT value)      -->【TaskInputOutputContextImpl.class】write(KEYOUT key, VALUEOUT value)        -->【NewOutputCollector.class】write(K key, V value)                          说明:构建NewOutputCollector对象时,调用构造器                           //给属性MapOutputCollector赋值MapOutputBuffer对象                           collector = createSortingCollector(job, reporter);                           //获取分区数                           partitions = jobContext.getNumReduceTasks();                                       如果分区数>1:                                  分区器,通过反射机制获取。参考如下:                                  ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job)                                【conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class)】                                  获取默认的HashPartitioner分区器                                  用户自定义的话,会从job身上获取自定义的分区器

                             如果分区数是1:(默认情况)                                 不会采用MR提供的默认的分区器(如hashPartitioner),                                 而是使用匿名内部类                                 创建一个分区器,返回索引0.

                             如果分区数是0:                                  不走shuffle,涉及不到分区情况。

           -->【MapOutputBuffer.class】collect(K key, V value, final int partition)                     说明:收集器调用collect方法,将kv对存储在环形缓冲区中                                在收集map输出数据时,收集完之后,有可能做了多次溢写,                                还有可能收集到的所有数据不满足80M.                                那么,不管什么情况,最后都会调用flush方法。

    2、sortAndSpill()方法:

       说明:当满足kvbuffer里的阈值,或者是最后一次flush操作,都会调用 sortAndSpill() 操作    //提前统计这次溢写是多少个字节长度:包含kv原始数据和分区的头信息  final long size = distanceTo(bufstart, bufend, bufvoid) +partitions * APPROX_HEADER_LENGTH;

     //为这次溢写,创建一个溢写记录对象  final SpillRecord spillRec = new SpillRecord(partitions);    //为这次溢写,构造溢写文件路径,  final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);    //调用文件系统对象的create方法,创建溢写文件,返回流对象  out = rfs.create(filename);                                                                                                                                                                             //统计元数据在环形缓冲区的位置,并对其进行排序                 sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);            说明:默认使用QuickSort排序方式,                       一定会调用MapOutputBuffer.compare() 和 MapOutputBuffer.swap()        -->        //比较        public int compare(final int mi, final int mj) {           final int kvi = offsetFor(mi % maxRec);           final int kvj = offsetFor(mj % maxRec);           //取两个元数据的分区部分对应的索引值           final int kvip = kvmeta.get(kvi + PARTITION);           final int kvjp = kvmeta.get(kvj + PARTITION);           //如果分区不同,先按分区索引排序           if (kvip != kvjp) {                    return kvip - kvjp;           }           // 分区是同一个时,通过元数据取key的原始数据,进行比较           return comparator.compare(kvbuffer,           kvmeta.get(kvi + KEYSTART),           kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),           kvbuffer,           kvmeta.get(kvj + KEYSTART),           kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));         }         //交换          public void swap(final int mi, final int mj) {           //比较后,如果不符合排序规则,就进行交换。           //取两个元数据的偏移量           int iOff = (mi % maxRec) * METASIZE;           int jOff = (mj % maxRec) * METASIZE;           //进行交换           System.arraycopy(kvbuffer, iOff, META_BUFFER_TMP, 0, METASIZE);           System.arraycopy(kvbuffer, jOff, kvbuffer, iOff, METASIZE);           System.arraycopy(META_BUFFER_TMP, 0, kvbuffer, jOff, METASIZE);         }

    3、Spill溢写操作:

        //构建一个索引记录对象,目录是记录这次溢写的基本信息,如偏移量,长度等     final IndexRecord rec = new IndexRecord();     //构建了一个buffer,存储value的字节序列     final InMemValBytes value = new InMemValBytes();     //一个分区一个分区的进行遍历元数据     for (int i = 0; i < partitions; ++i) {         //获取开始溢写的偏移量         long segmentStart = out.getPos();         //加密或者不加密操作,重写构建流对象          FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);         //将partitionOut进行再次封装成writer流对象,进行溢写          writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec, spilledRecordsCounter);                       //构建一个buffer,存储key的字节序列           DataInputBuffer key = new DataInputBuffer();           //遍历当前分区中的所有元数据            while (spindex < mend &&kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {                   //取出当前元数据                   final int kvoff = offsetFor(spindex % maxRec);                   //通过元数据取对应的原始数据kv对                   int keystart = kvmeta.get(kvoff + KEYSTART);                   int valstart = kvmeta.get(kvoff + VALSTART);                   //将key的值,写入buffer中                   key.reset(kvbuffer, keystart, valstart - keystart);                   //将value的值写buffer中。                   getVBytesForOffset(kvoff, value);                   writer.append(key, value);                   //为下一个元数据做准备工作                   ++spindex;             调用 writer.close(); 将key和value对应的两个buffer写到溢写文件中             统计这次溢写的索引信息,封装到溢写记录(spillRec)对象里

            【spillRec.putIndex(rec, i);】

            //条件:索引信息对应的缓存区:                                                                                                                                                                   如果这个缓存区小于真正的索引信息量,就写到文件中;                      如果缓存区大于信息量,就不进行写操作(写索引文件),将溢写对象里的信息写入一个索引文件中

            【spillRec.writeToFile(indexFilename, job);】                //为下一次溢写进行统计数字【这个数字与生成的溢写文件名称有关系】            ++numSpills;

    4、mergeParts():

    说明:当最后一次溢写操作完成【一定是调用了MapOutputBuffer.flush方法】后。            开始合并溢写文件。              //统计所有溢写文件的字节总长度       long finalOutFileSize = 0;

          //统计所有索引文件的字节总长度       long finalIndexFileSize = 0;

          //创建一个numSpills长度的Path数组,存储所有溢写文件的路径       final Path[] filename = new Path[numSpills];              //使用for循环 获取所有溢写文件的路径存储到数组Path[]中,同时,累加所有溢写文件的字节长度       for(int i = 0; i < numSpills; i++) {

          ----当溢写文件的个数==1时,sameVolRename(filename[0],mapOutputFile.getOutputFileForWriteInVolume(filename[0]));            将溢写文件名称重命名,当成最后一个文件。

          ----当溢写文件个数不为1时:            获取所有的索引文件,加载到indexCacheList(索引缓存列表)           在finalOutFileSize属性上再次添加partitions * APPROX_HEADER_LENGTH           在finalIndexFileSize 赋值为 partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;

              //构建最终的输出文件路径和最终的索引文件路径            Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);            Path finalIndexFile =mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);                 //调用文件系统的create方法创建最终的文件,             FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);                                                                                                                                                                                                                    ----当溢写文件为0时:              也创建一个空文件,但没有做任何写操作。

           ----当溢写文件>1时:            sortPhase.addPhases(partitions); // Divide sort phase into sub-phases            IndexRecord rec = new IndexRecord();            final SpillRecord spillRec = new SpillRecord(partitions);            //开始遍历多个文件的相同分区             for (int parts = 0; parts < partitions; parts++) {                // 在当此循环中,创建要合并的段(segment)                List<Segment<K,V>> segmentList = new ArrayList<Segment<K, V>>(numSpills);               //然后遍历当前分区的所有溢写文件,               for(int i = 0; i < numSpills; i++) {                     //获取当前分区的索引记录                     IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);                     //将一个溢写文件的当前分区封装成Segment对象                     Segment<K,V> s =new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,indexRecord.partLength,                                                                          codec, true);                     //添加到集合中                     segmentList.add(i, s);                                //下面两行行代码,判断是否需要再次排序 ,,仅当存在中间合并时,才对段进行排序                     int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);                     boolean sortSegments = segmentList.size() > mergeFactor;

                        //再次调用下面方法,进行二次排序                    Merger.merge(job, rfs,keyClass, valClass, codec,segmentList, mergeFactor,                                                                                                          new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, null,                                                              spilledRecordsCounter, sortPhase.phase(), TaskType.MAP);                                              //将合并后的输出写入磁盘(往最终的文件里写)                      long segmentStart = finalOut.getPos();                      FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);                      Writer<K, V> writer = new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,                                                                                       spilledRecordsCounter);                      if (combinerRunner == null || numSpills < minSpillsForCombine) {                            Merger.writeFile(kvIter, writer, reporter, job);                       } else {                            combineCollector.setWriter(writer);                            combinerRunner.combine(kvIter, combineCollector);                       }

                         //关闭流,结束此分区的写入磁盘操作,进行下一个分区的写入操作                       writer.close();                      //所有分区结束写出操作后,关闭流                       finalOut.close();                      //删除所有的溢写文件                      for(int i = 0; i < numSpills; i++) {                           rfs.delete(filename[i],true);                      }


    练习:1、使用MR程序读取三个文件中的数据,进行排序(利用shuffle阶段的排序) file1.txt   1 2 5 23 34 12 56 23 14

    file2.txt  23 1 3 4 5 34 12 12 15

    file3.txt 12 13 42 54 23 23 12

    ---------------------------------------------------------------------------------------------------------------------------------------------------------

    2、以下是qq的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的) A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:A,O J:B,O K:A,C,D L:D,E,F M:E,F,G O:A,H,I,J 求出哪些人两两之间有共同好友,及他俩的共同好友都有谁? 比如:A-B: C,E

    3、去重 -------------------------------- file1.txt

    1001    5; 1003    10; 1004    20 1005    15

    file2.txt

    1007    4; 1003    10; 1004    20 1008    15

    最新回复(0)