(Note: this material is adapted from the book *Data Analytics with Hadoop*.)
Hadoop supports two common sampling patterns: percentage sampling and fixed-size (N-sample) sampling.
1. Random percentage sampling:
To draw a random fraction of a dataset, one approach is to generate a uniform random number on [0, 1) for each record and compare it against the desired sampling rate: records whose random number falls below the threshold are kept, the rest are skipped. Note that the resulting sample size is only approximately the requested percentage of the input, since each record is kept or dropped independently.
mapper.py
import sys
import random

# For the Mapper base class, see the previous post.
class PercentSampleMapper(Mapper):

    def __init__(self, *args, **kwargs):
        self.percentage = kwargs.pop("percentage")
        super(PercentSampleMapper, self).__init__(*args, **kwargs)

    def map(self):
        # Keep a record only when its uniform random draw falls
        # below the desired sampling rate.
        for _, val in self:
            if random.random() < self.percentage:
                self.emit(None, val)

if __name__ == '__main__':
    # Sample roughly 20% of the input records.
    mapper = PercentSampleMapper(sys.stdin, percentage=0.2)
    mapper.map()

Next, an identity reducer merges the sampled records.
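The identity reducer itself is not shown in the original post. Below is a minimal sketch, assuming a Reducer base class from the previous post that yields (key, values) pairs on iteration and provides an emit method; the class name IdentityReducer and that interface are assumptions, not part of the source.

import sys

# Hypothetical identity reducer: merges per-mapper samples by
# passing every value through unchanged.
class IdentityReducer(Reducer):

    def reduce(self):
        for key, values in self:
            for value in values:
                self.emit(key, value)

if __name__ == '__main__':
    reducer = IdentityReducer(sys.stdin)
    reducer.reduce()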
2. Random fixed-size (N-sample) sampling:
Again generate a uniform [0, 1) random number for each record, but this time maintain a min-heap of at most n entries keyed on those random numbers. Once the heap is full, each incoming record either displaces the smallest key or is discarded, so the heap always holds the n records with the largest random keys, which form a uniform random sample of size n.
import sys
import heapq
import random

# Reconstructs the otherwise-undefined make_tuple helper: parse a
# stringified "(random_key, value)" pair emitted by the mappers
# back into a tuple.
from ast import literal_eval as make_tuple

class SampleMapper(Mapper):

    def __init__(self, n, *args, **kwargs):
        self.n = n
        super(SampleMapper, self).__init__(*args, **kwargs)

    def map(self):
        heap = []
        for value in self:
            # Key each record with a uniform random number. Once the
            # heap holds n items, heappushpop pushes the new pair and
            # pops the smallest key, so the heap retains the n records
            # with the largest random keys.
            if len(heap) < self.n:
                heapq.heappush(heap, (random.random(), value))
            else:
                heapq.heappushpop(heap, (random.random(), value))
        for item in heap:
            self.emit(None, item)

class SampleReducer(Reducer):

    def __init__(self, n, *args, **kwargs):
        self.n = n
        super(SampleReducer, self).__init__(*args, **kwargs)

    def reduce(self):
        heap = []
        for _, values in self:
            # Re-sample the combined mapper output with the same
            # bounded-heap technique, reusing the original random keys.
            for value in values:
                if len(heap) < self.n:
                    heapq.heappush(heap, make_tuple(value))
                else:
                    heapq.heappushpop(heap, make_tuple(value))
        for item in heap:
            self.emit(None, item[1])

Each mapper emits n candidates, so the map stage produces n times the number of mappers in total; a single reducer then re-samples those candidates down to a final sample of exactly n records.
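The original ends without a driver for this pair of classes. For symmetry with the percentage-sampling example, here is a hypothetical sketch of how the two stages might be invoked; the command-line switch, the use of sys.stdin, and the choice of n = 100 are all illustrative assumptions.

if __name__ == '__main__':
    # Hypothetical driver: run "script.py map" as the map stage on
    # each input split, and run the script without arguments as the
    # single reduce stage on the merged, sorted mapper output.
    if len(sys.argv) > 1 and sys.argv[1] == 'map':
        SampleMapper(100, sys.stdin).map()
    else:
        SampleReducer(100, sys.stdin).reduce()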