SpringBoot集成Hadoop系列二 ---- MapReduce数据的分组统计,排序

    xiaoxiao2022-07-14  160

    代码:

    package com.hadoop.reduce.model;

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    /**
     * Composite MapReduce key holding two ints, {@code name} and {@code num}.
     *
     * <p>Natural ordering is by {@code name} ascending, then by {@code num} ascending.
     * {@link #equals(Object)} and {@link #hashCode()} are consistent with that ordering:
     * two keys are equal exactly when both fields match.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    public class GroupSortModel implements WritableComparable<GroupSortModel> {

        private int name;
        private int num;

        /** Creates a key with both fields left at their default value of 0. */
        public GroupSortModel() {
        }

        /**
         * Creates a key with the given field values.
         *
         * @param name first sort field
         * @param num  second sort field
         */
        public GroupSortModel(int name, int num) {
            this.name = name;
            this.num = num;
        }

        /**
         * Overwrites both fields at once, so a single instance can be reused across records.
         *
         * @param name first sort field
         * @param num  second sort field
         */
        public void set(int name, int num) {
            this.name = name;
            this.num = num;
        }

        /**
         * Orders by {@code name}, then by {@code num}.
         *
         * @return a negative value, zero, or a positive value as this key sorts before,
         *         equal to, or after {@code groupSortModel}
         */
        @Override
        public int compareTo(GroupSortModel groupSortModel) {
            int byName = Integer.compare(this.name, groupSortModel.getName());
            return byName != 0 ? byName : Integer.compare(this.num, groupSortModel.getNum());
        }

        /** Serializes {@code name} followed by {@code num}, each as a 4-byte int. */
        @Override
        public void write(DataOutput output) throws IOException {
            output.writeInt(this.name);
            output.writeInt(this.num);
        }

        /** Reads the fields back in the same order {@link #write(DataOutput)} emits them. */
        @Override
        public void readFields(DataInput input) throws IOException {
            this.name = input.readInt();
            this.num = input.readInt();
        }

        /** Returns {@code name} and {@code num} separated by a tab character. */
        @Override
        public String toString() {
            return name + "\t" + num;
        }

        /**
         * Two keys are equal when both {@code name} and {@code num} match. Defined so that
         * the overridden {@link #hashCode()} has a matching {@code equals}.
         */
        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof GroupSortModel)) {
                return false;
            }
            GroupSortModel that = (GroupSortModel) other;
            return this.name == that.name && this.num == that.num;
        }

        @Override
        public int hashCode() {
            return this.name * 157 + this.num;
        }

        public int getName() {
            return name;
        }

        public void setName(int name) {
            this.name = name;
        }

        public int getNum() {
            return num;
        }

        public void setNum(int num) {
            this.num = num;
        }
    }

    package com.hadoop.util;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import // statement continues on the next line
com.hadoop.reduce.model.GroupSortModel; // completes the import statement begun on the previous line
import java.io.IOException;

/**
 * MapReduce job that reads tab-separated integer pairs, keys each record by a composite
 * ({@code name}, {@code num}) key, and writes the records back out in key order.
 *
 * @author linhaiy
 * @date 2019.05.18
 */
public class GroupSort extends Configured implements Tool {

    /**
     * Mapper: parses each input line as {@code name<TAB>num} (for example {@code 40<TAB>20})
     * and emits key {@code (name, num)} with value {@code num}. Lines with fewer than two
     * tab-separated fields are skipped.
     */
    public static class GroupSortMapper extends Mapper<LongWritable, Text, GroupSortModel, IntWritable> {

        // Reused across map() calls to avoid per-record allocation. Kept per-instance rather
        // than static so that mapper instances living in one JVM never share mutable state.
        private final GroupSortModel groupSortModel = new GroupSortModel();
        private final IntWritable num = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            if (split.length < 2) {
                return;
            }
            // trim() tolerates stray spaces around the numbers; a non-numeric field still
            // fails with NumberFormatException.
            int parsedName = Integer.parseInt(split[0].trim());
            int parsedNum = Integer.parseInt(split[1].trim());
            groupSortModel.set(parsedName, parsedNum);
            num.set(parsedNum);
            // e.g. key (40, 20) -> value 20
            context.write(groupSortModel, num);
        }
    }

    /**
     * Partitioner: routes a record by its key's {@code name} only, so all records with the
     * same {@code name} reach the same reducer.
     */
    public static class GroupSortPartitioner extends Partitioner<GroupSortModel, IntWritable> {
        @Override
        public int getPartition(GroupSortModel key, IntWritable value, int numPartitions) {
            // Mask the sign bit instead of Math.abs(): the multiplication can overflow to
            // Integer.MIN_VALUE, for which Math.abs() is still negative.
            return ((key.getName() * 127) & Integer.MAX_VALUE) % numPartitions;
        }
    }

    /**
     * Comparator that considers two keys equal when their {@code num} fields match.
     *
     * <p>NOTE(review): the natural key order and the partitioner both lead with
     * {@code name}; confirm that comparing on {@code num} alone is what is intended here.
     */
    public static class GroupSortComparator extends WritableComparator {
        public GroupSortComparator() {
            super(GroupSortModel.class, true);
        }

        @Override
        @SuppressWarnings("rawtypes") // raw parameter types are dictated by WritableComparator
        public int compare(WritableComparable a, WritableComparable b) {
            int left = ((GroupSortModel) a).getNum();
            int right = ((GroupSortModel) b).getNum();
            return Integer.compare(left, right);
        }
    }

    /**
     * Reducer: for every value writes one output record whose key is the text form of the
     * composite key ({@code name<TAB>num}) and whose value is the incoming value.
     */
    public static class GroupSortReduce extends Reducer<GroupSortModel, IntWritable, Text, IntWritable> {

        // Reused output key; per-instance for the same reason as in the mapper.
        private final Text name = new Text();

        @Override
        protected void reduce(GroupSortModel key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable value : values) {
                // Render the key for every value: when the grouping comparator merges
                // distinct keys into one reduce call, the framework updates `key` as the
                // iterator advances, so text captured before the loop would go stale.
                name.set(key.toString());
                // e.g. "reduce输出:20	1 1"
                System.out.println("reduce输出:" + key + " " + value);
                context.write(name, value);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path; an
     *             existing output path is deleted recursively before the job starts
     * @return 0 if the job succeeded, 1 if it failed, 2 if fewer than two paths were given
     * @throws Exception if the file system or job submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: GroupSort <input path> <output path>");
            return 2;
        }

        // Use the configuration injected by ToolRunner so generic options (-D, -conf, ...)
        // take effect; fall back to a fresh one when run() is invoked directly.
        Configuration conf = getConf() != null ? getConf() : new Configuration();

        Path inPath = new Path(args[0]);
        Path outPath = new Path(args[1]);

        // The output directory must not exist when the job is submitted.
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        Job job = Job.getInstance(conf, "groupSort");
        job.setJarByClass(GroupSort.class);

        job.setMapperClass(GroupSortMapper.class);
        job.setPartitionerClass(GroupSortPartitioner.class);
        job.setGroupingComparatorClass(GroupSortComparator.class);
        job.setReducerClass(GroupSortReduce.class);

        // Map output types differ from the final output types, so both must be declared.
        job.setMapOutputKeyClass(GroupSortModel.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point. Uses the given arguments when at least two are supplied
     * (input path, output path); otherwise falls back to the built-in local HDFS paths.
     */
    public static void main(String[] args) throws Exception {
        String[] filePath = (args != null && args.length >= 2)
                ? args
                : new String[] {
                        "hdfs://127.0.0.1:9000/java/groupSort.txt",
                        "hdfs://127.0.0.1:9000/output/groupSort"
                };
        int ec = ToolRunner.run(new Configuration(), new GroupSort(), filePath);
        System.exit(ec);
    }
}

package com.hadoop.reduce.service;

import java.io.IOException;
import javax.annotation.PostConstruct;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.hadoop.reduce.bean.StaffProvincePartitioner;
import com.hadoop.reduce.bean.WeiboInputFormat;
import com.hadoop.reduce.mapper.CounterMapper;
import com.hadoop.reduce.mapper.FriendsMapper;
import com.hadoop.reduce.mapper.JoinMapper;
import com.hadoop.reduce.mapper.StaffMap;
import com.hadoop.reduce.mapper.WeatherMap;
import com.hadoop.reduce.mapper.WeiboMapper;
import com.hadoop.reduce.mapper.WordCount;
import com.hadoop.reduce.mapper.WordCountMap;
import com.hadoop.reduce.model.GroupSortModel;
import com.hadoop.reduce.model.OrderInfo;
import com.hadoop.reduce.model.StaffModel;
import com.hadoop.reduce.model.Weibo;
import com.hadoop.reduce.reducer.FriendsReduce;
import com.hadoop.reduce.reducer.JoinReduce;
import com.hadoop.reduce.reducer.StaffReduce;
import com.hadoop.reduce.reducer.WeatherReduce;
import // statement continues on the next line
com.hadoop.reduce.reducer.WeiboReduce; // completes the import statement begun on the previous line
import com.hadoop.reduce.reducer.WordCountReduce;
import com.hadoop.util.GroupSort;

/**
 * Helper for configuring and launching MapReduce jobs.
 *
 * <p>The HDFS address is injected from the {@code hdfs.path} property into the instance
 * field {@link #path} and copied into a static field after construction, so the static
 * methods only work once this component has been initialized.
 *
 * @author linhaiy
 * @date 2019.05.18
 */
@Component
public class ReduceJobsUtils {

    @Value("${hdfs.path}")
    private String path;

    private static String hdfsPath;

    /**
     * Builds a Hadoop configuration pointing at the configured HDFS address.
     *
     * @return a new configuration with {@code fs.defaultFS} and {@code mapred.job.tracker}
     *         both set to the configured HDFS path
     */
    public static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        // NOTE(review): the job tracker address is set to the HDFS URI; confirm this is intended.
        configuration.set("mapred.job.tracker", hdfsPath);
        // To run on a YARN cluster instead, additionally set:
        //   mapreduce.framework.name     = yarn
        //   yarn.resourcemanager.hostname = <resource manager host>, e.g. node1
        return configuration;
    }

    /**
     * Runs the group/sort job and waits for it to finish.
     *
     * @param jobName    name given to the job
     * @param inputPath  path of the input file or directory
     * @param outputPath path of the output directory
     * @throws IOException            if job setup fails or the job finishes unsuccessfully
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if the wait for completion is interrupted
     */
    public static void groupSort(String jobName, String inputPath, String outputPath)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConfiguration();
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(GroupSort.class);

        // Number of reduce tasks is left at the default; uncomment to override.
        // job.setNumReduceTasks(3);

        job.setMapperClass(GroupSort.GroupSortMapper.class);
        job.setPartitionerClass(GroupSort.GroupSortPartitioner.class);
        job.setGroupingComparatorClass(GroupSort.GroupSortComparator.class);
        job.setReducerClass(GroupSort.GroupSortReduce.class);

        // Map output types differ from the final output types, so both are declared.
        job.setMapOutputKeyClass(GroupSortModel.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input location.
        FileInputFormat.addInputPath(job, new Path(inputPath));
        // Output location.
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Surface a failed job to the caller instead of silently discarding the result.
        if (!job.waitForCompletion(true)) {
            throw new IOException("MapReduce job '" + jobName + "' failed");
        }
    }

    /** Copies the injected HDFS path into the static field used by the static helpers. */
    @PostConstruct
    public void getPath() {
        hdfsPath = this.path;
    }

    /** Returns the HDFS path captured by {@link #getPath()}, or {@code null} before that has run. */
    public static String getHdfsPath() {
        return hdfsPath;
    }
}

package // name continues on the next line
com.hadoop.reduce.service; // completes the package declaration begun on the previous line

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.springframework.stereotype.Service;
import com.hadoop.hdfs.service.HdfsService;

/**
 * Service layer that prepares the output directory and launches MapReduce jobs.
 *
 * @author linhaiy
 * @date 2019.05.18
 */
@Service
public class MapReduceService {

    // Parent directory under which each job writes to a sub-directory named after the job.
    private static final String OUTPUT_PATH = "/output";

    /**
     * Tells whether {@code jobName} is usable as a single directory name under the output
     * root. Names that are empty, equal to {@code .} or {@code ..}, or contain a path
     * separator are rejected, because the name is appended to the output root and that
     * path is deleted recursively before the job runs.
     *
     * @param jobName candidate job name, may be {@code null}
     * @return {@code true} if the name cannot escape the output root
     */
    public static boolean isValidJobName(String jobName) {
        return !StringUtils.isEmpty(jobName)
                && !".".equals(jobName)
                && !"..".equals(jobName)
                && jobName.indexOf('/') < 0
                && jobName.indexOf('\\') < 0;
    }

    /**
     * Runs the group/sort job, writing to {@code /output/<jobName>}. Any existing output at
     * that path is deleted first. Does nothing when either argument is empty.
     *
     * @param jobName   job name, also used as the output sub-directory name
     * @param inputPath path of the input data
     * @throws IllegalArgumentException if {@code jobName} is not a plain directory name
     * @throws Exception                if HDFS access or the job itself fails
     */
    public void groupSort(String jobName, String inputPath) throws Exception {
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return;
        }
        // Guard the recursive delete below against names such as ".." or "a/../..".
        if (!isValidJobName(jobName)) {
            throw new IllegalArgumentException("Invalid job name: " + jobName);
        }
        // Output directory = output root / current job name.
        String outputPath = OUTPUT_PATH + "/" + jobName;
        if (HdfsService.existFile(outputPath)) {
            HdfsService.deleteFile(outputPath);
        }
        ReduceJobsUtils.groupSort(jobName, inputPath, outputPath);
    }
}

package com.hadoop.reduce.controller;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import com.hadoop.reduce.service.MapReduceService;
import com.hadoop.util.Result;

/**
 * REST endpoints for triggering MapReduce jobs.
 *
 * @author linhaiy
 * @date 2019.05.18
 */
@RestController
@RequestMapping("/hadoop/reduce")
public class MapReduceAction {

    @Autowired
    MapReduceService mapReduceService;

    /**
     * Triggers the group/sort job.
     *
     * @param jobName   job name; must be a plain directory name (no path separators)
     * @param inputPath path of the input data
     * @return a failure result when a parameter is empty or the job name is not a plain
     *         directory name, otherwise a success result once the job call returns
     * @throws Exception if the job fails
     */
    @RequestMapping(value = "groupSort", method = RequestMethod.POST)
    @ResponseBody
    public Result groupSort(@RequestParam("jobName") String jobName,
                            @RequestParam("inputPath") String inputPath) throws Exception {
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return new Result(Result.FAILURE, "请求参数为空");
        }
        // Reject names that could redirect the output-directory cleanup outside /output.
        if (!MapReduceService.isValidJobName(jobName)) {
            return new Result(Result.FAILURE, "jobName不合法");
        }
        mapReduceService.groupSort(jobName, inputPath);
        return new Result(Result.SUCCESS, "分组统计、排序成功");
    }
}

     

    最新回复(0)