A Summary of MapReduce Custom Component Examples
Required dependencies:
(Two Maven build plugins, the compiler plugin and the shade plugin, are used so the project can be packaged into a jar and run on the Hadoop cluster.)
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-mr1-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>RELEASE</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <!-- <verbose>true</verbose> -->
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <minimizeJar>true</minimizeJar>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
1. Custom Partitioner Example
Requirement: split the records in the file into two output files, depending on whether the value in the sixth column is greater than 15 or not.
Approach: k2/v2 and k3/v3 are Text and NullWritable respectively; the custom Partitioner extracts the sixth column from k2 and, using 15 as the dividing line, routes the records to two different partitions.
The code is as follows:
Driver (main) class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyPartitionerMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "MyPartitioner");
        job.setJarByClass(MyPartitionerMain.class);

        // input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/test/input"));

        // map phase
        job.setMapperClass(MyPartitionerMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // custom partitioner: two partitions, so two reduce tasks
        job.setPartitionerClass(MyPatitioner.class);
        job.setNumReduceTasks(2);

        // reduce phase
        job.setReducerClass(MyPartionerReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // output
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/test/output01"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MyPartitionerMain(), args);
        System.exit(run);
    }
}
Note: a job that uses a custom partitioner must run on the cluster, so the project has to be packaged as a jar, and the number of reduce tasks must be set explicitly (here it is set to 2, one per partition).
Custom Mapper:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyPartitionerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // pass each line through unchanged; the whole line becomes k2
        context.write(value, NullWritable.get());
    }
}
The custom Mapper and Reducer here need no special logic; they simply pass each record through.
Custom Reducer:
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyPartionerReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // pass the key through unchanged
        context.write(key, NullWritable.get());
    }
}
Custom Partitioner:
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPatitioner extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numReduceTasks) {
        // take the sixth column (index 5) and split on 15
        String s = text.toString().split("\t")[5];
        if (Long.parseLong(s) > 15) {
            return 1;
        } else {
            return 0;
        }
    }
}
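Since the pom already pulls in JUnit, the partitioning rule can be checked without submitting a job. The test below is not part of the original example; it is just a quick local sanity check that values above 15 go to partition 1 and the rest to partition 0 (the test class name and sample rows are made up for illustration):

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;

public class MyPatitionerTest {
    @Test
    public void routesBySixthColumn() {
        MyPatitioner partitioner = new MyPatitioner();
        // six tab-separated columns; the sixth one drives the partition choice
        Text over15 = new Text("c1\tc2\tc3\tc4\tc5\t20");
        Text atMost15 = new Text("c1\tc2\tc3\tc4\tc5\t10");
        assertEquals(1, partitioner.getPartition(over15, NullWritable.get(), 2));
        assertEquals(0, partitioner.getPartition(atMost15, NullWritable.get(), 2));
    }
}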
2. Custom Sort Example
Given a data file, sort by the first column in dictionary order; when the first column is equal, sort by the second column in ascending order.
Sample data (two tab-separated columns):
a	1
a	7
b	8
b	10
a	5
Approach: define a custom data type that implements the WritableComparable interface, override the comparator and the serialization/deserialization methods, and use this type as k2 so the shuffle sorts on it.
Custom data type:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import lombok.Data;
import org.apache.hadoop.io.WritableComparable;

@Data
public class MySortWritable implements WritableComparable<MySortWritable> {
    private String first;
    private Long second;

    @Override
    public int compareTo(MySortWritable o) {
        // first column in dictionary order; ties broken by the second column ascending
        if (this.first.compareTo(o.first) == 0) {
            return this.second.compareTo(o.second);
        } else {
            return this.first.compareTo(o.first);
        }
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // serialization
        dataOutput.writeUTF(this.first);
        dataOutput.writeLong(this.second);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // deserialization, in the same order as write()
        this.first = dataInput.readUTF();
        this.second = dataInput.readLong();
    }
}
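Note that @Data comes from Lombok (a dependency not listed in the pom above) and generates the getters and setters used by the mapper below; without Lombok, first and second need explicit accessors. As an optional sanity check of the ordering on the sample data, not part of the original notes:

import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class MySortWritableTest {
    private MySortWritable key(String first, long second) {
        MySortWritable k = new MySortWritable();
        k.setFirst(first);   // setters generated by Lombok's @Data
        k.setSecond(second);
        return k;
    }

    @Test
    public void ordersByFirstColumnThenSecond() {
        // same first column: the smaller second value sorts earlier
        assertTrue(key("a", 1L).compareTo(key("a", 7L)) < 0);
        // different first columns: dictionary order decides
        assertTrue(key("a", 7L).compareTo(key("b", 8L)) < 0);
    }
}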
Custom Mapper:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MySortMapper extends Mapper<LongWritable, Text, MySortWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // build the composite sort key from the two tab-separated columns
        String[] split = value.toString().split("\t");
        MySortWritable mySortWritable = new MySortWritable();
        mySortWritable.setFirst(split[0]);
        mySortWritable.setSecond(Long.parseLong(split[1]));
        context.write(mySortWritable, value);
    }
}
Custom Reducer:
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MySortReducer extends Reducer<MySortWritable, Text, Text, NullWritable> {
    @Override
    protected void reduce(MySortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // a key may group several identical (first, second) records; emit each original line
        for (Text value : values) {
            context.write(value, NullWritable.get());
        }
    }
}
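The original notes do not include a driver for the sort job. A sketch modeled on the partitioner driver above would look roughly like this (the MySortMain class name and the input/output paths are assumptions):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MySortMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "MySort");
        job.setJarByClass(MySortMain.class);

        // input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/test/sort_input"));

        // map phase: the custom writable is the map output key, so the shuffle sorts on it
        job.setMapperClass(MySortMapper.class);
        job.setMapOutputKeyClass(MySortWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce phase
        job.setReducerClass(MySortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // output
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/test/sort_output"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MySortMain(), args));
    }
}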