《Hadoop实战手册》一1.8 从MongoDB导入数据到HDFS

    xiaoxiao2024-01-07  170

    本节书摘来异步社区《Hadoop实战手册》一书中的第1章,第1.8节,作者: 【美】Jonathan R. Owens , Jon Lentz , Brian Femiano 译者: 傅杰 , 赵磊 , 卢学裕 责编: 杨海玲,更多章节内容可以访问云栖社区“异步社区”公众号查看。

    1.8 从MongoDB导入数据到HDFS

    本节将使用MongoInputFormat类加载MongoDB中的数据导入HDFS中。

    准备工作使用Mongo Hadoop适配器最简单的方法是从GitHub上克隆Mongo-Hadoop工程,并且将该工程编译到一个特定的Hadoop版本。克隆该工程需要安装一个Git客户端。

    本节假定你使用的Hadoop版本是CDH3。

    Git客户端官方的下载地址是:http://git-scm.com/downloads。

    在Windows操作系统上可以通过http://windows.github.com/访问GitHub。

    在Mac操作系统上可以通过http://mac.github.com/访问GitHub。

    可以通过https://github.com/mongodb/mongo-hadoop获取到Mongo Hadoop适配器。该工程需要编译在特定的Hadoop版本上。编译完的JAR文件需要复制到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。

    Mongo Java驱动包也需要安装到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。该驱动包可从https://github.com/mongodb/mongo-java-driver/downloads下载。

    操作步骤完成下面步骤实现将MongoDB中的数据复制到HDFS中。

    1.通过下面的命令实现克隆mongo-hadoop工程:

    git clone https://github.com/mongodb/mongo-hadoop.git

    2.切换到稳定发布的1.0分支版本:

    git checkout release-1.0

    3.必须保持mongo-hadoop与Hadoop的版本一致。使用文本编辑器打开mongo-hadoop克隆目录下的build.sbt文件,修改下面这行:

    hadoopRelease in ThisBuild := "default"

    修改为:

    hadoopRelease in ThisBuild := "cdh3"

    4.编译mongo-hadoop:

    ./sbt package.

    这将会在core/target文件夹下生成一个名为mongo-hadoop-core_cdh3u3-1.0.0.jar的JAR文件。

    5.从https://github.com/mongodb/mongo-java-driver/downloads下载MongoDB 2.8.0版本的Java驱动包。

    6.复制mongo-hadoop和MongoDB Java驱动包到Hadoop集群每个节点的$HADOOP_HOME/lib:

    cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib

    7.编写MapReduce读取MongoDB数据库中的数据并写入HDFS中:

    import java.io.*; import org.apache.commons.logging.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.mapreduce.*; import org.bson.*; import com.mongodb.hadoop.*; import com.mongodb.hadoop.util.*; public class ImportWeblogsFromMongo { private static final Log log = LogFactory. getLog(ImportWeblogsFrom Mongo.class); public static class ReadWeblogsFromMongo extends Mapper<Object, BSONObject, Text, Text>{ public void map(Object key, BSONObject value, Context context) throws IOException, InterruptedException{ System.out.println("Key: " + key); System.out.println("Value: " + value); String md5 = value.get("md5").toString(); String url = value.get("url").toString(); String date = value.get("date").toString(); String time = value.get("time").toString(); String ip = value.get("ip").toString(); String output = "\t" + url + "\t" + date + "\t" + time + "\t" + ip; context.write( new Text(md5), new Text(output)); } } public static void main(String[] args) throws Exception{ final Configuration conf = new Configuration(); MongoConfigUtil.setInputURI(conf, "mongodb://<HOST>:<PORT>/test.weblogs"); MongoConfigUtil.setCreateInputSplits(conf, false); System.out.println("Configuration: " + conf); final Job job = new Job(conf, "Mongo Import"); Path out = new Path("/data/weblogs/mongo_import"); FileOutputFormat.setOutputPath(job, out); job.setJarByClass(ImportWeblogsFromMongo.class); job.setMapperClass(ReadWeblogsFromMongo.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(MongoInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setNumReduceTasks(0); System.exit(job.waitForCompletion(true) ? 0 : 1 ); } }

    这个只有map的作业用到了Mongo Hadoop适配器提供的几个类。从HDFS读入的数据会被转换成一个BSONObject对象。该类描述的是一个二进制的JSON值。MongoDB使用这些BSONObject对象来有效地序列化、传输和存储数据。

    Mongo Hadoop适配器还提供了一个方便的工具类MongoConfigUtil,使得可以把MongoDB当成是一个文件系统来访问。

    8.导出为一个可运行的JAR文件,并运行该作业:

    hadoop jar ImportWeblogsFromMongo.jar

    9.验证weblogs数据是否已经导入HDFS中:

    hadoop fs -ls /data/weblogs/mongo_import

    工作原理Mongo Hadoop适配器提供了一种新的兼容Hadoop的文件系统实现,包括MongoInputFormat和MongoOutputFormat。这些抽象实现使得访问MongoDB和访问任何兼容Hadoop的文件系统一样。

    最新回复(0)