MapReduce

    xiaoxiao2023-10-04  156

    **

    MapReduce

    ** 思想核心:分为治之,Map负责“分”,Reduce负责“合" MapReduce是一个分布式运算程序的编程框架。

    8个步骤

    Map阶段 第一步:从hdfs读取文件,通过inputformat设定读取路径 第二步:maptask要做的具体的业务逻辑。 Shuffle阶段 第三步:对maptask的输出进行分区 第四步:对分区的数据进行排序 第五步:对排序后的数据进行规约combine(可选) 第六步:对规约处理后的数据进行分组 reduce阶段 第七步:获取map阶段处理后的数据进行业务的编写 第八步:将编写后的最终结果进行输出

    简单的WordCount案例

    需求:计算给出的每个单词出现次数 mapper类 publci class Mapper extends Mapper<LongWritable,Text,Text,LongWritable>{ @Override public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{ String line =value.toString(); String[] split =line.split(","); for(String word:split){ contect.write(new Text(word),new LongWritable(1)); } } }

    reduce类 public class Reducer extends Reducer<Text,LongWritabble,Text,LongWritable>{ @Override protected void reduce(Text key,Iterable values,Conetxt context) throws IOException,InterruptedException{ long count=0; for(LongWritable value:values){ count + =value.get(); } context.write(key,new LongWritable(count)); } }

    主类 public class JobMain extends Configured implements Tool{ @Override public int run(String[] args)throws Exception{ Job job=Job.getInstance(super.getConf(),JobMain.class.getSimpleName()); job.setJarByClass(JobMain.class); job.setInputFormatClass(TextInputFormat.cass); TextInputFormat.addInputPath(job,new Path(“hdfs://node01:8020/word”));

    job.setMapperClass(Mapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class);

    job.setReducerClass(Reducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class);

    job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path(“hdfs://node01:8020/word1”)); boolean b=job.waitForCompleton(true); return b?0:1; }

    public static void main(String[] args)throws Exception{ Configuration configuration =new Configuration(); Tool tool=new JobMain(); int run =ToolRunner.run(configuration,tool,args); System.exit(run); } }

    运行模式 本地运行模式代码设置 configuration.set(“mapreduce.framework.name”,“local”); configuration.set(“yarn.resourcemanager.hostname”,“local”); TextInputFormat.addInputPath(job,new Path(“file:///”)); TextOutputFormat.setOutputPath(job,new Path(“file:///”));

    集群运行模式 提交集群上用hadoop命令启动 yarn jar hadoop_hdfs_operate-1.0-SNAPSHOT.jar demo.JobMain

    分区的案例,只能达成jar包发布到集群上面去运行。

    combiner阶段 作用就是对Map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。

    combiner组件的父类就是Reducer combiner是在每一个maptask所在的节点运行,reducer是接收全局所有mapper的输出结果。

    shuffle阶段是分布在mapreduce的Map阶段和reduce阶段。

    collect阶段:将maptask的结果输出到环形缓冲区,spill阶段:当内存中的数据量达到一定的阀值时,会将数据写入本地磁盘,还会进行一次排序。merge阶段:合并所有溢出的临时文件,确保一个maptask最终只产生一个中间数据文件。copy阶段:reducetask启动线程到已完成的maptask节点上复制一份数据保存在内存缓冲区,当数据量达到一定的阀值是,就会写入磁盘中。merge节点,在复制的同时,后台会启动两个线程对内存本地数据文件进行合并。sort阶段:在对数据进行合并的同时,会进行排序操作。
    最新回复(0)