SpringBoot集成Hadoop系列二 ---- MapReduce词频统计

    xiaoxiao2022-07-13  169

            继上篇《SpringBoot集成Hadoop系列一 ---- 对HDFS的文件操作》所建的工程,接下来使用MapReduce对数据文件做一些统计开发。这里实现一个很经典的统计功能:词频统计。

      代码:

    // ---- WordCountMap.java ----
    package com.hadoop.reduce.mapper;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.Reader;
    import java.io.StringReader;
    import java.nio.charset.StandardCharsets;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.wltea.analyzer.core.IKSegmenter;
    import org.wltea.analyzer.core.Lexeme;

    /**
     * Mapper of the word-frequency job: splits every input line into lexemes with the
     * IK segmenter and emits {@code <lexeme, 1>} for each of them.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    public class WordCountMap extends Mapper<Object, Text, Text, IntWritable> {

        /** Constant count emitted for every lexeme. */
        private static final IntWritable ONE = new IntWritable(1);

        /** Reused output key, so no new {@link Text} is allocated per lexeme. */
        private final Text word = new Text();

        /**
         * Tokenizes one line of the input (e.g. the novel text in sgyy.txt or dpcq.txt)
         * and writes {@code <lexeme, 1>} for every lexeme found. Blank lines are skipped.
         *
         * @param key
         *            input key supplied by the input format; not used here
         * @param value
         *            content of one line of text
         * @param context
         *            job context that receives the {@code <lexeme, 1>} pairs
         * @throws IOException
         *             if the segmenter or the context fails
         * @throws InterruptedException
         *             if writing to the context is interrupted
         */
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Decode explicitly as UTF-8 and honour getLength(): only the first getLength()
            // bytes of the array returned by getBytes() are read.
            String line = new String(value.getBytes(), 0, value.getLength(), StandardCharsets.UTF_8).trim();
            if (StringUtils.isEmpty(line)) {
                return;
            }

            // Feed the decoded characters straight to the segmenter. Re-encoding the string
            // to bytes and decoding it again with the platform default charset would replace
            // the Chinese text with '?' on JVMs whose default charset cannot represent it.
            try (Reader reader = new StringReader(line)) {
                // Second argument is passed as true, as in the original code
                // (presumably IK's "smart" segmentation switch - confirm against the IK docs).
                IKSegmenter segmenter = new IKSegmenter(reader, true);
                Lexeme lexeme;
                while ((lexeme = segmenter.next()) != null) {
                    word.set(lexeme.getLexemeText());
                    context.write(word, ONE);
                }
            }
        }
    }

    // ---- WordCountReduce.java ----
    package com.hadoop.reduce.reducer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    /**
     * Reducer of the word-frequency job: sums the counts received for each word and
     * writes {@code <word, total>}.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        /** Reused output value. */
        private final IntWritable result = new IntWritable();

        /** Words whose count is additionally printed to standard output. */
        private final List<String> textList = new ArrayList<>();

        public WordCountReduce() {
            textList.add("曹操");
            textList.add("孙权");
        }

        /**
         * Adds up all counts of one word and emits the total.
         *
         * @param key
         *            the word, as emitted by the mapper
         * @param values
         *            the {@link IntWritable} counts collected for that word
         * @param context
         *            job context that receives {@code <word, total>}
         * @throws IOException
         *             if writing to the context fails
         * @throws InterruptedException
         *             if writing to the context is interrupted
         */
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);

            // The segmenter already produced whole words, so the total can be printed as is.
            // NOTE: the job also registers this class as its combiner, so this line may be
            // printed with partial sums as well.
            String keyStr = key.toString();
            if (textList.contains(keyStr)) {
                System.out.println("============ " + keyStr + " 统计分词为: " + sum + " ============");
            }
        }
    }

    // ---- ReduceJobsUtils.java ----
    package com.hadoop.reduce.service;

    import java.io.IOException;

    import javax.annotation.PostConstruct;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    import com.hadoop.reduce.bean.StaffProvincePartitioner;
    import com.hadoop.reduce.bean.WeiboInputFormat;
    import com.hadoop.reduce.mapper.CounterMapper;
    import com.hadoop.reduce.mapper.FriendsMapper;
    import com.hadoop.reduce.mapper.JoinMapper;
    import com.hadoop.reduce.mapper.StaffMap;
    import com.hadoop.reduce.mapper.WeatherMap;
    import com.hadoop.reduce.mapper.WeiboMapper;
    import com.hadoop.reduce.mapper.WordCount;
    import com.hadoop.reduce.mapper.WordCountMap;
    import com.hadoop.reduce.model.GroupSortModel;
    import com.hadoop.reduce.model.OrderInfo;
    import com.hadoop.reduce.model.StaffModel;
    import com.hadoop.reduce.model.Weibo;
    import com.hadoop.reduce.reducer.FriendsReduce;
    import com.hadoop.reduce.reducer.JoinReduce;
    import com.hadoop.reduce.reducer.StaffReduce;
    import com.hadoop.reduce.reducer.WeatherReduce;
    import com.hadoop.reduce.reducer.WeiboReduce;
    import com.hadoop.reduce.reducer.WordCountReduce;
    import com.hadoop.util.GroupSort;

    /**
     * Map/Reduce helper: builds the Hadoop configuration and submits jobs.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    @Component
    public class ReduceJobsUtils {

        @Value("${hdfs.path}")
        private String path;

        /** Copy of {@link #path}, filled in by {@link #getPath()} so the static methods can read it. */
        private static String hdfsPath;

        /**
         * Builds the Hadoop configuration that points at the configured HDFS.
         *
         * @return a new configuration with {@code fs.defaultFS} and {@code mapred.job.tracker}
         *         set to the {@code hdfs.path} property
         */
        public static Configuration getConfiguration() {
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS", hdfsPath);
            // NOTE(review): the job tracker address is set to the HDFS URI here - confirm
            // this is intended for the target cluster.
            configuration.set("mapred.job.tracker", hdfsPath);
            // Run in YARN cluster mode:
            // configuration.set("mapreduce.framework.name", "yarn");
            // Lets the main method locate the MR environment of that machine:
            // configuration.set("yarn.resourcemanager.hostname", "node1");
            return configuration;
        }

        /**
         * Configures the word-count job, submits it and waits until it has finished.
         *
         * @param jobName
         *            name of the job
         * @param inputPath
         *            input file or directory
         * @param outputPath
         *            output directory of the job
         * @throws IOException
         *             if the job cannot be submitted or finishes unsuccessfully
         * @throws ClassNotFoundException
         *             if a job class cannot be found
         * @throws InterruptedException
         *             if waiting for the job is interrupted
         */
        public static void getWordCountJobsConf(String jobName, String inputPath, String outputPath)
                throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = getConfiguration();
            Job job = Job.getInstance(conf, jobName);

            job.setMapperClass(WordCountMap.class);
            job.setCombinerClass(WordCountReduce.class);
            job.setReducerClass(WordCountReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Combine small files into larger splits.
            job.setInputFormatClass(CombineTextInputFormat.class);
            // Maximum split size: 4 MiB.
            CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
            // Minimum split size: 2 MiB.
            CombineTextInputFormat.setMinInputSplitSize(job, 2 * 1024 * 1024);

            FileInputFormat.addInputPath(job, new Path(inputPath));
            FileOutputFormat.setOutputPath(job, new Path(outputPath));

            // waitForCompletion reports failure through its return value; surface it
            // instead of letting callers believe the job succeeded.
            if (!job.waitForCompletion(true)) {
                throw new IOException("MapReduce job '" + jobName + "' did not complete successfully");
            }
        }

        /** Copies the injected {@code hdfs.path} value into the static field after construction. */
        @PostConstruct
        public void getPath() {
            hdfsPath = this.path;
        }

        public static String getHdfsPath() {
            return hdfsPath;
        }
    }

    // ---- MapReduceService.java ----
    package com.hadoop.reduce.service;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.springframework.stereotype.Service;

    import com.hadoop.hdfs.service.HdfsService;

    /**
     * Word-count service.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    @Service
    public class MapReduceService {

        /** Default parent directory of the reduce output. */
        private static final String OUTPUT_PATH = "/output";

        /**
         * Runs the word-count job over {@code inputPath}. Does nothing when either
         * argument is empty.
         *
         * @param jobName
         *            name of the job; also the name of the output sub-directory
         * @param inputPath
         *            input file or directory
         * @throws Exception
         *             if the HDFS operations or the job fail
         */
        public void wordCount(String jobName, String inputPath) throws Exception {
            if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
                return;
            }
            // Output directory = /output/<jobName>; an existing one is deleted first so
            // every run starts from a clean directory.
            String outputPath = OUTPUT_PATH + "/" + jobName;
            if (HdfsService.existFile(outputPath)) {
                HdfsService.deleteFile(outputPath);
            }
            ReduceJobsUtils.getWordCountJobsConf(jobName, inputPath, outputPath);
        }
    }

    // ---- MapReduceAction.java ----
    package com.hadoop.reduce.controller;

    import org.apache.commons.lang.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;

    import com.hadoop.reduce.service.MapReduceService;
    import com.hadoop.util.Result;

    /**
     * REST controller for the MapReduce jobs.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    @RestController
    @RequestMapping("/hadoop/reduce")
    public class MapReduceAction {

        @Autowired
        MapReduceService mapReduceService;

        /**
         * Starts the word-count job.
         *
         * @param jobName
         *            name of the job
         * @param inputPath
         *            input file or directory
         * @return a failure result when a parameter is empty, otherwise a success result
         * @throws Exception
         *             if the job fails
         */
        @RequestMapping(value = "wordCount", method = RequestMethod.POST)
        @ResponseBody
        public Result wordCount(@RequestParam("jobName") String jobName, @RequestParam("inputPath") String inputPath)
                throws Exception {
            if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
                return new Result(Result.FAILURE, "请求参数为空");
            }
            mapReduceService.wordCount(jobName, inputPath);
            return new Result(Result.SUCCESS, "单词统计成功");
        }
    }

     

    最新回复(0)