《Hadoop实战第2版》——3.5节Hadoop Pipes

    xiaoxiao2022-05-29  224

    3.5 Hadoop PipesHadoop Pipes提供了一个在Hadoop上运行C++程序的方法。与流不同的是,流使用的是标准输入输出作为可执行程序与Hadoop相关进程间通信的工具,而Pipes使用的是Sockets。先看一个示例程序wordcount.cpp:

    #include "hadoop/Pipes.hh" #include "hadoop/TemplateFactory.hh" #include "hadoop/StringUtils.hh" const std::string WORDCOUNT = "WORDCOUNT"; const std::string INPUT_WORDS = "INPUT_WORDS"; const std::string OUTPUT_WORDS = "OUTPUT_WORDS"; class WordCountMap: public HadoopPipes::Mapper { public: HadoopPipes::TaskContext::Counter* inputWords; WordCountMap(HadoopPipes::TaskContext& context) { inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS); } void map(HadoopPipes::MapContext& context) { std::vector<std::string> words = HadoopUtils::splitString(context.getInputValue(), " "); for(unsigned int i=0; i < words.size(); ++i) { context.emit(words[i], "1"); } context.incrementCounter(inputWords, words.size()); } }; class WordCountReduce: public HadoopPipes::Reducer { public: HadoopPipes::TaskContext::Counter* outputWords; WordCountReduce(HadoopPipes::TaskContext& context) { outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS); } void reduce(HadoopPipes::ReduceContext& context) { int sum = 0; while (context.nextValue()) { sum += HadoopUtils::toInt(context.getInputValue()); } context.emit(context.getInputKey(), HadoopUtils::toString(sum)); context.incrementCounter(outputWords, 1); } }; int main(int argc, char *argv[]) { return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce>()); }

    这个程序连接的是一个C++库,结构类似于Java编写的程序。如新版API一样,这个程序使用context方法读入和收集对。在使用时要重写HadoopPipes名字空间下的Mapper和Reducer函数,并用context.emit()方法输出对。main函数是应用程序的入口,它调用HadoopPipes::runTask方法,这个方法由一个TemplateFactory参数来创建Map和Reduce实例,也可以重载factory设置combiner()、partitioner()、record reader、record writer。接下来,编译这个程序。这个编译命令需要用到g++,读者可以使用apt自动安装这个程序。g++的命令格式如下所示:

    apt-get install g++

    然后建立文件Makerfile,如下所示:

    HADOOP_INSTALL="你的hadoop安装文件夹" PLATFORM=Linux-i386-32(如果是AMD的CPU,请使用Linux-amd64-64) CC = g++ CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include wordcount: wordcount.cpp $(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes -lhadooputils -lpthread -g -O2 -o $@

    注意在$(CC)前有一个符号,这个分隔符是很关键的。在当前目录下建立一个WordCount可执行文件。接着,上传可执行文件到HDFS上,这是为了TaskTracker能够获得这个可执行文件。这里上传到bin文件夹内。

    ~/hadoop/bin/hadoop fs –mkdir bin ~/hadoop/bin/hadoop dfs –put wordcount bin

    然后,就可以运行这个MapReduce程序了,可以采用两种配置方式运行这个程序。一种方式是直接在命令中运行指定配置,如下所示:

    ~/hadoop/bin/hadoop pipes\ -D hadoop.pipes.java.recordreader=true\ -D hadoop.pipes.java.recordwriter=true\ -input input\ -output Coutput\ -program bin/wordcount

    另一种方式是预先将配置写入配置文件中,如下所示:

    <?xml version="1.0"?> <configuration> <property> // Set the binary path on DFS <name>hadoop.pipes.executable</name> <value>bin/wordcount</value> </property> <property> <name>hadoop.pipes.java.recordreader</name> <value>true</value> </property> <property> <name>hadoop.pipes.java.recordwriter</name> <value>true</value> </property> </configuration>

    然后通过如下命令运行这个程序:

    ~/hadoop/bin/hadoop pipes -conf word.xml -input input -output output

    将参数hadoop.pipes.executable和hadoop.pipes.java.recordreader设置为true表示使用Hadoop默认的输入输出方式(即Java的)。同样的,也可以设置一个Java语言编写的Mapper函数、Reducer函数、combiner函数和partitioner函数。实际上,在任何一个作业中,都可以混用Java类和C++类。

    相关资源:七夕情人节表白HTML源码(两款)

    最新回复(0)