Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架; Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
在一堆给定的文本文件中统计输出每一个单词出现的总次数
wordcount 程序在集群执行过程示意图 单词统计map的编写,需要继承org.apache.hadoop.mapreduce.Mapper import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long * 但是hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable * VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String 用Text * KEYOUT:是用户自己定义逻辑处理完成之后输出数据中的Key,此处是单词,String 用Text * VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,此处是单词次数,Integer 用IntWritable * @author potter */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ /** * map阶段的业务逻辑就写在自定义的map()方法中 * maptask会对每一行输入数据调用一次我们自定义的map()方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将maptask传给我们的文本内容先转换成String String line = value.toString(); //根据空格将单词这一行切分成单词 String[] words = line.split(" "); //将单词输出为<单词,1> for(String word : words ){ //将单词作为key,将次数1作为value,以便后续的数据分发,可以根据单词分发,以便于相同的单词会到相同的reduce task中 context.write(new Text(word), new IntWritable(1)); } } } 单词统计reduce的编写 需继承org.apache.hadoop.mapreduce.Reducer import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * KEYIN,VALUEIN对应mapper输出的KEYOUT,VALUEOUT类型 * KEYOUT,VALUEOUT是自定义reduce逻辑处理结果输出数据类型 * KEYUOUT是单词 * VALUEOUT是总次数 * @author potter */ public class WordcountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ /**<apple,1><apple,1><apple,1><apple,1><apple,1><apple,1><apple,1><apple,1> * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1><banana,1> * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1><hello,1><hello,1> * 入参参数,key是一组相同单词KV对的key */ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException { int count = 0 ; for(IntWritable value : values){ count += value.get(); } context.write(key, new IntWritable(count)); } } yarn客户端用于提交task import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 相当于yarn集群的客户端 * 需要在此封装我们的mr程序的相关运行参数,指定jar包 * 最后提交给yarn * @author potter * */ public class WordcountDriver { public static void main(String args[]) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WordcountDriver.class); //指定本业务job要使用的Mapper/Reduce业务类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定job的输入的原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的输出结果所在目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行 boolean response = job.waitForCompletion(true); System.exit(response?0:1); } } 将该项目export->jar file 并命名为wordcount.jar 并放到hadoop集群任意一台服务器上通过 hadoop fs -mkdir -p /wordcount/input/ 在服务器上新建目录存放用于统计的文本文件 通过 hadoop fs -put [文件名] 命令 ,上传文本到新建的input目录下 通过 hadoop jar [wordcount jar包名称 ] [Driver主类(yarn客户端)路径名] [待统计文本地址] [输出文本地址] 运行 jar包 集群启动map和reduce执行任务 执行完成后 生成对应的统计信息文件 默认输出单词是有序的