MapReduce中的InputFormat(1)概述

    xiaoxiao2025-10-19  6

    1 概念 InputFormat用于描述输入数据的格式,提供以下两个功能: A、数据切分:按照某种策略将输入的数据切分成若干split,以便确定Map Task个数,以及对应的Split。 B、提供数据:为Mapper提供输入数据,对于给定split,能将其解析为<k,v>格式。即<K1,V1>

    2 新老版本

    老版本:package org.apache.hadoop.mapred

    public interface InputFormat<K, V> { InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; RecordReader<K, V> getRecordReader(InputSplit split,JobConf job, Reporter reporter) throws IOException; }新版本:package org.apache.hadoop.mapreduce

    public abstract class InputFormat<K, V> { public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException; } 3 解析 3.1 设计思想 所有基于文件的InputFormat的实现基类都是FileInputFormat。 针对文本格式:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat 针对二进制格式:SequenceFileInputFormat

    基于文件的FileInputFormat的设计思想是:

    A 由公共基类FileInputFormat采用统一的方法,对文件进行切分成InputSplit(如按照统一的大小)。getSplit方法 B 由各个派生类根据自己的需求,解析InputSplit。即各个子类实现的createRecordReader方法

    3.2 getSplits

    主要完成数据切分的功能,它会尝试着将输入数据切分为numSplit个inputSplit。有以下两个特点: A、逻辑分片:inputSplit只记录分片的元信息。 B、可序列化:为了进程间通信。 在Hadoop1.X在JobClient的中writeNewSplits方法使用了getSplits。

    // 通过反射获得设置的inputFormat.class的inputFormat对象 InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); // 获取逻辑分片信息 List<InputSplit> splits = input.getSplits(job); 3.3 getRecordReader 该方法返回一个RecordReader对象,它实现了类似迭代器的功能,将某个split解析为一个个<k,v>对。该类需要考虑以下两点: A、定位边界记录:为了识别一条完整的记录,记录之间要加上一些同步标志。 对于TextInputFormat:同步标识就是换行符。 对于SequenceFileInputFormat:每隔离若干条记录,会添加固定长度同步字符串。 B、解析<k,v>:定位到一条记录后,需要将该记录分解为key和value两部分。 对于TextInputFormat:key就是该行在文件的中的偏移量,value就是该行的内容。 对于SequenceFileInputFormat: 每条记录的格式为[record length] [key length] [key] [value]。 前两个字段分别是整条记录的长度和key的长度,均为4个字节,后半部分分别是key和value的内容。知道每条记录的格式后,很容易解析。 整理自董西成老师的《Hadoop技术内幕》,并阅读源码小有体会。

    最新回复(0)