SpringBoot集成Hadoop系列二 ---- MapReduce一年最高气温统计

    xiaoxiao2022-07-13  154

    代码:

    package com.hadoop.reduce.mapper; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; /** * 读取一年中某天的最高气温 * @author linhaiy * @date 2019.05.18 */ public class WeatherMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> { private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException { // 打印输入样本 如 2018120715 System.out.println("==== Before Mapper: ====" + key + "," + value); String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); // 截取年份 String year = line.substring(0, 4); // 截取温度 int temperature = Integer.parseInt(line.substring(8)); word.set(year); output.collect(word, new LongWritable(temperature)); // 打印输出样本 System.out.println("==== After Mapper: ==== " + new Text(year) + "," + new LongWritable(temperature)); } } package com.hadoop.reduce.reducer; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; /** * 统计一年天气最高温 * @author linhaiy * @date 2019.05.18 */ public class WeatherReduce extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> { @Override public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException { long maxValue = Integer.MIN_VALUE; StringBuffer sb = new StringBuffer(); // 取values温度的最大值 while (values.hasNext()) { long tmp = values.next().get(); maxValue = Math.max(maxValue, tmp); sb.append(tmp).append(", "); output.collect(key, new LongWritable(maxValue)); } // 打印输入样本,如 2000, 15 ,99, 12 System.out.println("==== Before Reduce ==== " + key + ", " + sb.toString()); // 打印输出样本 System.out.println("==== After Reduce ==== " + key + ", " + sb.toString()); } } package com.hadoop.reduce.service; import java.io.IOException; import javax.annotation.PostConstruct; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.hadoop.reduce.bean.StaffProvincePartitioner; import com.hadoop.reduce.bean.WeiboInputFormat; import com.hadoop.reduce.mapper.CounterMapper; import com.hadoop.reduce.mapper.FriendsMapper; import com.hadoop.reduce.mapper.JoinMapper; import com.hadoop.reduce.mapper.StaffMap; import com.hadoop.reduce.mapper.WeatherMap; import com.hadoop.reduce.mapper.WeiboMapper; import com.hadoop.reduce.mapper.WordCount; import com.hadoop.reduce.mapper.WordCountMap; import com.hadoop.reduce.model.GroupSortModel; import com.hadoop.reduce.model.OrderInfo; import com.hadoop.reduce.model.StaffModel; import com.hadoop.reduce.model.Weibo; import com.hadoop.reduce.reducer.FriendsReduce; import com.hadoop.reduce.reducer.JoinReduce; import com.hadoop.reduce.reducer.StaffReduce; import com.hadoop.reduce.reducer.WeatherReduce; import com.hadoop.reduce.reducer.WeiboReduce; import com.hadoop.reduce.reducer.WordCountReduce; import com.hadoop.util.GroupSort; /** * Map/Reduce工具类 * * @author linhaiy * @date 2019.05.18 */ @Component public class ReduceJobsUtils { @Value("${hdfs.path}") private String path; private static String hdfsPath; /** * 获取HDFS配置信息 * @return */ public static Configuration getConfiguration() { Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", hdfsPath); configuration.set("mapred.job.tracker", hdfsPath); // 运行在yarn的集群模式 // configuration.set("mapreduce.framework.name", "yarn"); // 这个配置是让main方法寻找该机器的mr环境 // configuration.set("yarn.resourcemanmager.hostname", "node1"); return configuration; } /** * 获取单词一年最高气温计算配置 * @param jobName * @return */ public static JobConf getWeatherJobsConf(String jobName) { JobConf jobConf = new JobConf(getConfiguration()); jobConf.setJobName(jobName); jobConf.setOutputKeyClass(Text.class); jobConf.setOutputValueClass(LongWritable.class); jobConf.setMapperClass(WeatherMap.class); jobConf.setReducerClass(WeatherReduce.class); jobConf.setInputFormat(TextInputFormat.class); jobConf.setOutputFormat(TextOutputFormat.class); return jobConf; } @PostConstruct public void getPath() { hdfsPath = this.path; } public static String getHdfsPath() { return hdfsPath; } } package com.hadoop.reduce.service; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.springframework.stereotype.Service; import com.hadoop.hdfs.service.HdfsService; /** * 单词统计 * @author linhaiy * @date 2019.05.18 */ @Service public class MapReduceService { // 默认reduce输出目录 private static final String OUTPUT_PATH = "/output"; /** * 一年最高气温统计 * @param jobName * @param inputPath * @throws Exception */ public void weather(String jobName, String inputPath) throws Exception { if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) { return; } // 输出目录 = output/当前Job String outputPath = OUTPUT_PATH + "/" + jobName; if (HdfsService.existFile(outputPath)) { HdfsService.deleteFile(outputPath); } JobConf jobConf = ReduceJobsUtils.getWeatherJobsConf(jobName); FileInputFormat.setInputPaths(jobConf, new Path(inputPath)); FileOutputFormat.setOutputPath(jobConf, new Path(outputPath)); JobClient.runJob(jobConf); } } package com.hadoop.reduce.controller; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import com.hadoop.reduce.service.MapReduceService; import com.hadoop.util.Result; /** * MapReduce处理控制层 * @author linhaiy * @date 2019.05.18 */ @RestController @RequestMapping("/hadoop/reduce") public class MapReduceAction { @Autowired MapReduceService mapReduceService; /** * 一年最高气温统计 * @param jobName * @param inputPath * @return * @throws Exception */ @RequestMapping(value = "MaxWeather", method = RequestMethod.POST) @ResponseBody public Result weather(@RequestParam("jobName") String jobName, @RequestParam("inputPath") String inputPath) throws Exception { if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) { return new Result(Result.FAILURE, "请求参数为空"); } mapReduceService.weather(jobName, inputPath); return new Result(Result.SUCCESS, "温度统计成功"); } }

    部分截图:

    最新回复(0)