SpringBoot集成Hadoop系列二 ---- MapReduce统计数据文件的共同好友

    xiaoxiao2022-07-13  186

    代码:

    // ===== com/hadoop/reduce/mapper/FriendsMapper.java =====
    package com.hadoop.reduce.mapper;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    /**
     * Mapper for the common-friends job: inverts each "person:friend,friend,..."
     * record into (friend, person) pairs.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    public class FriendsMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Reused across map() calls to avoid allocating a Text per record.
        private final Text k = new Text();
        private final Text v = new Text();

        /**
         * Reads one line of friends.txt, formatted like {@code A:B,C,D,F,E,O},
         * and emits one (friend, person) pair for every friend listed.
         *
         * <p>Blank lines, lines without a ':' separator, lines with an empty
         * person and empty friend entries are skipped instead of failing the task.
         *
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of the input file
         * @param context sink for the (friend, person) pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }

            // Split on the colon: part 0 is the person, part 1 the friend list.
            String[] personFriends = line.split(":");
            if (personFriends.length < 2) {
                // No friend list on this line; nothing to emit.
                return;
            }

            String person = personFriends[0].trim();
            if (person.isEmpty()) {
                return;
            }
            v.set(person);

            // The friend list is comma separated.
            for (String friend : personFriends[1].split(",")) {
                String name = friend.trim();
                if (name.isEmpty()) {
                    continue;
                }
                k.set(name);
                context.write(k, v);
            }
        }
    }

    // ===== com/hadoop/reduce/reducer/FriendsReduce.java =====
    package com.hadoop.reduce.reducer;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /**
     * Reducer for the common-friends job: for each friend, collects every
     * person that listed that friend.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    public class FriendsReduce extends Reducer<Text, Text, Text, Text> {

        // Reused across reduce() calls to avoid allocating a Text per key.
        private final Text v = new Text();

        /**
         * Consumes the (friend, person) pairs produced by FriendsMapper and writes,
         * for each friend, the persons joined as {@code "A,B,C,"}.
         *
         * <p>Every person is followed by a comma, so the value ends with a
         * trailing comma; this output format is kept unchanged.
         *
         * @param key     the friend
         * @param values  the persons that listed this friend
         * @param context sink for the (friend, persons) pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // The buffer never leaves this method, so the unsynchronized
            // StringBuilder is sufficient.
            StringBuilder persons = new StringBuilder();
            for (Text person : values) {
                persons.append(person.toString()).append(",");
            }
            v.set(persons.toString());
            context.write(key, v);
        }
    }

    // ===== com/hadoop/reduce/service/ReduceJobsUtils.java =====
    package com.hadoop.reduce.service;

    import java.io.IOException;

    import javax.annotation.PostConstruct;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    import com.hadoop.reduce.bean.StaffProvincePartitioner;
    import com.hadoop.reduce.bean.WeiboInputFormat;
    import com.hadoop.reduce.mapper.CounterMapper;
    import com.hadoop.reduce.mapper.FriendsMapper;
    import com.hadoop.reduce.mapper.JoinMapper;
    import com.hadoop.reduce.mapper.StaffMap;
    import com.hadoop.reduce.mapper.WeatherMap;
    import com.hadoop.reduce.mapper.WeiboMapper;
    import com.hadoop.reduce.mapper.WordCount;
    import com.hadoop.reduce.mapper.WordCountMap;
    import com.hadoop.reduce.model.GroupSortModel;
    import com.hadoop.reduce.model.OrderInfo;
    import com.hadoop.reduce.model.StaffModel;
    import com.hadoop.reduce.model.Weibo;
    import com.hadoop.reduce.reducer.FriendsReduce;
    import com.hadoop.reduce.reducer.JoinReduce;
    import com.hadoop.reduce.reducer.StaffReduce;
    import com.hadoop.reduce.reducer.WeatherReduce;
    import com.hadoop.reduce.reducer.WeiboReduce;
    import com.hadoop.reduce.reducer.WordCountReduce;
    import com.hadoop.util.GroupSort;

    /**
     * Map/Reduce helper: builds the Hadoop configuration and submits jobs.
     *
     * <p>The HDFS address is injected into the instance field {@code path} and
     * copied to the static {@code hdfsPath} in {@link #getPath()}, so the static
     * helpers only see a non-null address after that method has run.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    @Component
    public class ReduceJobsUtils {

        @Value("${hdfs.path}")
        private String path;

        private static String hdfsPath;

        /**
         * Builds a Hadoop configuration pointing at the configured HDFS address.
         *
         * <p>To run on a YARN cluster instead, additionally set
         * {@code mapreduce.framework.name} to {@code yarn} and
         * {@code yarn.resourcemanager.hostname} to the resource manager host
         * (the original example used {@code node1}).
         *
         * @return a new configuration with the file system address applied
         */
        public static Configuration getConfiguration() {
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS", hdfsPath);
            // NOTE(review): the job tracker address is set to the same value as
            // fs.defaultFS; confirm an HDFS URI is really intended here.
            configuration.set("mapred.job.tracker", hdfsPath);
            return configuration;
        }

        /**
         * Runs the common-friends job and waits for it to finish.
         *
         * @param jobName    name given to the job
         * @param inputPath  location of the input data
         * @param outputPath location the job writes its result to
         * @throws IOException            if the job cannot be set up or submitted,
         *                                or if it finishes unsuccessfully
         * @throws ClassNotFoundException if a job class cannot be resolved
         * @throws InterruptedException   if interrupted while waiting for the job
         */
        public static void friends(String jobName, String inputPath, String outputPath)
                throws IOException, ClassNotFoundException, InterruptedException {
            Configuration config = getConfiguration();
            Job job = Job.getInstance(config, jobName);

            // The jar containing this class is shipped as the job jar.
            job.setJarByClass(FriendsMapper.class);
            job.setMapperClass(FriendsMapper.class);
            job.setReducerClass(FriendsReduce.class);

            // Mapper and reducer both emit (Text, Text), so the job-level output
            // classes also describe the map output; setMapOutputKeyClass /
            // setMapOutputValueClass are only required when the two differ.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Input and output locations.
            FileInputFormat.setInputPaths(job, new Path(inputPath));
            FileOutputFormat.setOutputPath(job, new Path(outputPath));

            // waitForCompletion reports failure through its return value, not an
            // exception; surface it so callers do not report success for a failed job.
            if (!job.waitForCompletion(true)) {
                throw new IOException("MapReduce job '" + jobName + "' did not complete successfully");
            }
        }

        /**
         * Copies the injected HDFS address into the static field used by the
         * static helpers of this class.
         */
        @PostConstruct
        public void getPath() {
            hdfsPath = this.path;
        }

        /**
         * @return the HDFS address, or {@code null} if {@link #getPath()} has not run yet
         */
        public static String getHdfsPath() {
            return hdfsPath;
        }
    }

    // ===== com/hadoop/reduce/service/MapReduceService.java =====
    package com.hadoop.reduce.service;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.springframework.stereotype.Service;

    import com.hadoop.hdfs.service.HdfsService;

    /**
     * Service layer that prepares the output directory and launches MapReduce jobs.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    @Service
    public class MapReduceService {

        // Default parent directory for reduce output.
        private static final String OUTPUT_PATH = "/output";

        /**
         * Runs the common-friends job, writing to {@code /output/<jobName>}.
         * An existing output directory of that name is deleted first.
         * Does nothing when either argument is empty.
         *
         * @param jobName   job name; also used as the output directory name, so it
         *                  must be a single path segment
         * @param inputPath location of the input data
         * @throws IllegalArgumentException if {@code jobName} is not a single path segment
         * @throws Exception                if preparing the output directory or running the job fails
         */
        public void friends(String jobName, String inputPath) throws Exception {
            if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
                return;
            }
            // The job name becomes part of a path that is deleted below, so it must
            // not be able to point outside OUTPUT_PATH.
            if (!isSinglePathSegment(jobName)) {
                throw new IllegalArgumentException("Invalid job name: " + jobName);
            }

            // Output directory = /output/<jobName>
            String outputPath = OUTPUT_PATH + "/" + jobName;
            if (HdfsService.existFile(outputPath)) {
                HdfsService.deleteFile(outputPath);
            }
            ReduceJobsUtils.friends(jobName, inputPath, outputPath);
        }

        /** Returns true when the name has no path separator and is not "." or "..". */
        private static boolean isSinglePathSegment(String name) {
            boolean hasSeparator = name.indexOf('/') >= 0 || name.indexOf('\\') >= 0;
            boolean isRelativeMarker = ".".equals(name) || "..".equals(name);
            return !hasSeparator && !isRelativeMarker;
        }
    }

    // ===== com/hadoop/reduce/controller/MapReduceAction.java =====
    package com.hadoop.reduce.controller;

    import org.apache.commons.lang.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;

    import com.hadoop.reduce.service.MapReduceService;
    import com.hadoop.util.Result;

    /**
     * REST controller exposing the MapReduce jobs under {@code /hadoop/reduce}.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    @RestController
    @RequestMapping("/hadoop/reduce")
    public class MapReduceAction {

        @Autowired
        MapReduceService mapReduceService;

        /**
         * Starts the common-friends job ({@code POST /hadoop/reduce/friends}).
         *
         * @param jobName   job name
         * @param inputPath location of the input data
         * @return a failure result when a parameter is empty, otherwise a success
         *         result once the service call returns
         * @throws Exception if the service call fails
         */
        @RequestMapping(value = "friends", method = RequestMethod.POST)
        @ResponseBody
        public Result friends(@RequestParam("jobName") String jobName,
                @RequestParam("inputPath") String inputPath) throws Exception {
            if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
                return new Result(Result.FAILURE, "请求参数为空");
            }
            mapReduceService.friends(jobName, inputPath);
            return new Result(Result.SUCCESS, "获取共同好友成功");
        }
    }

     

    最新回复(0)