MapReduce的演练

    xiaoxiao2022-07-06  216

    一、WordCountDriver

    import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.log4j.BasicConfigurator; /** * Created by abc . * * 这个类就是mr程序运行时候的主类,本类中组装了一些程序运行时候所需要的信息 * 比如:使用的是那个Mapper类 那个Reducer类 输入数据在那 输出数据在什么地方 */ public class WordCountDriver { public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { BasicConfigurator.configure(); System.setProperty("HADOOP_USER_NAME", "root"); Configuration conf = new Configuration(); // 即使没有下面这行,也可以本地运行 因\hadoop-mapreduce-client-core-2.7.4.jar!\mapred-default.xml 中默认的参数就是 local //conf.set("mapreduce.framework.name","local"); Job job = Job.getInstance(conf); //指定本次mr job jar包运行主类 job.setJarByClass(WordCountDriver.class); //指定本次mr 所用的mapper reducer类分别是什么 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //指定本次mr mapper阶段的输出 k v类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定本次mr 最终输出的 k v类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // job.setNumReduceTasks(3); //ReduceTask个数 //如果业务有需求,就可以设置combiner组件 job.setCombinerClass(WordCountReducer.class); //指定本次mr 输入的数据路径 和最终输出结果存放在什么位置 FileInputFormat.setInputPaths(job,"c:\\in\\aa.txt"); FileOutputFormat.setOutputPath(job,new Path("c:\\out")); //如果出现0644错误或找不到winutils.exe,则需要设置windows环境和相关文件. //上面的路径是本地测试时使用,如果要打包jar到hdfs上运行时,需要使用下面的路径。 //FileInputFormat.setInputPaths(job,"/wordcount/input"); //FileOutputFormat.setOutputPath(job,new Path("/wordcount/output")); // job.submit(); //一般不要这个. //提交程序 并且监控打印程序执行情况 boolean rs = job.waitForCompletion(true); System.exit(rs?0:1); } }

    二、WordCountMapper

    import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.BasicConfigurator; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ //对数据进行打散 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { BasicConfigurator.configure(); //输入数据 hello world love work String line = value.toString(); //对数据切分 String[] words=line.split(" "); //写出<hello, 1> for(String w:words) { //写出reducer端 context.write(new Text(w), new IntWritable(1)); }}}

    三、WordCountReducer

    import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.log4j.BasicConfigurator; public class WordCountReducer extends Reducer <Text, IntWritable, Text, IntWritable>{ protected void reduce(Text Key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { BasicConfigurator.configure(); //记录出现的次数 int sum=0; //累加求和输出 for(IntWritable v:values) { sum +=v.get(); } context.write(Key, new IntWritable(sum)); } }
    最新回复(0)