A Summary of Custom MapReduce Component Examples



Required dependencies:

(Two Maven build plugins are included so the project can be packaged into a jar and submitted to the Hadoop cluster.)

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-mr1-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>RELEASE</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Compile with JDK 1.8 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- The shade plugin packages the job into a runnable jar for the cluster -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <minimizeJar>true</minimizeJar>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

1. Custom Partitioner Example

Requirement: split the file on its sixth column, saving records whose value is greater than 15 and records whose value is 15 or less into two separate output files.
Approach: k2/v2 and k3/v3 are all Text and NullWritable. The custom Partitioner extracts the sixth column from the value in k2 and, using 15 as the cutoff, routes records into two different partitions.
The code is as follows:
Driver (main) class:
public class MyPartitionerMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "MyPartitioner");
        job.setJarByClass(MyPartitionerMain.class);

        // Input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/test/input"));

        // Map stage
        job.setMapperClass(MyPartitionerMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Custom partitioner; the number of reduce tasks must match the number of partitions
        job.setPartitionerClass(MyPatitioner.class);
        job.setNumReduceTasks(2);

        // Reduce stage
        job.setReducerClass(MyPartionerReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Output
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/test/output01"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MyPartitionerMain(), args);
        System.exit(run);
    }
}

Note: a job that uses a custom partitioner must be run on the cluster, so the project has to be packaged as a jar; the number of reduce tasks must also be set explicitly (here 2, one per partition) so that each partition is written to its own output file.

Custom mapper:
public class MyPartitionerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}

Note that the custom mapper and reducer here do not transform the data; they simply pass each record straight through, and the partitioner does all the work.

Custom reducer:
public class MyPartionerReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
Custom partitioner:
public class MyPatitioner extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
        // Take the sixth column (index 5) of the tab-separated line
        String s = text.toString().split("\t")[5];
        if (Long.parseLong(s) > 15) {
            return 1;
        } else {
            return 0;
        }
    }
}
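As a quick sanity check of the partitioning rule outside the cluster, getPartition can be called directly on a fabricated line. This is a minimal sketch; the class name and the tab-separated sample record are made up for illustration, and it only assumes the Hadoop client jars are on the classpath:

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class MyPatitionerCheck {
    public static void main(String[] args) {
        MyPatitioner partitioner = new MyPatitioner();
        // Hypothetical tab-separated record whose sixth column is 20 (> 15)
        Text record = new Text("f1\tf2\tf3\tf4\tf5\t20\tf7");
        int partition = partitioner.getPartition(record, NullWritable.get(), 2);
        System.out.println(partition); // expected: 1
    }
}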

2. Custom Sort Example

Given a data file, sort the first column in dictionary (lexicographic) order; when the first column is the same, sort the second column in ascending order:

A sample of the (tab-separated) data format:

a	1
a	7
b	8
b	10
a	5
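With that ordering, the job should emit the sample rows in this order (first column lexicographic, ties broken by the second column ascending):

a	1
a	5
a	7
b	8
b	10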

Approach: define a custom data type that implements the WritableComparable interface, override the comparator and the serialization/deserialization methods, and use this type as key2; the shuffle then sorts by this key, which completes the custom sort.

The custom data type code is as follows:

@Data
public class MySortWritable implements WritableComparable<MySortWritable> {

    private String first;
    private Long second;

    @Override
    public int compareTo(MySortWritable o) {
        // First column lexicographic; when equal, compare the second column ascending
        if (this.first.compareTo(o.first) == 0) {
            return this.second.compareTo(o.second);
        } else {
            return this.first.compareTo(o.first);
        }
    }

    // Override the serialization method
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.first);
        dataOutput.writeLong(this.second);
    }

    // Override the deserialization method
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.first = dataInput.readUTF();
        this.second = dataInput.readLong();
    }
}
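The compareTo logic can be sanity-checked locally without Hadoop at all. This is a minimal sketch; the class name is made up for illustration, and it assumes Lombok's @Data has generated the getters and setters on MySortWritable:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MySortWritableCheck {
    public static void main(String[] args) {
        List<MySortWritable> keys = new ArrayList<>();
        String[][] sample = {{"a", "1"}, {"a", "7"}, {"b", "8"}, {"b", "10"}, {"a", "5"}};
        for (String[] row : sample) {
            MySortWritable k = new MySortWritable();
            k.setFirst(row[0]);
            k.setSecond(Long.parseLong(row[1]));
            keys.add(k);
        }
        // Collections.sort uses compareTo: first column lexicographic, then second ascending
        Collections.sort(keys);
        for (MySortWritable k : keys) {
            System.out.println(k.getFirst() + "\t" + k.getSecond());
        }
    }
}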
Custom mapper:
public class MySortMapper extends Mapper<LongWritable, Text, MySortWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        MySortWritable mySortWritable = new MySortWritable();
        mySortWritable.setFirst(value.toString().split("\t")[0]);
        mySortWritable.setSecond(Long.parseLong(value.toString().split("\t")[1]));
        context.write(mySortWritable, value);
    }
}
Custom reducer:
public class MySortReducer extends Reducer<MySortWritable, Text, Text, NullWritable> {
    @Override
    protected void reduce(MySortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value, NullWritable.get());
        }
    }
}
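The original post does not show a driver for the sort job. A minimal sketch modeled on the partitioner driver above (the class name, job name, and input/output paths are assumptions) could look like this; with the default single reduce task the output file is fully sorted:

public class MySortMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "MySort");
        job.setJarByClass(MySortMain.class);

        // Input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/test/sort_input"));

        // Map stage: key2 is the custom comparable type, value2 is the original line
        job.setMapperClass(MySortMapper.class);
        job.setMapOutputKeyClass(MySortWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce stage: write the original lines back out in sorted order
        job.setReducerClass(MySortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Output
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/test/sort_output"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MySortMain(), args);
        System.exit(run);
    }
}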