《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(一)

    xiaoxiao2024-09-29  97

    Spark SQL, DataFrames 以及 Datasets 编程指南

    概要

    Spark SQL是Spark中处理结构化数据的模块。与基础的Spark RDD API不同,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息。在Spark内部,Spark SQL会能够用于做优化的信息比RDD API更多一些。Spark SQL如今有了三种不同的API:SQL语句、DataFrame API和最新的Dataset API。不过真正运行计算的时候,无论你使用哪种API或语言,Spark SQL使用的执行引擎都是同一个。这种底层的统一,使开发者可以在不同的API之间来回切换,你可以选择一种最自然的方式,来表达你的需求。

     

    本文中所有的示例都使用Spark发布版本中自带的示例数据,并且可以在spark-shell、pyspark shell以及sparkR shell中运行。

    SQL

    Spark SQL的一种用法是直接执行SQL查询语句,你可使用最基本的SQL语法,也可以选择HiveQL语法。Spark SQL可以从已有的Hive中读取数据。更详细的请参考Hive Tables 这一节。如果用其他编程语言运行SQL,Spark SQL将以DataFrame返回结果。你还可以通过命令行command-line 或者 JDBC/ODBC 使用Spark SQL。

    DataFrames

    DataFrame是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来说,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame可以从很多数据源(sources)加载数据并构造得到,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。

    DataFrame API支持Scala, Java, Python, and R。

    Datasets

    Dataset是Spark-1.6新增的一种API,目前还是实验性的。Dataset想要把RDD的优势(强类型,可以使用lambda表达式函数)和Spark SQL的优化执行引擎的优势结合到一起。Dataset可以由JVM对象构建(constructed )得到,而后Dataset上可以使用各种transformation算子(map,flatMap,filter 等)。

    Dataset API 对 Scala 和 Java的支持接口是一致的,但目前还不支持Python,不过Python自身就有语言动态特性优势(例如,你可以使用字段名来访问数据,row.columnName)。对Python的完整支持在未来的版本会增加进来。

    入门

    入口:SQLContext

    ScalaJavaPythonR

    Spark SQL所有的功能入口都是SQLContext 类,及其子类。不过要创建一个SQLContext对象,首先需要有一个SparkContext对象。

    val sc: SparkContext // 假设已经有一个 SparkContext 对象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 用于包含RDD到DataFrame隐式转换操作 import sqlContext.implicits._

    除了SQLContext之外,你也可以创建HiveContext,HiveContext是SQLContext 的超集。

    除了SQLContext的功能之外,HiveContext还提供了完整的HiveQL语法,UDF使用,以及对Hive表中数据的访问。要使用HiveContext,你并不需要安装Hive,而且SQLContext能用的数据源,HiveContext也一样能用。HiveContext是单独打包的,从而避免了在默认的Spark发布版本中包含所有的Hive依赖。如果这些依赖对你来说不是问题(不会造成依赖冲突等),建议你在Spark-1.3之前使用HiveContext。而后续的Spark版本,将会逐渐把SQLContext升级到和HiveContext功能差不多的状态。

    spark.sql.dialect选项可以指定不同的SQL变种(或者叫SQL方言)。这个参数可以在SparkContext.setConf里指定,也可以通过 SQL语句的SET key=value命令指定。对于SQLContext,该配置目前唯一的可选值就是”sql”,这个变种使用一个Spark SQL自带的简易SQL解析器。而对于HiveContext,spark.sql.dialect 默认值为”hiveql”,当然你也可以将其值设回”sql”。仅就目前而言,HiveSQL解析器支持更加完整的SQL语法,所以大部分情况下,推荐使用HiveContext。

    创建DataFrame

    Spark应用可以用SparkContext创建DataFrame,所需的数据来源可以是已有的RDD(existing RDD),或者Hive表,或者其他数据源(data sources.)

    以下是一个从JSON文件创建DataFrame的小栗子:

    ScalaJavaPythonR val sc: SparkContext // 已有的 SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.json("examples/src/main/resources/people.json") // 将DataFrame内容打印到stdout df.show()

    DataFrame操作

    DataFrame提供了结构化数据的领域专用语言支持,包括Scala, Java, Python and R.

    这里我们给出一个结构化数据处理的基本示例:

    ScalaJavaPythonR val sc: SparkContext // 已有的 SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 创建一个 DataFrame val df = sqlContext.read.json("examples/src/main/resources/people.json") // 展示 DataFrame 的内容 df.show() // age name // null Michael // 30 Andy // 19 Justin // 打印数据树形结构 df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // select "name" 字段 df.select("name").show() // name // Michael // Andy // Justin // 展示所有人,但所有人的 age 都加1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // 筛选出年龄大于21的人 df.filter(df("age") > 21).show() // age name // 30 Andy // 计算各个年龄的人数 df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1

    DataFrame的完整API列表请参考这里:API Documentation

    除了简单的字段引用和表达式支持之外,DataFrame还提供了丰富的工具函数库,包括字符串组装,日期处理,常见的数学函数等。完整列表见这里:DataFrame Function Reference.

    编程方式执行SQL查询

    SQLContext.sql可以执行一个SQL查询,并返回DataFrame结果。

    ScalaJavaPythonR val sqlContext = ... // 已有一个 SQLContext 对象 val df = sqlContext.sql("SELECT * FROM table")

    创建Dataset

    Dataset API和RDD类似,不过Dataset不使用Java序列化或者Kryo,而是使用专用的编码器(Encoder )来序列化对象和跨网络传输通信。如果这个编码器和标准序列化都能把对象转字节,那么编码器就可以根据代码动态生成,并使用一种特殊数据格式,这种格式下的对象不需要反序列化回来,就能允许Spark进行操作,如过滤、排序、哈希等。

    ScalaJava // 对普通类型数据的Encoder是由 importing sqlContext.implicits._ 自动提供的 val ds = Seq(1, 2, 3).toDS() ds.map(_ + 1).collect() // 返回: Array(2, 3, 4) // 以下这行不仅定义了case class,同时也自动为其创建了Encoder case class Person(name: String, age: Long) val ds = Seq(Person("Andy", 32)).toDS() // DataFrame 只需提供一个和数据schema对应的class即可转换为 Dataset。Spark会根据字段名进行映射。 val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path).as[Person]

    和RDD互操作

    Spark SQL有两种方法将RDD转为DataFrame。

    1. 使用反射机制,推导包含指定类型对象RDD的schema。这种基于反射机制的方法使代码更简洁,而且如果你事先知道数据schema,推荐使用这种方式;

    2. 编程方式构建一个schema,然后应用到指定RDD上。这种方式更啰嗦,但如果你事先不知道数据有哪些字段,或者数据schema是运行时读取进来的,那么你很可能需要用这种方式。

    利用反射推导schema

    ScalaJavaPython

    Spark SQL的Scala接口支持自动将包含case class对象的RDD转为DataFrame。对应的case class定义了表的schema。case class的参数名通过反射,映射为表的字段名。case class还可以嵌套一些复杂类型,如Seq和Array。RDD隐式转换成DataFrame后,可以进一步注册成表。随后,你就可以对表中数据使用SQL语句查询了。

    // sc 是已有的 SparkContext 对象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 为了支持RDD到DataFrame的隐式转换 import sqlContext.implicits._ // 定义一个case class. // 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制, // 你可以使用自定义class,并实现Product接口。当然,你也可以改用编程方式定义schema case class Person(name: String, age: Int) // 创建一个包含Person对象的RDD,并将其注册成table val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people") // sqlContext.sql方法可以直接执行SQL语句 val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") // SQL查询的返回结果是一个DataFrame,且能够支持所有常见的RDD算子 // 查询结果中每行的字段可以按字段索引访问: teenagers.map(t => "Name: " + t(0)).collect().foreach(println) // 或者按字段名访问: teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println) // row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型 teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println) // 返回结果: Map("name" -> "Justin", "age" -> 19)

    转载自  并发编程网 - ifeve.com 相关资源:敏捷开发V1.0.pptx
    最新回复(0)