以年份温度的mr程序为示例,进行分析: 【MyMapper.class】context.write(yearT, temperature);//将数据写出 -->【WrappedMapper.class】write(KEYOUT key, VALUEOUT value) -->【TaskInputOutputContextImpl.class】write(KEYOUT key, VALUEOUT value) -->【NewOutputCollector.class】write(K key, V value) 说明:构建NewOutputCollector对象时,调用构造器 //给属性MapOutputCollector赋值MapOutputBuffer对象 collector = createSortingCollector(job, reporter); //获取分区数 partitions = jobContext.getNumReduceTasks(); 如果分区数>1: 分区器,通过反射机制获取。参考如下: ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job) 【conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class)】 获取默认的HashPartitioner分区器 用户自定义的话,会从job身上获取自定义的分区器
如果分区数是1:(默认情况) 不会采用MR提供的默认的分区器(如hashPartitioner), 而是使用匿名内部类 创建一个分区器,返回索引0.
如果分区数是0: 不走shuffle,涉及不到分区情况。
-->【MapOutputBuffer.class】collect(K key, V value, final int partition) 说明:收集器调用collect方法,将kv对存储在环形缓冲区中 在收集map输出数据时,收集完之后,有可能做了多次溢写, 还有可能收集到的所有数据不满足80M. 那么,不管什么情况,最后都会调用flush方法。
说明:当满足kvbuffer里的阈值,或者是最后一次flush操作,都会调用 sortAndSpill() 操作 //提前统计这次溢写是多少个字节长度:包含kv原始数据和分区的头信息 final long size = distanceTo(bufstart, bufend, bufvoid) +partitions * APPROX_HEADER_LENGTH;
//为这次溢写,创建一个溢写记录对象 final SpillRecord spillRec = new SpillRecord(partitions); //为这次溢写,构造溢写文件路径, final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size); //调用文件系统对象的create方法,创建溢写文件,返回流对象 out = rfs.create(filename); //统计元数据在环形缓冲区的位置,并对其进行排序 sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); 说明:默认使用QuickSort排序方式, 一定会调用MapOutputBuffer.compare() 和 MapOutputBuffer.swap() --> //比较 public int compare(final int mi, final int mj) { final int kvi = offsetFor(mi % maxRec); final int kvj = offsetFor(mj % maxRec); //取两个元数据的分区部分对应的索引值 final int kvip = kvmeta.get(kvi + PARTITION); final int kvjp = kvmeta.get(kvj + PARTITION); //如果分区不同,先按分区索引排序 if (kvip != kvjp) { return kvip - kvjp; } // 分区是同一个时,通过元数据取key的原始数据,进行比较 return comparator.compare(kvbuffer, kvmeta.get(kvi + KEYSTART), kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART), kvbuffer, kvmeta.get(kvj + KEYSTART), kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART)); } //交换 public void swap(final int mi, final int mj) { //比较后,如果不符合排序规则,就进行交换。 //取两个元数据的偏移量 int iOff = (mi % maxRec) * METASIZE; int jOff = (mj % maxRec) * METASIZE; //进行交换 System.arraycopy(kvbuffer, iOff, META_BUFFER_TMP, 0, METASIZE); System.arraycopy(kvbuffer, jOff, kvbuffer, iOff, METASIZE); System.arraycopy(META_BUFFER_TMP, 0, kvbuffer, jOff, METASIZE); }
//构建一个索引记录对象,目录是记录这次溢写的基本信息,如偏移量,长度等 final IndexRecord rec = new IndexRecord(); //构建了一个buffer,存储value的字节序列 final InMemValBytes value = new InMemValBytes(); //一个分区一个分区的进行遍历元数据 for (int i = 0; i < partitions; ++i) { //获取开始溢写的偏移量 long segmentStart = out.getPos(); //加密或者不加密操作,重写构建流对象 FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out); //将partitionOut进行再次封装成writer流对象,进行溢写 writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec, spilledRecordsCounter); //构建一个buffer,存储key的字节序列 DataInputBuffer key = new DataInputBuffer(); //遍历当前分区中的所有元数据 while (spindex < mend &&kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { //取出当前元数据 final int kvoff = offsetFor(spindex % maxRec); //通过元数据取对应的原始数据kv对 int keystart = kvmeta.get(kvoff + KEYSTART); int valstart = kvmeta.get(kvoff + VALSTART); //将key的值,写入buffer中 key.reset(kvbuffer, keystart, valstart - keystart); //将value的值写buffer中。 getVBytesForOffset(kvoff, value); writer.append(key, value); //为下一个元数据做准备工作 ++spindex; 调用 writer.close(); 将key和value对应的两个buffer写到溢写文件中 统计这次溢写的索引信息,封装到溢写记录(spillRec)对象里
【spillRec.putIndex(rec, i);】
//条件:索引信息对应的缓存区: 如果这个缓存区小于真正的索引信息量,就写到文件中; 如果缓存区大于信息量,就不进行写操作(写索引文件),将溢写对象里的信息写入一个索引文件中
【spillRec.writeToFile(indexFilename, job);】 //为下一次溢写进行统计数字【这个数字与生成的溢写文件名称有关系】 ++numSpills;
说明:当最后一次溢写操作完成【一定是调用了MapOutputBuffer.flush方法】后。 开始合并溢写文件。 //统计所有溢写文件的字节总长度 long finalOutFileSize = 0;
//统计所有索引文件的字节总长度 long finalIndexFileSize = 0;
//创建一个numSpills长度的Path数组,存储所有溢写文件的路径 final Path[] filename = new Path[numSpills]; //使用for循环 获取所有溢写文件的路径存储到数组Path[]中,同时,累加所有溢写文件的字节长度 for(int i = 0; i < numSpills; i++) {
----当溢写文件的个数==1时,sameVolRename(filename[0],mapOutputFile.getOutputFileForWriteInVolume(filename[0])); 将溢写文件名称重命名,当成最后一个文件。
----当溢写文件个数不为1时: 获取所有的索引文件,加载到indexCacheList(索引缓存列表) 在finalOutFileSize属性上再次添加partitions * APPROX_HEADER_LENGTH 在finalIndexFileSize 赋值为 partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
//构建最终的输出文件路径和最终的索引文件路径 Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize); Path finalIndexFile =mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); //调用文件系统的create方法创建最终的文件, FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); ----当溢写文件为0时: 也创建一个空文件,但没有做任何写操作。
----当溢写文件>1时: sortPhase.addPhases(partitions); // Divide sort phase into sub-phases IndexRecord rec = new IndexRecord(); final SpillRecord spillRec = new SpillRecord(partitions); //开始遍历多个文件的相同分区 for (int parts = 0; parts < partitions; parts++) { // 在当此循环中,创建要合并的段(segment) List<Segment<K,V>> segmentList = new ArrayList<Segment<K, V>>(numSpills); //然后遍历当前分区的所有溢写文件, for(int i = 0; i < numSpills; i++) { //获取当前分区的索引记录 IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); //将一个溢写文件的当前分区封装成Segment对象 Segment<K,V> s =new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,indexRecord.partLength, codec, true); //添加到集合中 segmentList.add(i, s); //下面两行行代码,判断是否需要再次排序 ,,仅当存在中间合并时,才对段进行排序 int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100); boolean sortSegments = segmentList.size() > mergeFactor;
//再次调用下面方法,进行二次排序 Merger.merge(job, rfs,keyClass, valClass, codec,segmentList, mergeFactor, new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, null, spilledRecordsCounter, sortPhase.phase(), TaskType.MAP); //将合并后的输出写入磁盘(往最终的文件里写) long segmentStart = finalOut.getPos(); FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut); Writer<K, V> writer = new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null || numSpills < minSpillsForCombine) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); }
//关闭流,结束此分区的写入磁盘操作,进行下一个分区的写入操作 writer.close(); //所有分区结束写出操作后,关闭流 finalOut.close(); //删除所有的溢写文件 for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true); }
练习:1、使用MR程序读取三个文件中的数据,进行排序(利用shuffle阶段的排序) file1.txt 1 2 5 23 34 12 56 23 14
file2.txt 23 1 3 4 5 34 12 12 15
file3.txt 12 13 42 54 23 23 12
---------------------------------------------------------------------------------------------------------------------------------------------------------
2、以下是qq的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的) A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:A,O J:B,O K:A,C,D L:D,E,F M:E,F,G O:A,H,I,J 求出哪些人两两之间有共同好友,及他俩的共同好友都有谁? 比如:A-B: C,E
3、去重 -------------------------------- file1.txt
1001 5; 1003 10; 1004 20 1005 15
file2.txt
1007 4; 1003 10; 1004 20 1008 15