对于MR程序一般会extends和implements如下
public class XXX extends Configured implements Tool {实现接口Tool的run方法,就是job类
public interface Tool extends Configurable { int run(String[] var1) throws Exception; }首先是main方法
public static void main(String[] args) { try{ //1. 获取配置信息 Configuration conf = new Configuration(); //2. 获取命令行参数,封装成String[] String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); //3. 运行run方法,运行结果response int response = ToolRunner.run(conf, new 本类名(), otherArgs); if(ret == 0){ System.out.println("Job is successfully completed... "); }else{ System.out.println("Job failed... "); } System.exit(0); }catch(Exception exception){ exception.printStackTrace(); } }整个run方法中,有一些固定语法总结如下:
传入run方法中的参数args[ ]就是提交MR程序的脚本中全类名后面的附加参数,
hadoop bin jar XX.jar XX全类名 参数1(数据输入路径),参数2(数据输出路径),参数3,参数4...
@Override public int run(String[] args) throws Exception { //1. 设定运行MR程序jar包的输入参数 String inputPath = args[0]; //输入Dir String outputPath = args[1]; //输出Dir String c = args[2]; String d = args[3]; //2. 创建资源环境 Configuration conf = getConf(); //对MR结果文件去掉 _success 文件 conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs","false"); //3. 初始化创建job,使用conf资源环境 Job job = initJob(conf); //4.1 获取源文件路径List<Path>类型(方式1) List<Path> paths = HadoopUtil.getFilePathUnderDir(inputDir, conf); //4.2 获取源文件路径List<Path>类型(方式2) //(1)获取文件系统 FileSystem fs = FileSystem.get(conf); //(2)输入路径List List<Path> pathList = new ArrayList<Path>(); //(3)输入路径filter FileNameFilter clusterFileFilter = new FileNameFilter(conf); //(4)获取源文件路径List<Path> if(fs.exists(new Path(inputDir))){ pathList.addAll(HadoopUtil.getFilePathUnderDir( inputDir, conf, clusterFileFilter)); } //5. 设置MR的输入和输出路径(关联job和Path) FileInputFormat.addInputPath(job,new Path(inputPath)); FileOutputFormat.setOutputPath(job,new Path(outputDir)); //6. 设置各种在mapper和reducer中调用的属性参数 job.getConfiguration().set(key,value); //7. 将job提交到集群运行,并等待作业运行完毕 // ret为true表示将job运行进度信息及时输出给用户,false则等待作业结束 boolean ret = job.waitForCompletion(true); return ret ? 0 : 1; }封装构建job的方法initJob(conf)
private Job initJob(Configuration conf) throws IOException{ //1. 创建名为本类的job,使用资源环境conf,jobname为参数2 Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJobName(this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); //2. 设置job中读取HDFS文件的类,此处是逐行读取,用TextInputFormat.class job.setInputFormatClass(TextInputFormat.class); //3. 设置job的Mapper类和Mapper端输出的Key-Value类 job.setMapperClass(BitMapCalculationMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //4. 设置Combiner预聚合类 job.setCombinerClass(BitMapCalculationCombiner.class); //5. 设置job的Reducer类和Reducer端输出的Key-Value类 job.setReducerClass(BitMapCalculationReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); return job; }
