(Note: the content below is based on the book 《Hadoop数据分析》, Data Analytics with Hadoop.)
Hadoop Streaming is different from Spark Streaming and other real-time computation frameworks built on unbounded data streams. The "streaming" in Hadoop Streaming refers to the standard Unix streams: stdin, stdout, and stderr.
To execute a MapReduce job, Streaming uses the standard Unix streams for input and output, which is where the name comes from. Both the mapper and the reducer read their input from stdin, which a Python process can access through the sys module. Hadoop requires a mapper and reducer written in Python to write the key/value pairs they produce to stdout.
When Streaming executes a job, each mapper task launches the supplied executable in its own process; it then converts the input data into lines of text and pipes them into the external process's stdin, while collecting output from its stdout. Converting the input is usually a straightforward serialization of values, because the data is read from HDFS, where each line is one value. The mapper must emit its output as key/value strings, with the key and value separated by some separator, a tab character ('\t') by default. If no separator is present, the output is treated as a key only, with a null value.
After the mapper output has been shuffled and sorted, each reducer task likewise launches its executable. The key/value strings emitted by the mappers are fed to the reducer on stdin, so the reducer's input matches the mapper's output format, and the records are guaranteed to be grouped by key. The output the reducer writes to stdout must follow the same key, separator, and value format as the mapper.
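As a concrete illustration (using the word-count job developed below), a mapper that reads the line "to be or not to be" writes one tab-separated pair per word:

to    1
be    1
or    1
not   1
to    1
be    1

After the shuffle and sort, the reducer reads the same pairs with equal keys adjacent:

be    1
be    1
not   1
or    1
to    1
to    1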
To write a Hadoop job in Python, you create two Python files, mapper.py and reducer.py. Importing the sys module in each file is all that is needed to access stdin and stdout. The code itself has to handle its input as strings, parsing and converting every array or complex data type on its own, and the output likewise has to be serialized back into strings.
An example mapper.py that turns the input data into key-separator-value lines and writes them to stdout:
#!/usr/bin/env python
import sys

if __name__ == '__main__':
    for line in sys.stdin:
        for word in line.split():
            sys.stdout.write("{}\t1\n".format(word))

An example reducer.py for word counting:
#!/usr/bin/env python
import sys

if __name__ == '__main__':
    curkey = None
    total = 0
    for line in sys.stdin:
        key, val = line.split('\t')
        val = int(val)
        if key == curkey:
            total += val
        else:
            if curkey is not None:
                sys.stdout.write("{}\t{}\n".format(curkey, total))
            curkey = key
            total = val
    # emit the count for the last key once stdin is exhausted
    if curkey is not None:
        sys.stdout.write("{}\t{}\n".format(curkey, total))

By the time the data reaches a reducer, the shuffle has brought all records with the same key together and sorted them by key. In other words, a given key appears in exactly one reducer's input, where its records form one contiguous run, although a single reducer's input may contain runs for several different keys.
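A quick local check of the two scripts, with sort standing in for Hadoop's shuffle and sort:

echo "to be or not to be" | python mapper.py | sort | python reducer.py

which prints:

be    2
not   1
or    1
to    2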
Example: mutual friends in a social network
# input data
Allen Chris,David,Betty
Betty Chris,David,Ellen,Allen
Ellen Betty,David,Chris
David Allen,Betty,Chris,Ellen
Chris Allen,Betty,Ellen,David

# mapper.py
#!/usr/bin/env python
import sys

if __name__ == '__main__':
    for line in sys.stdin:
        tmp = line.split()
        person = tmp[0]
        tmp2 = tmp[1].split(",")
        for item in tmp2:
            pair = [person, item]
            pair.sort()
            pair = pair[0] + ',' + pair[1]
            sys.stdout.write("{}\t{}\n".format(pair, tmp[1]))

# reducer.py
#!/usr/bin/env python
import sys

if __name__ == '__main__':
    lastkey = None
    lastvalue = None
    for line in sys.stdin:
        key, value = line.split('\t')
        # Note: each line the reducer reads ends with a '\n' newline;
        # it has to be stripped first, otherwise the result will be wrong.
        value = value.strip('\n')
        if key == lastkey:
            shared = lastvalue.split(',')
            shared = set(shared)
            value = value.split(",")
            shared = shared.intersection(set(value))
            sys.stdout.write(key + '->' + ",".join(list(shared)) + '\n')
        else:
            lastkey = key
            lastvalue = value
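Running the same local pipeline over this sample data (cat friends.txt | python mapper.py | sort | python reducer.py, where friends.txt is just an assumed name for the data file above) emits each pair of people once, followed by their common friends; the order of names within each result depends on Python's set ordering. For example:

Allen,Betty->Chris,David
Chris,David->Allen,Betty,Ellen

and similarly for the remaining pairs.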
Wrapping the mapper and reducer in classes
#!/usr/bin/env python
import sys
import csv

class Mapper():
    def __init__(self, stream, sep='\t'):
        self.stream = stream
        self.sep = sep

    def emit(self, key, value):
        sys.stdout.write("{}{}{}\n".format(key, self.sep, value))

    def map(self):
        # fill in the map logic here, e.g.:
        # for row in self:
        #     self.emit(key, value)
        pass

    # __iter__ is one of Python's special methods, used for iteration.
    # Here __iter__ reads rows from self.stream and yields them one at a time,
    # so that map() can obtain its input simply by iterating over self.
    def __iter__(self):
        reader = csv.reader(self.stream)
        for row in reader:
            yield row

if __name__ == '__main__':
    mapper = Mapper(sys.stdin)
    mapper.map()

#!/usr/bin/env python
import sys
from itertools import groupby
from operator import itemgetter

class Reducer():
    def __init__(self, stream, sep='\t'):
        self.stream = stream
        self.sep = sep

    def emit(self, key, value):
        sys.stdout.write("{}{}{}\n".format(key, self.sep, value))

    def reduce(self):
        # fill in the reduce logic here.
        # groupby scans the input and puts consecutive records with the same
        # key into one group; itemgetter(0) selects the field to group on
        # (here the key, the first element of each record).
        # For example, if the records arrive in the order
        #   (1,1) (1,2) (2,1) (2,1) (1,1) (1,1)
        # then groupby(..., itemgetter(0)) yields three groups:
        #   (1,1) (1,2) | (2,1) (2,1) | (1,1) (1,1)
        # Between the mapper and the reducer, records with the same key have
        # already been placed next to each other, so groupby is enough to
        # recover the per-key groups.
        for current, group in groupby(self, itemgetter(0)):
            # ...
            for item in group:
                # ...
                pass

    def __iter__(self):
        for line in self.stream:
            try:
                parts = line.split(self.sep)
                yield parts[0], float(parts[1])
            except (IndexError, ValueError):
                continue

if __name__ == '__main__':
    reducer = Reducer(sys.stdin)
    reducer.reduce()

Rewriting the word-count example with this structure:
mapper.py
#!/usr/bin/env python
import sys

class Mapper():
    def __init__(self, stream, sep='\t'):
        self.stream = stream
        self.sep = sep

    def emit(self, key, value):
        sys.stdout.write("%s%s%s\n" % (key, self.sep, str(value)))

    def map(self):
        for item in self:
            self.emit(item, 1)

    def __iter__(self):
        for line in self.stream:
            for item in line.split():
                yield item

if __name__ == '__main__':
    mapper = Mapper(sys.stdin)
    mapper.map()

reducer.py
#!/usr/bin/env python
import sys
from itertools import groupby
from operator import itemgetter

class Reducer():
    def __init__(self, stream, sep='\t'):
        self.stream = stream
        self.sep = sep

    def emit(self, key, value):
        sys.stdout.write("%s%s%s\n" % (key, self.sep, value))

    def reduce(self):
        for current, group in groupby(self, itemgetter(0)):
            count = 0
            for item in group:
                count += int(item[1])
            # current is the key shared by every record in this group
            self.emit(current, count)

    def __iter__(self):
        for line in self.stream:
            yield line.split("\t")

if __name__ == '__main__':
    reducer = Reducer(sys.stdin)
    reducer.reduce()
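To make the behavior of groupby and itemgetter concrete, here is a small standalone sketch (plain Python, no Hadoop involved; the sample records are made up for illustration):

from itertools import groupby
from operator import itemgetter

# records as they would arrive at a reducer: sorted, so equal keys are adjacent
records = [("be", 1), ("be", 1), ("not", 1), ("or", 1), ("to", 1), ("to", 1)]

for key, group in groupby(records, itemgetter(0)):
    # group is an iterator over the consecutive records that share this key
    total = sum(count for _, count in group)
    print("{}\t{}".format(key, total))

# prints:
# be    2
# not   1
# or    1
# to    2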
Advanced Hadoop Streaming usage: using the standard error stream (stderr) to update the Hadoop job status and Hadoop counters
# Using the Reporter's Counter and Status: add the following methods to the
# Mapper and Reducer classes above.
    def status(self, message):
        sys.stderr.write("reporter:status:{}\n".format(message))

    def counter(self, counter, amount=1, group="ApplicationCounter"):
        sys.stderr.write(
            "reporter:counter:{},{},{}\n".format(group, counter, amount)
        )

Putting this to use:
#!/usr/bin/env python
import sys

class Mapper():
    def __init__(self, stream, sep='\t'):
        self.stream = stream
        self.sep = sep

    def emit(self, key, value):
        sys.stdout.write("%s%s%s\n" % (key, self.sep, str(value)))

    def map(self):
        self.status("mapping started")
        for item in self:
            self.counter("wordCount")
            self.emit(item, 1)
        self.status("mapping complete")

    def status(self, message):
        sys.stderr.write("reporter:status:{}\n".format(message))

    def counter(self, counter, amount=1, group="ApplicationCounter"):
        sys.stderr.write(
            "reporter:counter:{},{},{}\n".format(group, counter, amount)
        )

    def __iter__(self):
        for line in self.stream:
            self.counter("lineCount")
            for item in line.split():
                yield item

if __name__ == '__main__':
    mapper = Mapper(sys.stdin)
    mapper.map()

# Notes:
# - each call to self.status() emits one status update
# - each call to self.counter() increments the named counter by one
# - passing different counter names accumulates different quantities:
#   self.counter("lineCount") counts input lines,
#   self.counter("wordCount") counts words
Counters are only available when the job runs on a cluster; in a local test environment the reporter lines are simply printed to stderr as plain strings.
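For the mapper above, the lines written to stderr look like this (Hadoop Streaming recognizes lines that start with the reporter: prefix; anything else written to stderr is treated as ordinary log output):

reporter:status:mapping started
reporter:counter:ApplicationCounter,lineCount,1
reporter:counter:ApplicationCounter,wordCount,1
reporter:counter:ApplicationCounter,wordCount,1
...
reporter:status:mapping complete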
Submitting the code
Testing in a local environment
chmod +x mapper.py
chmod +x reducer.py
cat xxx.csv | python ./mapper.py | sort | python ./reducer.py

Submitting to the cluster
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input xxxx.xxx \
    -output result \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py

# -input    path of the input data
# -output   path where the results are stored
# -mapper   the mapper script
# -reducer  the reducer script
# -file     ships the given file to the other nodes in the cluster

Optimizations:
1. Aggregate locally first (mapper-reducer is roughly equivalent to Spark's groupByKey, while mapper-combiner-reducer is roughly equivalent to Spark's reduceByKey).
Add the -combiner option to the streaming command (see the sketch below).
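A minimal sketch of the submission command with a combiner. For the word-count job the reduce operation is associative, so reducer.py can double as the combiner; reusing it this way is an assumption that holds for word count, not a general rule:

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input xxxx.xxx \
    -output result \
    -mapper mapper.py \
    -combiner reducer.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py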