首先RDD提供以下功能:
跨集群的不可变存储(在Spark中,记录是指Java Object)使用键对数据进行分区控制考虑分区的粗粒度运算符由于是内存计算,所以低延迟在专业引擎里,不只是数据运算符,而且存储格式、甚至访问方法都被优化了。如SQL引擎Shark是以面向列的格式处理数据,同时一个图计算引擎如GraphX在数据被创建索引的情况下性能更高。我们讨论使用RDD实现这些优化的常见方法,利用RDD的容错和组合优势,同时保持和专业系统匹敌的性能。
尽管RDD存储简单、扁平化的数据,实现更丰富的存储格式的一个有效途径是在一个RDD记录中存储多个数据项,从而在每个记录中实现更复杂的存储。以这种方式批量处理几千条记录足以实现专用数据结构的大部分效率,同时仍将每个RDD记录保持在几兆字节大小。 例如,在分析数据库中,一个通用的优化方式是面向列存储和压缩。在Shark系统中,我们将多个数据库记录保存在一个Shark记录中,并应用这种优化。一个有10000到100000个记录的批的压缩程度非常接近于以面向列的格式存储整个数据库,以至于我们仍然有显著的优势。一个高级的例子,GraphX需要对表示图的顶点和边的数据集执行非常稀疏的join。它通过在每个RDD记录中存储一个包含多个图记录的哈希表来实现这一点,从而在与新的数据join的时候能够快速找到特定的顶点和边。这点在图计算算法中很重要,在最后几次迭代中只有很少的顶点和边处于active状态,但是他们仍然需要和整个数据集进行join。这一机制能被用于Spark Streaming 的updateStateByKey的更有效地实现。 RDD的两方面的原因使得这种方式很高效。首先,RDD常见于内存中,指针能够被用于仅仅读取和每个操作相关的那部分记录。例如一组面向列的(Int , Float)记录的表示能够实现为一个Int数组和一个Float数组。如果我们只想读Int那一列,我们就可以把指针移到该列,从而避免扫描内存中的Float列。同样地,在上面哈希表的RDD中,我们可以只扫描我们需要的record。 其次,一个问题可能是,在每个计算模型都有其自己的批处理记录表示的情况下,如何有效地组合处理类型。幸运的是,RDD的低级别的接口是基于迭代器的,这使得不同的格式之间可以快速地、流水线式的进行转换。包含复合记录的RDD可以通过批处理上的flatMap操作有效地在解压缩的记录上返回一个迭代器。这可以和对解压记录的进一步窄依赖转换操作、或者是把记录打包成另一种格式的转换操作进行pipeline组合处理,减少解压数据的传输量。迭代器对内存数据是一种有效的接口,因为他们典型地用来执行扫描。只要每个批处理记录适合CPU缓存,迭代器就能够快速地执行转换。
专业模型中第二个常见的优化是以特定于域的方式跨集群分区数据以加速应用。例如,Pregel 和HaLoop使用用户自定义函数来保持分区,加速数据集的join。并行数据库通常也提供各种形式的分区。使用RDD,分区可以通过每个记录的键被轻松实现(事实上,这是RDD记录中的唯一结构概念)。注意,尽管key用于整个记录,匹配多个基础数据项的组合记录依然具有有意义的键。例如在GraphX中,每个分区记录具有相同的以分区数量为模的散列码,但仍在其中散列以实现高效查找。当使用组合记录的系统执行shuffle操作如groupBy时,他们可以将每个传出记录hash到目标复合记录的键。 分区的最后一个有趣用例是复合数据结构,其中,数据结构的某些字段随时间更新,而其他字段不是。例如在PageRank中,有两种类型的数据,页面的link列表,不可变的、页面的rank,可变的。我们将其表示成两种RDD,(ID, links)键值对和(ID, ranks)键值对,他们由ID共同划分分区。系统优先将每个ID的排名和链接放在同一台机器上,但是我们可以单独更新排名而不更改链接。同样的技术被用于保持GraphX中顶点和边的状态。 通常情况下,人们可以将RDD视为集群环境下更具声明性的内存抽象。在单台机器上使用内存时,程序员担心数据布局主要是为了优化查找和最大化共同访问的信息的colocation(托管)。在分布式内存上,RDD在出于相同的考虑下给予控制,通过让用户选择分区函数和共同分区数据集,但也避免要求用户准确指定每个分区的位置。因此,在运行时可以基于可用资源有效放置分区,或者在失败时移动他们,而程序员仍然控制访问性能。
RDD模型和专业系统的第三个区别是RDD是不可变的,不可变性对于谱系和故障恢复的推理很重要,尽管从根本上讲这和用于这一目的的可变数据集和跟踪版本号没什么区别。 不可变性和容错将会带来一些开销,在很多情况下,以下两种技术能够得到比较好的性能:
复合数据结构表示为多个共同分区的RDD,如PageRank例子中的数据集,允许程序仅仅修改需要改变的那部分数据的状态。在很多算法中,在绝大部分的迭代中,都有记录的某些域需要改变而其他不需要的情况。这一方法可以有效地抓住核心。即使在记录中,当内部数据结构不变时,指针也可被用于从记录的先前版本中重用状态。举个例子,java中的string是不可变的。在一个(Int,String)的记录中使用map,改变了Int,但是保持了相同的String,将有个指针指向之前的String对象,而不是拷贝它。更一般地,来自函数式编程的持久化数据结构能够被用于将其它形式的数据(eg:哈希表)的增量更新表示为来自先前版本的增量。函数式编程的很多思想都能够直接帮助RDD。我们发现在某些应用中有用的最后一种技术是使用低级的RDD接口实现自定义依赖模式和转换。接口很简单,仅仅需要父RDD上的依赖列表和给定父RDD迭代器的情况下计算迭代器的分区的函数。我们在第一版的Shark和GraphX中实现的一些自定义的操作符的经验引发很多新的操作符被添加到Spark中。例如,mapPartitions,给定一个RDD[T]和一个从Iterator[T] 到 Iterator[U]的函数,通过应用到每个分区,返回RDD[U]。这非常接近RDD的最低级别接口,允许转换在每个分区内执行非功能性操作(eg:使用可变状态)。Shark同样包含用于替代内置操作符的自定义的join和groupBy。即使应用实现自定义转换,也将自动受益于RDD模型的容错、多租户、组合优势。同时也比单独的系统更容易开发。
作为在RDD上实现了复杂的存储和处理的示例,Shark在经过深入研究的并行数据库领域取得了很好的性能,同时提供传统数据库缺少的容错和复杂分析能力。
现代数据分析的几个挑战:
数据量正在迅速扩大,需求扩大到上百台商业机器的集群。这种扩展也扩大了错误和落后者(慢任务)的发生率,使得并行数据库设计复杂化。数据分析的复杂性也已经提高:现代数据分析采用复杂的统计方法,如机器学习算法,远远超出传统企业数据仓库系统的roll-up和drill-down能力。尽管在规模和复杂性上都进一步提升,用户还是希望能够以可交互的速度查询数据。 为解决“大数据”问题,探讨了两大系统:首先是由MapReduce和通用处理程序组成,提供适合于集群的细粒度的容错模型,在任务失败或者节点缓慢的情况下能够确定性地在其它节点重新执行。MapReduce也很通用:它可以表达很多统计和学习算法。它也很容易地支持非结构化数据和“schema-on-read”。然而,MapReduce缺乏很多使数据库高效的特性,因此导致了数十秒到几小时不等的延迟。即使是显著优化了的MapReduce SQL查询系统,比如谷歌的Tenzing系统,或者是将MapReduce和传统数据库在每个节点上联合的系统,如HadoopDB,都有最低10s的延迟。因此,对于交互式查询,MapReduce不可用。 相反,大多数MPP数据库系统(e.g:Vertica, Greenplum, Teradata)和一些构建于MapReduce环境的新的低延迟的引擎(e.g:Google F1, Impala )均采用粗粒度的恢复模型,在某个节点失败的情况下整个查询都必须重新提交。这在重试代价较小的短查询中工作良好,但随着集群扩展,长查询就面临着巨大的挑战。除此之外,这些系统缺少能够很容易用MapReduce实现的分析功能,比如机器学习和图计算算法。事实上,这些可以通过UDF来实现,但是他们很昂贵,加剧了长查询容错和落后者恢复的需求。因此,大多数公司都采用除MPP之外的系统来进行复杂分析。 为了给大数据分析提供有效的环境,系统需要提供有效的SQL查询和复杂分析,并在两种类型的操作中提供细粒度的错误恢复。 Shark使用RDD模型,计算大多在内存中执行,同时提供细粒度容错处理。(这里指的细粒度是从失败节点的分区上来讲,粗粒度指整个查询重新进行)。在大规模数据的分析中内存计算的重要性在增加,有两点理由:首先,大多数分析函数,如机器学习和图计算,都是迭代式的。其次,即使是传统的SQL仓库工作负载也表现出极大的时间和空间的局部性。因为更多近期事实表数据和小维度表经常被不成比例地读取。一个对facebook HIVE仓库和微软Bing 分析集群的研究显示,即使每个系统管理超过100PB的数据,超过95%的查询都能够被64GB/节点作为缓存来提供内存服务。 为了有效地运行查询,我们必须扩展RDD执行模型,引入一些传统分析数据库和一些新型数据库的理念。首先,为了有效地存储和处理关系型数据,我们实施了内存中的列式存储和压缩。与直接存储数据相比,这种方式减少了数据大小和处理时间多达五倍。其次,即使是在有分析函数或者是UDF的时候,为了优化基于数据特征的SQL查询,使用部分DAG执行(PDE)扩展了Spark:Shark能够在第一次运行了任务DAG的几个阶段之后重新优化在运行查询,根据观察到的统计信息选择更好的join策略和正确的并行度。第三,我们利用了在传统的MapReduce系统中不存在的Spark引擎的其他特性,例如控制数据分区。 Shark系统与Hive兼容,支持Hive的所有SQL语法和UDF以及允许在未经修改的Hive的数据仓库上执行。它使用Spark的Java、Scala、Python API编写的复杂分析函数扩充SQL,这些功能可在单个执行计划中与SQL结合使用,提供内存数据共享和跨两种处理类型的快速恢复。 使用RDD和上述优化的试验表明,Shark回复查询的速度比Hive要快高达100倍,运行迭代式机器学习算法要比Hadoop快高达100倍,并且几秒钟便可从查询失败中恢复。注意,Shark提供细粒度的恢复和复杂分析特性。Shark在Spark上运行SQL,像RDBMS一样,也采用三步法:SQL解析、生成逻辑执行计划、生成物理执行计划。 Spark使用Hive的compiler来解析查询,生成一个抽象语法树。然后,将树转换为逻辑执行计划,并进行基本的逻辑优化,如谓词下推。到这一步为止,Shark和Hive采用相同的方法。Hive然后将运算转换成多个MapReduce 阶段组成的物理执行计划。Shark的做法是,采用优化器对逻辑执行计划进行额外的基于规则的优化,例如将LIMIT下推到各自的分区,然后生成由对RDD的transformation组成的物理执行计划,而不是MapReduce作业。Shark使用Spark中的多种操作算子来执行转换,例如map、reduce、broadcast joins等。然后Spark Master使用标准的DAG调度技术来执行这一作业的沿袭图,包括将任务提交到靠近其输入数据所在的节点执行、容错恢复、落后者缓解等。 但是,由于在工作负载中,经常性的UDF和复杂分析函数的使用使得很难在编译阶段确定最佳执行计划。特别是处理未经ETL的数据时。直接基于RDD正常处理并不高效。下面是Shark的优化方法:
内存存储影响空间和读取吞吐两方面。一种简单的方法是以原生格式缓存磁盘数据,在查询处理器中按需执行反序列化。这种情况下,反序列化便成了性能瓶颈:研究显示,现代商业CPU单核能以每秒200M的速度反序列化数据。 Spark的内存存储方式默认是将数据分区存储为JVM对象集合。这样能避免反序列化,因为CPU可以直接使用这些对象,但是这样带来了明显的内存存储空间开销。通用的JVM实现,会给每个对象添加12-16字节的开销。例如,将270M的TPC-H lineitem表按JVM 对象存储使用大学971M的内存,而序列化存储仅需要289M,几乎减少了3倍的空间。一个严肃点的问题是,是对GC的影响。每个记录200 B,一个32GB的堆能够容纳1亿6千万个对象。GC时间与堆中对象的个数线性相关,所以,对于一个大堆,可能要花上几分钟来进行一次全GC。而GC是不可预测的而且是昂贵的,这导致了响应时间的巨大差异。 Shark将所有原始类型的列存储为JVM原始数组。Hive支持的复杂的数据类型,例如map和array,被序列化并被连接成单字节数组。每列只创建一个对象,从而实现快速GC和紧凑的数据表示。通过廉价的压缩技术,列式数据的空间占用还能被进一步减小,而且CPU开销几乎很小。和列式数据库系统差不多,Shark实现了高效的CPU压缩方案,例如字典编码、游程编码、位打包。 列式数据表示还有更好的缓存表现,特别是对列式存储的特定列进行频繁聚合的分析查询。
在一些数据仓库工作负载中,两张表会进行很频繁的join。MPP数据库通常使用的办法是在数据join过程中对两个表按key进行协同分区。不过对于HDFS这样的分布式文件系统,其存储系统与schema无关,这使之不能进行数据协同分区。Shark支持在子查询中对两个表的相同key值进行协同分区以加速join。它在表创建语句中添加了一个DISTRIBUTE BY子句,该子句用于分发列。
通常,数据是按照一个或者多个列上的逻辑聚类来存储的。例如,网站的流量日志数据会按照用户的地理位置分组,因为用户数据的首选存储位置是跟用户具有地理位置邻近性的数据中心中的。在每个数据中心中,日志仅附加并按照大致时间顺序存储。例如,新闻网站通常包含强相关的新闻ID和时间戳列。对于分析查询,通常在这些列上应用过滤谓词或聚合,例如,仅搜索典型日期范围或新闻文章上的事件。 映射修剪是基于其自然聚类列修剪数据分区的过程。由于Shark的内存存储器将数据拆分为小分区,因此每个块在这些列上只包含一个或几个逻辑组,如果Shark的值超出查询的过滤范围,Shark可以避免扫描某些数据块。为了利用这种自然聚类,Shark的每个worker都会捎带数据加载过程来收集统计信息。收集的分区信息包括列值的范围和唯一值(如果唯一值特别少的情况下)。这些收集的统计信息被发送到driver节点并存储在内存中,在执行查询的时候用来做分区裁剪。当一个查询下发时,Shark在所有的分区统计信息中评估谓词,然后将未匹配上谓词的分区全部裁剪掉。这是通过简单地构建仅依赖于父分区的某些分区的RDD来完成的。
像Shark和Hive这样的系统,频繁的被用于查询新的还未经历数据加载的新数据。这排除了使用准确的先验数据统计的静态查询优化技术,例如由索引维护的统计数据。不过由于新数据的统计特性的缺乏,以及经常使用UDF,就带来动态方法进行查询优化的需要。但是怎么才能在分布式的环境下进行动态优化呢?这就用到了PDE,这种技术能够支持查询引擎根据运行时收集的统计信息动态调整执行计划。Shark目前将这种PDE应用在数据会被交换和重分区的shuffle运算符边界上,因为这些是Shark中代价最大的操作。通常情况下,在shuffle之前,Spark会将map输出的数据物化在内存中,必要时溢出到磁盘,然后reduce任务会拉取这些数据。 PDE在两方面修改了这一机制。首先,在物化map端输出的同时收集定制的全局的并且以分区为粒度的统计信息。其次,它允许DAG根据这些统计信息做出改变,或者是更换算子或者是修改参数(比如修改并行度)。这些统计信息是可定制的,只需简单的使用一个可插拔的累加器API,统计信息包括: 1、分区大小和记录数量,能用来探测数据倾斜。 2、“高命中率”清单,比如说在一个RDD上频繁发生的条目。 3、近似直方图,能够用于评估分区的数据分布。 每个worker都把这些统计信息发送给master,统计信息在master上聚合并呈现给优化器。为了提高效率,Shark对统计记录使用有损压缩,并且使每个任务限制在1-2KB大小。例如,使用对数编码方式对分区大小进行编码,这样就可以使用最多10%错误的1个字节表示高达32GB的分区大小。然后,master就可以利用这些统计信息进行多种运行时优化。 通过PDE技术实现的优化方法: 1、join算法选择:当对两张表进行join的时候,Shark通过PDE技术来选择join方式,是通过shuffle join(对两个记录集合全部进行按key全网哈希)还是broadcast join(把小表复制发送到大表所在的各个节点),当一个表远比另外一张表小的时候,广播join的网络通信开销小得多。因为无法提前预知表的大小(eg:UDF),所以,在运行时选择算法能带来更好的性能。 2、并行度:大家都知道,并行度对类似MapReduce这样的系统的性能有着巨大的影响。如果并行度过低,会造成reducer的网络连接过载并耗尽内存。如果并行度过高,又会因为过多时间花在调度上而延长作业执行时间。使用PDE技术,Shark能够在运行时根据各自分区的大小决定reducer的数量,通过将很多小的、细粒度的分区合并成少量的粗粒度的大分区供reduce任务使用。 3、处理倾斜:和将map 输出预分区成很多小的桶能帮助选择reduce的任务数一样,这也能帮助识别主流的key(数量大)从而特殊对待,这些主流key值可以到同一个reduce任务中,而其他的桶可以合并以形成较大的任务。 部分DAG执行技术补充了通常在单节点系统中运行的现有的自适应查询优化技术。所以我们能够在各自节点中动态优化本地执行计划,并通过PDE技术在stage边界进行全局执行计划的结构优化。这种细粒度的统计信息收集以及由此进行的优化,将PDE技术与以前的系统的沿袭图重写功能区别开来。例如DryadLINQ。只是现在PDE原型只在Shark中实现了,后续会集成到Spark core中,这样除Spark SQL之外的应用也可以使用了。
在官方的测试结果中指出,Shark的速度要比Hive和Hadoop快100倍。也指出在一些特定场景中,数据能够存放在内存的情况下,Shark的性能还能超过MPP系统。但是,不是说Shark一定比MPP系统要快,而且相对于MPP系统,Shark还有一些劣势,比如说Shark运行于JVM中。也就是说Shark能够提供与MPP可比拟的性能,并同时还保有类MAP-REDUCE的引擎,并由此引擎提供细粒度的容错恢复特性。重要的是这一引擎还能在数据集上执行复杂的分析(eg:机器学习功能)。
开发者测试时数据: 集群配置:100个节点 每节点8虚拟核、68GB内存、1.6T存储空间 软件版本:64-bit Linux 3.2.28、Apache Hadoop 0.20.205、Apache Hive 0.9 对于Hadoop MapReduce,map task 和 reduce task都设置为8,对于Hive,使能JVM 重用并避免合并输出文件,因为会导致查询出结果之后合并的额外步骤。 测试6次,并丢弃第一次结果,将其他五次结果取均值。为什么丢弃第一次,这里能学到点常识,也就是说这样可以允许JVM在即时编译时够优化公共代码路径,在设置JVM重用之后,后续的执行就能节省不少加载时间。
这一基准使用两个表:1GB/node rankings表和20GB/node的uservisits表。对于100个节点的集群,测试者创建了100G的rankings 表包含18亿行,和2TB的uservisits表包含155亿行数据。进行了4项查询来比较Shark和Hive,同时测试者还进行了手动调节Hive的reduce数量来优化Hive任务,但Shark仍远远优于Hive。
图3.1 selection和aggregation查询的运行时间对比图 图3.2 join查询的运行时间(s)对比图 第一项 选择查询:SELECT pageURL, pageRank FROM rankings WHERE pageRank > X; 测试者引用别人的数据,显示Vertica执行选择查询要比Hadoop快10倍,因为Vertica创建了聚集索引(数据行的物理顺序与列值(一般是主键的那一列)的逻辑顺序相同)。从测试者的图显示,对于内存数据,在没有建立聚集索引的情况下Shark也比Hive快80倍。如果数据在HDFS上,Shark也比Hive快5倍。
第二项 聚合查询:SELECT sourceIP, SUM(adRevenue) FROM uservisits GROUP BY sourceIP; SELECT SUBSTR(sourceIP, 1, 7), SUM(adRevenue) FROM uservisits GROUP BY SUBSTR(sourceIP, 1, 7); 第一个查询有2百万个组,第二个查询有将近1千个组。Shark和Hive都运用本地任务聚合然后对数据进行shuffle对最后的merge聚合并行化。可以看到Spark还是大大优于Hive。而用于对比的基准MPP数据库则是先在每个节点执行本地聚合,然后将聚合结果发送给一个查询协调器进行合并。这在组特别少的情况下表现良好,如果在组特别多的时候表现就差劲了。MPP数据库的选择计划就好比是Shark和Hive在最终merge的时候只选择一个reduce一样。
第三项 join查询: 将2T的uservisits表和100 GB rankings表SELECT INTO Temp sourceIP, AVG(pageRank), SUM(adRevenue) as totalRevenue FROM rankings AS R, uservisits AS UV WHERE R.pageURL = UV.destURL AND UV.visitDate BETWEEN Date(‘2000-01-15’) AND Date(‘2000-01-22’) GROUP BY UV.sourceIP; 从图中可以看出,Shark还是优于Hive,但是从内存提供数据并不比从磁盘提供数据得到更大好处。原因是join步骤支配着查询过程。然而,对两个表进行协同分区提供了显著的好处,因为这样避免了在join步骤中对2.1TB的数据进行shuffle。
数据加载:Hadoop的数据加载吞吐率很高,大约在MPP数据库的5倍左右。在3.4章节中提到,Shark能够直接用于查询HDFS数据,这说明Shark载入数据的能力至少和Hadoop是一样的。在生成2T uservisits表之后,测试者将其加载进HDFS和加载到内存中的时间进行对比,发现加载到内存的速率要比加载到HDFS快5倍。
为了找到影响Shark性能的因素,Shark方进行了多个微基准测试,用TPC-H提供的DBGEN程序生成一个100GB和1TB的数据,这些数据包含不同基数的表和列,可用于创建很多个微基准测试用来测试不同运算符。试验过程中,Hive和Hadoop MapReduce对作业的reduce数量设置很敏感。Hive的优化器根据评估的数据量大小来自动设置reduce数量。但是,发现Hive优化器频繁做出错误的决定,使任务执行时间难以置信的长。
聚合性能:在表lineitem上运行聚合查询,对100G的数据集,lineitem包含6亿行数据,对1T数据集,包含60亿行。 试验结果如图:
图3.3 lineitem表上的聚合查询,横轴表示每个聚合查询的组数 查询语句:SELECT [GROUP_BY_COLUMN], COUNT(*) FROM lineitem GROUP BY [GROUP_BY_COLUMN] 分别进行不带group-by的查询和三个带group-by的聚合查询:SHIPMODE (7 groups), RECEIPTDATE(2500 groups), and SHIPMODE (150 million groups in 100 GB, and 537 million groups in 1 TB). 对于Shark和Hive,聚合首先都是在每个分区上进行,然后中间结果会被重新分区再发送到reduce任务上进行最终聚合。随着group的数量变多,更多的数据需要在网络间shuffle。图3.3是Shark和Hive的对比,而且比较了内存中数据和从HDFS加载数据的情况下Shark各自的表现。可以看到在group比较小的时候,Shark比手动调节参数的Hive执行速度要快80倍,当数据量很大的时候,也能比Hive快20倍,数据量很大的时候,决定整个执行时间的取决于shuffle。 对Spark性能解释的关键点来了,有没有发现为甚么Shark on disk也能有这样的表现?毕竟,Shark和Hive都需要从HDFS读取数据并反序列化从而进行查询处理。这些情况可以由Shark的低任务启动开销、优化过的shuffle运算符、以及其他因素来解释。Shark执行引擎每秒可以启动上千个task来最大化可用并行度。而且对于聚合查询,只需要执行基于hash的shuffle。在Hive中,仅有的shuffle机制是由Hadoop MapReduce提供的基于排序的shuffle,这在计算上要比hash更昂贵。 运行时join选择:这一部分是个重点,这里Shark进行了关于为什么PDE能够通过在运行过程中再优化查询计划从而提高查询性能的实验。这一查询对1TB TPC-H数据集的lineitem和supplier表进行连接,使用UDF基于地址来查询感兴趣的供应商。这个特例中,UDF从1000万供应商选出1000个。下表展示了结果:
图3.4 优化器选择的join策略与执行时间(s) SELECT * from lineitem l join supplier s ON l.L_SUPPKEY = s.S_SUPPKEY WHERE SOME_UDF(s.S_ADDRESS)出于缺少对UDF的选择性评估,静态优化器将会选择对两张表执行shuffle join,因为两张表的初始化的数据量都比较大。利用PDE,在对两张表执行pre-shuffle map阶段之后,动态优化器发现过滤后的supplier表很小。于是优化器决定执行map-join,将过滤后的supplier发送到每个节点,然后利用lineitem上的map 任务执行join。 为了进一步提高执行性能,优化器能够推断出supplier表比lineitem表小的可能性更高(因为在初始化的时候supplier的数据总大小要小,而且有谓词filter作用在supplier)。优化器决定只对supplier表进行pre-shuffle,并且避免在lineitem启动两波任务。这种静态的查询分析和PDE的结合与纯粹的执行静态选择计划相比带来了3倍的性能提升。
这里,Shark方利用50台EC2组成的集群模拟故障,并测量故障恢复前、中、后的查询性能。使用group-by在100GB的lineitem表查询来测量故障中的查询性能。在将lineitem数据集加载到Shark内存存储之后,将一台worker杀死并继续运行查询,Shark很优雅地从故障中恢复,在其他49台节点上并行构造丢失分区,这一故障恢复影响较小,但是和重新加载整个数据集并且重新执行查询相比,代价小的多。(14s vs 34s)
图3.5 有故障情况下的查询时间(s) 在此查询之后,后续的查询在恢复的数据集上操作,尽管机器比故障前少一些。图3.5所示,故障恢复后的查询性能略好于故障前的查询性能。Shark方的解释是这是JVM JIT编译器的副作用,由于更多的调度程序代码在恢复后查询运行的时候已经编译好了。Shark方表示,早期的工业用户向他们提供了一分Hive数据仓库样本,和这一系统的两年查询记录。一个面向内容生产和发布商的领先的视频分析公司,用户基于Hadoop构建了很多分析栈。获取的数据包含30天的视频会话数据,解压后占据1.7TB空间。它由一个包含103列的事实表组成,大量使用复杂的数据类型,如数组和结构。抽样查询日志包含3833个分析查询,以频率的顺序存储。过滤掉了调用专有UDF的查询,并选择了四个常见查询,这些查询是完整跟踪中其他查询的原型。 这些查询计算不同细分受众群的汇总视频质量指标: 1.查询1在特定日期为特定客户的用户计算12个维度的摘要统计。 2.查询2计算按国家/地区分组的会话数和不同的客户/客户组合,其中包含八列的过滤谓词。 3.查询3计算除2个国家/地区以外的所有会话数和不同用户数。 4.查询4计算按列分组的7个维度中的摘要统计信息,并显示按降序排序的顶部组。
图3.6 真实Hive仓库负载 图中显示了Shark和Hive执行查询的对比,结果是Shark能在亚秒级延迟内处理所有现实生活中的查询,而Hive则需要50到100倍于此的时间去处理。仔细研究表明,这些数据展示了3.4.3节中提到的自然聚类属性,平均而言,map裁剪技术属性使得扫描的数据量减小30倍。
Shark的主要设计目标是提供一个单独系统,具备高效的SQL查询处理和复杂机器学习能力。遵循将计算推到数据上的原则,Shark将支持机器学习作为它的第一类特性。这一目标通过选择Spark作为执行引擎和RDD作为运算的主要数据结构得以实现。 不少研究课题演示用SQL来表达机器学习语言并且不把数据移出数据库是可行的,然而这些项目的实现需要联合SQL、UDF以及用其它语言编写的驱动程序。但是,系统会变得不确定并难以维护,并且这将因使用不是为并行数值计算而设计的传统数据库引擎来执行并行数值计算负载而牺牲性能。这与Shark采用的方法形成了对比,Shark提供数据库内分析,将计算推向数据,通过使用专为这种工作负载而进行优化的运行时并且旨在设计用来表达机器学习算法的编程模型。
用户可以通过两种方式使用Shark的复杂分析功能。一是通过Shark提供的Scala API,这在Spark程序中能调用抽取Shark数据为RDD,然后可对RDD调用任意的Spark计算,这与SQL形成流水线执行。二是Shark扩展了Hive的SQL语法可以让用户直接调用Scala函数作用于RDD,这使得Scala库对Shark简单可用。 一个示例:逻辑回归
def logRegress(points: RDD[Point]): Vector { var w = Vector(D, _ => 2 * rand.nextDouble - 1) for (i <- 1 to ITERATIONS) { val gradient = points.map { p => val denom = 1 + exp(-p.y * (w dot p.x)) (1 / denom - 1) * p.y * p.x }.reduce(_ + _) w -= gradient } w } val users = sql2rdd("SELECT * FROM user u JOIN comment c ON c.uid=u.uid") val features = users.mapRows { row => new Vector(extractFeature1(row.getInt("age")), extractFeature2(row.getStr("country")), ...)} val trainedVector = logRegress(features.cache())可以看出这个程序首先通过sql2rdd函数发起一个SQL查询使得用户信息作为表来检索。然后在查询的rows上进行特征提取,然后在特征矩阵上运行逻辑回归函数。每次迭代都将一个关于w的函数作用于每个点上来产生一组梯度,这些梯度会被求和以产生更新w的净梯度。 其中map、mapRows、reduce函数自动被Shark和Spark并行化并在集群上执行,master程序会收集reduce的输出来更新w。他们和SQL的join操作的reduce步骤一起打包成流水线操作,并通过迭代器接口将列数据从SQL传递到Scala代码,如3.2.1讨论的那样。 我们还提供了一个API来从SQL调用Scala函数。 给定RDD上的Scala函数,例如K-means聚类或逻辑回归,用户可以将其注释为可从SQL调用,然后编写如下的SQL代码: CREATE TABLE user_features AS SELECT age, country FROM user; GENERATE KMeans(user_features, 10) AS TABLE user_features_clustered; 在这种情况下,user_features_clustered表将包含年龄,国家、地区以及每行的簇ID的新字段。 传递给KMeans的10是簇的数量。
图3.7 逻辑回归的每次迭代运行时间(s)除了语言集成之外,使用RDD作为运算符的数据结构,这带来了另一个主要好处——执行引擎集成。 这种通用抽象允许机器学习计算和SQL查询共享worker和缓存数据,而不会产生数据移动的开销。 由于SQL查询处理是使用RDD实现的,因此为整个管道保留了lineage沿袭图,这为整个工作流程提供了端到端的容错能力。 如果在机器学习阶段发生故障,故障节点上的分区将根据其沿袭自动重新计算。
实现了两个迭代机器学习算法,逻辑回归和k均值,以比较Shark与在Hive和Hadoop中运行相同工作流的性能。 该数据集是综合生成的,包含10亿行和10列,占用100 GB的空间。 因此,特征矩阵包含10亿个点,每个点具有10个维度。 这些机器学习实验在100节点m1.xlarge EC2集群上进行。 数据最初以关系形式存储在Shark的内存存储和HDFS中。 工作流程包括三个步骤:(1)使用SQL从仓库中选择感兴趣的数据,(2)提取特征,以及(3)应用迭代算法。 在步骤3中,两个算法运行10次迭代。 图3.7和3.8分别显示了执行逻辑回归和k均值的单次迭代的时间。 我们为Hadoop实现了两个版本的算法,一个将输入数据作为文本存储在HDFS中,另一个使用序列化二进制格式。 二进制表示更紧凑,在记录反序列化时具有更低的CPU成本,从而提高了性能。 结果表明,在执行逻辑回归时,Shark比Hive和Hadoop快100倍,而执行k-means时Shark比Hive和Hadoop快30倍。 Shark对K-means的加速较logistic regression少,因为它在计算上比逻辑回归更昂贵,因此使得工作流更加受CPU限制。
图3.8 k-均值算法每次迭代运行时间(s) 在Shark中,如果数据最初驻留在其存储器中,则步骤1和2的执行时间和运行机器学习算法的一次迭代时间大致相同。 如果数据未加载到内存存储中,则两种算法的第一次迭代需要40秒。 然而,随后的迭代时间与图3.7和3.8一致。 在Hive和Hadoop中,每次迭代都会占用图中给出的时间长度,因为每次迭代都会从HDFS加载数据。本文呈现了基于RDD的复杂处理与存储优化的实现技术,并通过Shark的数据仓库系统加以说明。相似的技术,如Spark record内的批处理、索引、分区优化技术已经在GraphX、MLlib、MLI以及其它项目中使用。这些技术使得基于RDD的系统能够实现和领域内专用系统相似的性能,并且在联合处理类型的应用中表现出更高的性能,同时还可以在这些计算类型间提供容错。
