SpringBoot集成Hadoop系列二 ---- MapReduce明星微博统计

    xiaoxiao2022-07-13  208

    代码:

    package com.hadoop.reduce.model; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 统计明星微博数据实体 * @author linhaiy * @date 2019.05.18 */ public class Weibo implements WritableComparable<Weibo> { // 微博粉丝数 private int friends; // 微博关注 private int followers; // 发微博数目 private int num; public Weibo() { } public Weibo(int friends, int followers, int num) { this.friends = friends; this.followers = followers; this.num = num; } public void set(int friends, int followers, int num) { this.friends = friends; this.followers = followers; this.num = num; } @Override public int compareTo(Weibo weibo) { return weibo.getFriends() - this.friends; } @Override public void write(DataOutput output) throws IOException { output.writeInt(friends); output.writeInt(followers); output.writeInt(num); } @Override public void readFields(DataInput input) throws IOException { friends = input.readInt(); followers = input.readInt(); num = input.readInt(); } public int getFriends() { return friends; } public void setFriends(int friends) { this.friends = friends; } public int getFollowers() { return followers; } public void setFollowers(int followers) { this.followers = followers; } public int getNum() { return num; } public void setNum(int num) { this.num = num; } } package com.hadoop.reduce.mapper; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import com.hadoop.reduce.model.Weibo; import java.io.IOException; /** * 统计明星微博的粉丝,关注,微博数 * @author linhaiy * @date 2019.05.18 */ public class WeiboMapper extends Mapper<Text, Weibo, Text, Text> { /** * 读取 /java/weibo/weibo.txt 文件,内容格式如下 唐嫣 唐嫣 24301532 200 2391 明星名称 名称微博名 粉丝数 * 关注 微博数 * @param key * @param value * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(Text key, Weibo value, Context context) throws IOException, InterruptedException { // 输出格式 key = friends values = // [{"friends":22898071,"followers":11,"num":268}...] context.write(new Text("friends"), new Text(key.toString() + "\t" + value.getFriends())); context.write(new Text("followers"), new Text(key.toString() + "\t" + value.getFollowers())); context.write(new Text("num"), new Text(key.toString() + "\t" + value.getNum())); } } package com.hadoop.reduce.reducer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import com.hadoop.util.SortUtil; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * 明星微博统计 * @author linhaiy * @date 2019.05.18 */ public class WeiboReduce extends Reducer<Text, Text, Text, IntWritable> { private MultipleOutputs<Text, IntWritable> outputs; @Override protected void setup(Context context) throws IOException, InterruptedException { outputs = new MultipleOutputs<>(context); } private Text text = new Text(); /** * 读取 WeiboMapper的输出,内容格式 key=friends, value= 姚晨 627 ... * @param key * @param values * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 将输出内容放到map中 Map<String, Integer> map = new HashMap<>(); for (Text value : values) { String[] spilt = value.toString().split("\t"); // 将数据放到map中 map.put(spilt[0], Integer.parseInt(spilt[1].toString())); } // 对map内容排序 Map.Entry<String, Integer>[] entries = SortUtil.sortHashMapByValue(map); // map排序后格式 [{"陈坤":73343207},{"姚晨":71382446}...] for (Map.Entry<String, Integer> entry : entries) { // 份文件输出,格式: friends 陈坤 73343207 outputs.write(key.toString(), entry.getKey(), entry.getValue()); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { outputs.close(); } } package com.hadoop.reduce.service; import java.io.IOException; import javax.annotation.PostConstruct; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.hadoop.reduce.bean.StaffProvincePartitioner; import com.hadoop.reduce.bean.WeiboInputFormat; import com.hadoop.reduce.mapper.CounterMapper; import com.hadoop.reduce.mapper.FriendsMapper; import com.hadoop.reduce.mapper.JoinMapper; import com.hadoop.reduce.mapper.StaffMap; import com.hadoop.reduce.mapper.WeatherMap; import com.hadoop.reduce.mapper.WeiboMapper; import com.hadoop.reduce.mapper.WordCount; import com.hadoop.reduce.mapper.WordCountMap; import com.hadoop.reduce.model.GroupSortModel; import com.hadoop.reduce.model.OrderInfo; import com.hadoop.reduce.model.StaffModel; import com.hadoop.reduce.model.Weibo; import com.hadoop.reduce.reducer.FriendsReduce; import com.hadoop.reduce.reducer.JoinReduce; import com.hadoop.reduce.reducer.StaffReduce; import com.hadoop.reduce.reducer.WeatherReduce; import com.hadoop.reduce.reducer.WeiboReduce; import com.hadoop.reduce.reducer.WordCountReduce; import com.hadoop.util.GroupSort; /** * Map/Reduce工具类 * @author linhaiy * @date 2019.05.18 */ @Component public class ReduceJobsUtils { @Value("${hdfs.path}") private String path; private static String hdfsPath; /** * 获取HDFS配置信息 * @return */ public static Configuration getConfiguration() { Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", hdfsPath); configuration.set("mapred.job.tracker", hdfsPath); // 运行在yarn的集群模式 // configuration.set("mapreduce.framework.name", "yarn"); // 这个配置是让main方法寻找该机器的mr环境 // configuration.set("yarn.resourcemanmager.hostname", "node1"); return configuration; } /** * 明星微博统计 * * @param jobName * @param inputPath * @param outputPath * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public static void weibo(String jobName, String inputPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = getConfiguration(); Job job = Job.getInstance(conf, jobName); job.setJarByClass(Weibo.class); // 指定Mapper的类 job.setMapperClass(WeiboMapper.class); // 指定reduce的类 job.setReducerClass(WeiboReduce.class); // 设置Mapper的输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 设置Mapper输出的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 指定输入文件的位置 FileInputFormat.addInputPath(job, new Path(inputPath)); // 指定输入文件的位置 FileOutputFormat.setOutputPath(job, new Path(outputPath)); /** * 自定义输入输出格式 */ job.setInputFormatClass(WeiboInputFormat.class); MultipleOutputs.addNamedOutput(job, "friends", org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class, Text.class, IntWritable.class); MultipleOutputs.addNamedOutput(job, "followers", org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class, Text.class, IntWritable.class); MultipleOutputs.addNamedOutput(job, "num", org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class, Text.class, IntWritable.class); // 将job中的参数,提交到yarn中运行 job.waitForCompletion(true); } @PostConstruct public void getPath() { hdfsPath = this.path; } public static String getHdfsPath() { return hdfsPath; } } package com.hadoop.reduce.service; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.springframework.stereotype.Service; import com.hadoop.hdfs.service.HdfsService; /** * 单词统计 * @author linhaiy * @date 2019.05.18 */ @Service public class MapReduceService { // 默认reduce输出目录 private static final String OUTPUT_PATH = "/output"; /** * 明星微博统计 * @param jobName * @param inputPath * @throws Exception */ public void weibo(String jobName, String inputPath) throws Exception { if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) { return; } // 输出目录 = output/当前Job String outputPath = OUTPUT_PATH + "/" + jobName; if (HdfsService.existFile(outputPath)) { HdfsService.deleteFile(outputPath); } ReduceJobsUtils.weibo(jobName, inputPath, outputPath); } } package com.hadoop.reduce.controller; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import com.hadoop.reduce.service.MapReduceService; import com.hadoop.util.Result; /** * MapReduce处理控制层 * @author linhaiy * @date 2019.05.18 */ @RestController @RequestMapping("/hadoop/reduce") public class MapReduceAction { @Autowired MapReduceService mapReduceService; /** * 明星微博统计 * @param jobName * @param inputPath * @return * @throws Exception */ @RequestMapping(value = "weibo", method = RequestMethod.POST) @ResponseBody public Result weibo(@RequestParam("jobName") String jobName, @RequestParam("inputPath") String inputPath) throws Exception { if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) { return new Result(Result.FAILURE, "请求参数为空"); } mapReduceService.weibo(jobName, inputPath); return new Result(Result.SUCCESS, "明星微博统计成功"); } }

     

    最新回复(0)