Hive是基于Hadoop的开源数据仓库工具,提供了类似于SQL的HiveQL语言,使得上层的数据分析人员不用知道太多MapReduce的知识就能对存储于Hdfs中的海量数据进行分析。由于这一特性而收到广泛的欢迎。
Hive的整体框架中有一个重要的模块是执行模块,这一部分是用Hadoop中MapReduce计算框架来实现,因而在处理速度上不是非常令人满意。由于Spark出色的处理速度,有人已经成功将HiveQL的执行利用Spark来运行,这就是已经非常闻名的Shark开源项目。
在Spark 1.0中,Spark自身提供了对Hive的支持。本文不准备分析Spark是如何来提供对Hive的支持的,而只着重于如何搭建Hive On Spark的测试环境。
整体的安装过程分为以下几步:
搭建Hadoop集群 (整个cluster由3台机器组成,一台作为Master,另两台作为Slave)编译Spark 1.0,使其支持Hadoop 2.4.0和Hive运行Hive on Spark的测试用例 (Spark和Hadoop Namenode运行在同一台机器)创建基于kvm的虚拟机,利用libvirt提供的图形管理界面,创建3台虚拟机,非常方便。内存和ip地址分配如下
master 2G 192.168.122.102slave1 4G 192.168.122.103slave2 4G 192.168.122.104在虚拟机上安装os的过程就略过了,我使用的是arch linux,os安装完成之后,确保以下软件也已经安装
jdkopenssh在每台机器上创建名为hadoop的用户组,添加名为hduser的用户,具体bash命令如下所示
groupadd hadoop useradd -b /home -m -g hadoop hduser passwd hduser在启动slave机器上的datanode或nodemanager的时候需要输入用户名密码,为了避免每次都要输入密码,可以利用如下指令创建无密码登录。注意是从master到slave机器的单向无密码。
cd $HOME/.ssh ssh-keygen -t dsa将id_dsa.pub复制为authorized_keys,然后上传到slave1和slave2中的$HOME/.ssh目录
cp id_dsa.pub authorized_keys #确保在slave1和slave2机器中,hduser的$HOME目录下已经创建好了.ssh目录 scp authorized_keys slave1:$HOME/.ssh scp authorized_keys slave2:$HOME/.ssh在组成集群的master, slave1和slave2中,向/etc/hosts文件添加如下内容
192.168.122.102 master 192.168.122.103 slave1 192.168.122.104 slave2如果更改完成之后,可以在master上执行ssh slave1来进行测试,如果没有输入密码的过程就直接登录入slave1就说明上述的配置成功。
以hduser身份登录master,执行如下指令
cd /home/hduser wget http://mirror.esocc.com/apache/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz mkdir yarn tar zvxf hadoop-2.4.0.tar.gz -C yarn在hadoop-config.sh文件开头处添加如下内容
export JAVA_HOME=/opt/java在yarn-env.sh开头添加如下内容
export JAVA_HOME=/opt/java export HADOOP_HOME=/home/hduser/yarn/hadoop-2.4.0 export HADOOP_MAPRED_HOME=$HADOOP_HOME export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME export YARN_HOME=$HADOOP_HOME export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop文件1: $HADOOP_CONF_DIR/core-site.xml
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>fs.default.name</name> <value>hdfs://master:9000</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/home/hduser/yarn/hadoop-2.4.0/tmp</value> </property> </configuration>文件2: $HADOOP_CONF_DIR/hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>dfs.replication</name> <value>2</value> </property> <property> <name>dfs.permissions</name> <value>false</value> </property> </configuration>文件3: $HADOOP_CONF_DIR/mapred-site.xml
<?xml version="1.0"?> <configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>文件4: $HADOOP_CONF_DIR/yarn-site.xml
<?xml version="1.0"?> <configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name> <value>org.apache.hadoop.mapred.ShuffleHandler</value> </property> <property> <name>yarn.resourcemanager.resource-tracker.address</name> <value>master:8025</value> </property> <property> <name>yarn.resourcemanager.scheduler.address</name> <value>master:8030</value> </property> <property> <name>yarn.resourcemanager.address</name> <value>master:8040</value> </property> </configuration>文件5: $HADOOP_CONF_DIR/slaves
在文件中添加如下内容
slave1 slave2在$HADOOP_HOME下创建tmp目录
mkdir $HADOOP_HOME/tmp刚才所作的配置文件更改发生在master机器上,将整个更改过的内容全部复制到slave1和slave2。
for target in slave1 slave2 do scp -r yarn $target:~/ scp $HOME/.bashrc $target:~/ done批量处理是不是很爽。
在master机器上对namenode进行格式化
bin/hadoop namenode -format注意: daemon.sh表示只在本机运行,daemons.sh表示在所有的cluster节点上运行。
跑一个wordcount示例,具体步骤不再列出,可参考本系列中的第11篇
Spark的编译还是很简单的,所有失败的原因大部分可以归结于所依赖的jar包无法正常下载。
为了让Spark 1.0支持hadoop 2.4.0和hive,请使用如下指令编译
SPARK_HADOOP_VERSION=2.4.0 SPARK_YARN=true SPARK_HIVE=true sbt/sbt assembly如果一切顺利将会在assembly目录下生成 spark-assembly-1.0.0-SNAPSHOT-hadoop2.4.0.jar
编译之后整个$SPARK_HOME目录下所有的文件体积还是很大的,大概有两个多G。有哪些是运行的时候真正需要的呢,下面将会列出这些目录和文件。
$SPARK_HOME/bin$SPARK_HOME/sbin$SPARK_HOME/lib_managed$SPARK_HOME/conf$SPARK_HOME/assembly/target/scala-2.10将上述目录的内容复制到/tmp/spark-dist,然后创建压缩包
mkdir /tmp/spark-dist for i in $SPARK_HOME/{bin,sbin,lib_managed,conf,assembly/target/scala-2.10} do cp -r $i /tmp/spark-dist done cd /tmp/ tar czvf spark-1.0-dist.tar.gz spark-dist将生成的运行包上传到master(192.168.122.102)
scp spark-1.0-dist.tar.gz hduser@192.168.122.102:~/经过上述重重折磨,终于到了最为紧张的时刻了。
以hduser身份登录master机,解压spark-1.0-dist.tar.gz
#after login into the master as hduser tar zxvf spark-1.0-dist.tar.gz cd spark-dist更改conf/spark-env.sh
export SPARK_LOCAL_IP=127.0.0.1 export SPARK_MASTER_IP=127.0.0.1用bin/spark-shell指令启动shell之后,运行如下scala代码
val sc: SparkContext // An existing SparkContext. val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) // Importing the SQL context gives access to all the public SQL functions and implicit conversions. import hiveContext._ hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL hql("FROM src SELECT key, value").collect().foreach(println)如果一切顺利,最后一句hql会返回key及value
在新近发布的spark 1.0中新加了sql的模块,更为引人注意的是对hive中的hiveql也提供了良好的支持,作为一个源码分析控,了解一下spark是如何完成对hql的支持是一件非常有趣的事情。
以下部分摘自Hadoop definite guide中的Hive一章
“Hive由Facebook出品,其设计之初目的是让精通SQL技能的分析师能够对Facebook存放在HDFS上的大规模数据集进行分析和查询。
Hive大大简化了对大规模数据集的分析门槛(不再要求分析人员具有很强的编程能力),迅速流行起来,成为Hadoop生成圈上的Killer Application. 目前已经有很多组织把Hive作为一个通用的,可伸缩数据处理平台。”
Hive所有的数据都存在HDFS中,在Hive中有以下几种数据模型
Tables(表) table和关系型数据库中的表是相对应的,每个表都有一个对应的hdfs目录,表中的数据经序列化后存储在该目录,Hive同时支持表中的数据存储在其它类型的文件系统中,如NFS或本地文件系统 分区(Partitions) Hive中的分区起到的作用有点类似于RDBMS中的索引功能,每个Partition都有一个对应的目录,这样在查询的时候,可以减少数据规模 桶(buckets) 即使将数据按分区之后,每个分区的规模有可能还是很大,这个时候,按照关键字的hash结果将数据分成多个buckets,每个bucket对应于一个文件HiveQL是Hive支持的类似于SQL的查询语言。HiveQL大体可以分成下面两种类型
DDL(data definition language) 比如创建数据库(create database),创建表(create table),数据库和表的删除 DML(data manipulation language) 数据的添加,查询 UDF(user defined function) Hive还支持用户自定义查询函数hive的整体框架图如下图所示
由上图可以看出,Hive的整体架构可以分成以下几大部分:
用户接口 支持CLI, JDBC和Web UIDriver Driver负责将用户指令翻译转换成为相应的MapReduce JobMetaStore 元数据存储仓库,像数据库和表的定义这些内容就属于元数据这个范畴,默认使用的是Derby存储引擎HiveQL的执行过程如下所述
parser 将HiveQL解析为相应的语法树Semantic Analyser 语义分析Logical Plan Generating 生成相应的LogicalPlanQuery Plan GeneratingOptimizer最终生成MapReduce的Job,交付给Hadoop的MapReduce计算框架具体运行。
最好的学习就是实战,Hive这一小节还是以一个具体的例子来结束吧。
前提条件是已经安装好hadoop,具体安装可以参考源码走读11或走读9
warehouse用来存储raw data
$ $HADOOP_HOME/bin/hadoop fs -mkdir /tmp $ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse $ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp $ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse创建表,首先将schema数据写入到metastore,另一件事情就是在warehouse目录下创建相应的子目录,该子目录以表的名称命名
CREATE TABLE u_data ( userid INT, movieid INT, rating INT, unixtime STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;导入的数据会存储在step 3中创建的表目录下
LOAD DATA LOCAL INPATH '/u.data' OVERWRITE INTO TABLE u_data;Q: 上一章节花了大量的篇幅介绍了hive由来,框架及hiveql执行过程。那这些东西跟我们标题中所称的hive on spark有什么关系呢?
Ans: Hive的整体解决方案很不错,但有一些地方还值得改进,其中之一就是“从查询提交到结果返回需要相当长的时间,查询耗时太长”。之所以查询时间很长,一个主要的原因就是因为Hive原生是基于MapReduce的,哪有没有办法提高呢。您一定想到了,“不是生成MapReduce Job,而是生成Spark Job”, 充分利用Spark的快速执行能力来缩短HiveQl的响应时间。
下图是Spark 1.0中所支持的lib库,SQL是其唯一新添加的lib库,可见SQL在Spark 1.0中的地位之重要。
HiveContext是Spark提供的用户接口,HiveContext继承自SqlContext。
让我们回顾一下,SqlContext中牵涉到的类及其间的关系如下图所示,具体分析过程参见本系列中的源码走读之11。
既然是继承自SqlContext,那么我们将普通sql与hiveql分析执行步骤做一个对比,可以得到下图。
有了上述的比较,就能抓住源码分析时需要把握的几个关键点:
Entrypoint HiveContext.scalaQueryExecution HiveContext.scala parser HiveQl.scala optimizer使用到的数据有两种
Schema Data 像数据库的定义和表的结构,这些都存储在MetaStore中Raw data 即要分析的文件本身hiveql是整个的入口点,而hql是hiveql的缩写形式。
def hiveql(hqlQuery: String): SchemaRDD = { val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery)) // We force query optimization to happen right away instead of letting it happen lazily like // when using the query DSL. This is so DDL commands behave as expected. This is only // generates the RDD lineage for DML queries, but does not perform any execution. result.queryExecution.toRdd result }上述hiveql的定义与sql的定义几乎一模一样,唯一的不同是sql中使用parseSql的结果作为SchemaRDD的入参而hiveql中使用HiveQl.parseSql作为SchemaRdd的入参
parseSql的函数定义如代码所示,解析过程中将指令分成两大类
nativecommand 非select语句,这类语句的特点是执行时间不会因为条件的不同而有很大的差异,基本上都能在较短的时间内完成非nativecommand 主要是select语句 def parseSql(sql: String): LogicalPlan = { try { if (sql.toLowerCase.startsWith("set")) { NativeCommand(sql) } else if (sql.toLowerCase.startsWith("add jar")) { AddJar(sql.drop(8)) } else if (sql.toLowerCase.startsWith("add file")) { AddFile(sql.drop(9)) } else if (sql.startsWith("dfs")) { DfsCommand(sql) } else if (sql.startsWith("source")) { SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath }) } else if (sql.startsWith("!")) { ShellCommand(sql.drop(1)) } else { val tree = getAst(sql) if (nativeCommands contains tree.getText) { NativeCommand(sql) } else { nodeToPlan(tree) match { case NativePlaceholder => NativeCommand(sql) case other => other } } } } catch { case e: Exception => throw new ParseException(sql, e) case e: NotImplementedError => sys.error( s""" |Unsupported language features in query: $sql |${dumpTree(getAst(sql))} """.stripMargin) } }哪些指令是nativecommand呢,答案在HiveQl.scala中的nativeCommands变量,列表很长,代码就不一一列出。
对于非nativeCommand,最重要的解析函数就是nodeToPlan
Spark对HiveQL所做的优化主要体现在Query相关的操作,其它的依然使用Hive的原生执行引擎。
在logicalPlan到physicalPlan的转换过程中,toRdd最关键的元素
override lazy val toRdd: RDD[Row] = analyzed match { case NativeCommand(cmd) => val output = runSqlHive(cmd) if (output.size == 0) { emptyResult } else { val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]])) sparkContext.parallelize(asRows, 1) } case _ => executedPlan.execute().map(_.copy()) }由于native command是一些非耗时的操作,直接使用Hive中原有的exeucte engine来执行即可。这些command的执行示意图如下
HiveTypeCoercion
val typeCoercionRules = List(PropagateTypes, ConvertNaNs, WidenTypes, PromoteStrings, BooleanComparisons, BooleanCasts, StringToIntegralCasts, FunctionArgumentConversion)PreInsertionCasts存在的目的就是确保在数据插入执行之前,相应的表已经存在。
override lazy val optimizedPlan = optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))此处要注意的是catalog的用途,catalog是HiveMetastoreCatalog的实例。
HiveMetastoreCatalog是Spark中对Hive Metastore访问的wrapper。HiveMetastoreCatalog通过调用相应的Hive Api可以获得数据库中的表及表的分区,也可以创建新的表和分区。
HiveMetastoreCatalog中会通过hive client来访问metastore中的元数据,使用了大量的Hive Api。其中包括了广为人知的deSer library。
以CreateTable函数为例说明对Hive Library的依赖。
def createTable( databaseName: String, tableName: String, schema: Seq[Attribute], allowExisting: Boolean = false): Unit = { val table = new Table(databaseName, tableName) val hiveSchema = schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), "")) table.setFields(hiveSchema) val sd = new StorageDescriptor() table.getTTable.setSd(sd) sd.setCols(hiveSchema) // TODO: THESE ARE ALL DEFAULTS, WE NEED TO PARSE / UNDERSTAND the output specs. sd.setCompressed(false) sd.setParameters(Map[String, String]()) sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat") sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat") val serDeInfo = new SerDeInfo() serDeInfo.setName(tableName) serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") serDeInfo.setParameters(Map[String, String]()) sd.setSerdeInfo(serDeInfo) try client.createTable(table) catch { case e: org.apache.hadoop.hive.ql.metadata.HiveException if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] && allowExisting => // Do nothing. } }结合源码,我们再对一个简单的例子作下说明。
可能你会想,既然spark也支持hql,那么我原先用hive cli创建的数据库和表用spark能不能访问到呢?答案或许会让你很纳闷,“在默认的配置下是不行的”。为什么?
Hive中的meta data采用的存储引擎是Derby,该存储引擎只能有一个访问用户。同一时刻只能有一个人访问,即便以同一用户登录访问也不行。针对这个局限,解决方法就是将metastore存储在mysql或者其它可以多用户访问的数据库中。
具体实例
创建表导入数据查询删除表在启动spark-shell之前,需要先设置环境变量HIVE_HOME和HADOOP_HOME.
启动spark-shell之后,执行如下代码
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) // Importing the SQL context gives access to all the public SQL functions and implicit conversions. import hiveContext._ hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL hql("FROM src SELECT key, value").collect().foreach(println) hql("drop table src")create操作会在/user/hive/warehouse/目录下创建src目录,可以用以下指令来验证
$$HADOOP_HOME/bin/hdfs dfs -ls /user/hive/warehouse/drop表的时候,不仅metastore中相应的记录被删除,而且原始数据raw file本身也会被删除,即在warehouse目录下对应某个表的目录会被整体删除掉。
上述的create, load及query操作对metastore和raw data的影响可以用下图的表示:
如果想对hive默认的配置作修改,可以使用hive-site.xml。
具体步骤如下
- 在$SPARK_HOME/conf目录下创建hive-site.xml
- 根据需要,添写相应的配置项的值,可以这样做,将$HIVE_HOME/conf目录下的hive-default.xml复制到$SPARK_HOME/conf,然后重命名为hive-site.xml
为了进一步提升sql的执行速度,在Spark开发团队在发布完1.0之后,会通过codegen的方法来提升执行速度。codegen有点类似于jvm中的jit技术。充分利用了scala语言的特性。
Spark目前还缺乏一个非常有影响力的应用,也就通常所说的killer application。SQL是Spark在寻找killer application方面所做的一个积极尝试,也是目前Spark上最有热度的一个话题,但通过优化Hive执行速度来吸引潜在Spark用户,该突破方向选择正确与否还有待市场证明。
Hive除了在执行速度上为人诟病之外,还有一个最大的问题就是多用户访问的问题,相较第一个问题,第二个问题来得更为致命。无论是Facebook在Hive之后推出的Presto还是Cloudera推出的Impala都是针对第二问题提出的解决方案,目前都已经取得的了巨大优势。
本文就Spark对HiveQL提供支持的这一功能进行了比较详细的分析,其中涉及到以下几个问题。
什么是hivehive有什么缺点,否则就没Spark或Shark啥事了Spark主要是针对hive的哪个不足做出改进Spark是如何对这个做改进的