阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps
MaxCompute(原ODPS)有一套自己的MapReduce编程模型和接口,简单说来,这套接口的输入输出都是MaxCompute中的Table,处理的数据是以Record为组织形式的,它可以很好地描述Table中的数据处理过程,然而与社区的Hadoop相比,编程接口差异较大。Hadoop用户如果要将原来的Hadoop MR作业迁移到MaxCompute的MR执行,需要重写MR的代码,使用MaxCompute的接口进行编译和调试,运行正常后再打成一个Jar包才能放到MaxCompute的平台来运行。这个过程十分繁琐,需要耗费很多的开发和测试人力。如果能够完全不改或者少量地修改原来的Hadoop MR代码就能在MaxCompute平台上跑起来,将是一个比较理想的方式。
现在MaxCompute平台提供了一个HadoopMR到MaxCompute MR的适配工具,已经在一定程度上实现了Hadoop MR作业的二进制级别的兼容,即用户可以在不改代码的情况下通过指定一些配置,就能将原来在Hadoop上运行的MR jar包拿过来直接跑在MaxCompute上。该插件的下载地址在:http://repo.aliyun.com/download/hadoop2openmr-1.0.jar,目前该插件处于测试阶段,暂时还不能支持用户自定义comparator和自定义key类型,下面将以WordCount程序为例,介绍一下这个插件的基本使用方式。
使用该插件在MaxCompute平台跑一个HadoopMR作业的基本步骤如下:
通过http://repo.aliyun.com/download/hadoop2openmr-1.0.jar下载插件,包名为hadoop2openmr-1.0.jar,注意,这个jar里面已经包含hadoop-2.7.2版本的相关依赖,在作业的jar包中请不要携带hadoop的依赖,避免版本冲突。
编译导出WordCount的jar包:wordcount_test.jar ,wordcount程序的源码如下:
package com.aliyun.odps.mapred.example.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.StringTokenizer; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }待导入文本文件data.txt的数据内容如下:
hello maxcompute hello mapreduce例如可以通过如下命令将data.txt的数据导入wc_in中,
tunnel upload data.txt wc_in;配置文件命名为:wordcount-table-res.conf
{ "file:/foo": { "resolver": { "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver", "properties": { "text.resolver.columns.combine.enable": "true", "text.resolver.seperator": "\t" } }, "tableInfos": [ { "tblName": "wc_in", "partSpec": {}, "label": "__default__" } ], "matchMode": "exact" }, "file:/bar": { "resolver": { "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver", "properties": { "binary.resolver.input.key.class" : "org.apache.hadoop.io.Text", "binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable" } }, "tableInfos": [ { "tblName": "wc_out", "partSpec": {}, "label": "__default__" } ], "matchMode": "fuzzy" } }配置项说明:
整个配置是一个json文件,描述hdfs上文件与maxcompute上表之间的映射关系,一般要配置输入和输出两部分,一个HDFS路径对应一个resolver配置,tableInfos配置以及matchMode配置。
resolver: 用于配置如何对待文件中的数据,目前有com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver和com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver两个内置的resolver可以选用。除了指定好resolver的名字,还需要为相应的resolver配置一些properties指导它正确的进行数据解析。
TextFileResolver: 对于纯文本的数据,输入输出都会当成纯文本对待。当作为输入resolver配置时,需要配置的properties有text.resolver.columns.combine.enable和text.resolver.seperator,当text.resolver.columns.combine.enable配置为true时,会把输入表的所有列按找text.resolver.seperator指定的分隔符组合成一个字符串作为输入。否则,会把输入表的前两列分别作为key,value。BinaryFileResolver: 可以处理二进制的数据,自动将数据转换为maxcompute可以支持的数据类型,如bigint, bool, double等。当作为输出resolver配置时,需要配置的properties有binary.resolver.input.key.class和binary.resolver.input.value.class,分别代表中间结果的key和value类型。tableInfos:用户配置HDFS对应的maxcompute的表,目前只支持配置表的名字tblName,而partSpec和label请保持和示例一致。matchMode:路径的匹配模式,可选项为exact和fuzzy,分别代表精确匹配和模糊匹配,如果设置为fuzzy,则可以通过正则来匹配HDFS的输入路径maxcompute命令行工具的安装和配置方法参考:http://repo.aliyun.com/odpscmd/
在maxcompute的命令行下运行如下命令:
jar -DODPS_HADOOPMR_TABLE_RES_CONF=./wordcount-table-res.conf -classpath hadoop2openmr-1.0.jar,wordcount_test.jar com.aliyun.odps.mapred.example.hadoop.WordCount /foo /bar;这里假设我们已经将hadoop2openmr-1.0.jar和wordcount_test.jar以及wordcount-table-res.conf已经放置到odpscmd的当前目录,否则在指定配置和-classpath的路径时需要做相应的修改。
运行过程如下图所示:
当作业运行完成后,可以查看结果表wc_out里面的内容,验证作业成功结束,结果符合预期。
欢迎加入MaxCompute钉钉群讨论
