数据处理的编程模型 – 本质:并行运行(协调性、可靠性)
任务过程:map阶段和reduce阶段:以"键值对"作为输入输出
MapReduce逻辑数据流 横向扩展 工作单元(Job):输入数据、MapReduce程序、配置信息。combiner函数:减少Map与Reduce数据传输 max(0,20,10,25,15)= max(max(0,20,10),max(25,15)) = max(20,25) = 25 多个map、reduce任务流。5. 实例
// ===========================================================================
// Maximum-temperature example: three separate Hadoop source files (Mapper,
// Reducer, job driver) collected into one set of notes. Each section begins
// with its own package/import declarations, as in the original files.
// ===========================================================================

// --------------------------- MaxTemperatureMapper.java ---------------------
package hadoop;

// cc MaxTemperatureMapper Mapper for maximum temperature example
// vv MaxTemperatureMapper
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Extracts (year, air temperature) pairs from fixed-width NCDC weather
 * records.
 *
 * <p>Type parameters: input key = byte offset of the line, input value = one
 * record line, output key = four-digit year, output value = air temperature
 * (tenths of a degree Celsius, per the NCDC format).
 */
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  // Sentinel value used in the NCDC data for a missing temperature reading.
  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    // Fixed-width record layout: year at columns 15-18, signed temperature
    // at columns 87-91, quality code at column 92.
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') {
      // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    // Emit only valid readings: not the MISSING sentinel, and a quality code
    // in the accepted set.
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
// ^^ MaxTemperatureMapper

// --------------------------- MaxTemperatureReducer.java --------------------
package hadoop;

// cc MaxTemperatureReducer Reducer for maximum temperature example
// vv MaxTemperatureReducer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduces (year, [temperatures...]) to (year, maximum temperature).
 *
 * <p>Also safe to use as a combiner: max is associative and commutative.
 */
public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
// ^^ MaxTemperatureReducer

// --------------------------- MaxTemperature.java (driver) ------------------
package hadoop;

// cc MaxTemperature Application to find the maximum temperature in the weather dataset
// vv MaxTemperature
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Job driver: configures and submits the max-temperature MapReduce job.
 *
 * <p>Usage: {@code MaxTemperature <input path> <output path>}
 */
public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    // Obtain the service configuration and the job execution specification.
    Configuration conf = new Configuration();
    // Job.getInstance replaces the deprecated new Job(conf) constructor.
    Job job = Job.getInstance(conf);
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    // BUG FIX: the original hard-coded HDFS paths
    // (hdfs://192.168.137.133:8020/tmp/sample.txt and /tmp/result.txt),
    // which made the args.length check above pointless. Take the paths from
    // the command line instead.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Mapper and Reducer types; reusing the reducer as a combiner reduces
    // map-to-reduce data transfer (max is associative and commutative).
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setCombinerClass(MaxTemperatureReducer.class);

    // Map output types.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // Final (reduce) output types.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Input/output formats default to TextInputFormat/TextOutputFormat:
    // job.setInputFormatClass(TextInputFormat.class);
    // job.setOutputFormatClass(TextOutputFormat.class);

    // Submit the job and wait for it to finish.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
// ^^ MaxTemperature
// Related setup: building an Eclipse-based MapReduce development environment.
