大多数系统中,采集的原始日志数据往往都是半结构化或者非结构化的。而对于数据分析人员来说,每次写代码去处理原始数据不够高效且有一定的难度,二维表结构的数据使用起来才是最自然方便的。因此,数据处理的第一步往往是将原始数据展开成一张底层大宽表以供上层的数据应用或者数据开发人员使用。但对于原始日志数据中字段往往很多,而且多是嵌套结构的,对于底层大宽表应该包含哪些字段,以及对于嵌套结构如何处理,这是一个令人头疼的问题。
那能直接使用SQL方法分析处理原始数据吗?
一方面,Spark计算框架可以支持大数据量的计算,而且其支持SQL接口;另一方面,Protobuf是一种常用的数据接口协议。本文将protobuf协议为例介绍使用Spark SQL分析处理原始数据的方案。
对于上层报表类应用来说,我们可以基于报表需求选取字段形成大宽表。但对于分析型应用,特别对于一些问题诊断分析场景来说,往往需要一些不常使用的字段,且往往需要最原始未经加工的数据。
对于其它嵌套结构数据,如json,只需要将RDD转换DataFrame步骤进行替换即可达到同样的目的。
原始数据是protobuf协议的数据,如果能将protobuf格式的日志数据转换成DataFrame,那就可以使用Spark SQL来进行数据分析计算了。
假设protobuf协议如下:
package test; enum Sex { Female = 1; Male = 2; } message Job { required string company = 1; repeated string industries = 2; } message Person { required string name = 1; required Sex sex = 2; required int32 age = 3; repeated string interests = 4; repeated Job jobs = 5; } message Log { repeated Person person = 1; }假设日志数据已经采集好并存储在hdfs或其他存储系统中,那么首先可通过相应读数据方法加载数据并转换为RDD[Protobuf]格式。下面以本地的测试数据person.txt为例进行介绍:
person { name: "Li" sex: Male age: 27 interests: "badminton" interests: "running" interests: "reading" jobs { company: "Google" industries: "Internet" } } person { name: "Zhang" sex: Male age: 26 interests: "basketball" interests: "running" interests: "reading" jobs { company: "Tencent" industries: "Internet" industries: "Game" } } person { name: "Wang" sex: Female age: 26 interests: "dancing" interests: "swim" interests: "reading" jobs { company: "Alibaba" industries: "Internet" industries: "E-commerce" } } person { name: "Liu" sex: Female age: 23 interests: "song" interests: "swim" interests: "game" jobs { company: "Tencent" industries: "Internet" industries: "Game" } }读取本地测试数据,并将其转换为RDD[Person]:
val log = { val in = getClass.getResourceAsStream("/person.txt") val builder = Log.newBuilder() TextFormat.merge(new BufferedReader(new InputStreamReader(in)), builder) builder.build() } val personRdd = sc.parallelize(log.getPersonList)如果是RDD[JavaBean]格式的数据,可以利用SparkSession中createDataFrame方法将其转换为DataFrame。那对于Protobuf格式的数据也可通过自动解析Protobuf协议的方式进行转换,可参考saurfang的sparksql-protobuf包(也可以参考其源码编写自己的转换方式)将RDD[Protobuf]转换为DataFrame。其依赖maven包如下:
<!-- https://mvnrepository.com/artifact/com.github.saurfang/sparksql-protobuf --> <dependency> <groupId>com.github.saurfang</groupId> <artifactId>sparksql-protobuf_2.11</artifactId> <version>0.1.3</version> </dependency>将RDD[Person]转换为DataFrame的schema如下:
import com.github.saurfang.parquet.proto.spark.sql._ val personsDF = sparkSession.sqlContext.createDataFrame(personRdd) personsDF.printSchema() root |-- name: string (nullable = false) |-- sex: string (nullable = false) |-- age: integer (nullable = false) |-- interests: array (nullable = false) | |-- element: string (containsNull = false) |-- jobs: array (nullable = false) | |-- element: struct (containsNull = false) | | |-- company: string (nullable = false) | | |-- industries: array (nullable = false) | | | |-- element: string (containsNull = false)转换为DataFrame结构的数据后,就可以使用Spark SQL对其进行分析处理。比如可统计不同性别下的人数:
personsDF.createOrReplaceTempView("person") val result = sparkSession.sql("select sex, count(*) as cnt from person group by sex") result.show() +------+---+ | sex|cnt| +------+---+ |Female| 2| | Male| 2| +------+---+在将RDD[Protobuf]转换为DataFrame时,数据占用的内存可能会膨胀很多,这时候可以考虑在转换之前增加RDD的分区数,减少每个分区中的数据量。但是如果协议字段很多,而查询分析只需要其中某几个字段,如果能在转换DataFrame过程中就将不需要的字段去除,将极大地提高计算效率与优化内存使用。
参考saurfang的sparksql-protobuf源码,我们实现是通过createDataFrame方法进行转换的:
def createDataFrame[A <: AbstractMessage : TypeTag](rdd: RDD[A]): DataFrame = { val schema = ProtoReflection.schemaFor[A].dataType.asInstanceOf[StructType] val rowRDD = rdd.map(ProtoRDDConversions.messageToRow) sqlContext.createDataFrame(rowRDD, schema) }从上面源码可知,只需要对ProtoReflection.schemaFor和ProtoRDDConversions.messageToRow方法进行改造即可实现在转换成DataFrame前将多余字段裁剪掉。
ProtoRDDConversions类的messageToRow方法的改造:
def messageToRow[A <: AbstractMessage](message: A, selectCols: Seq[String], prefix: String = null, separator: String = "_"): Row = { import collection.JavaConversions._ def toRowData(fd: FieldDescriptor, fieldPrefix: String, obj: AnyRef) = { fd.getJavaType match { case BYTE_STRING => obj.asInstanceOf[ByteString].toByteArray case ENUM => obj.asInstanceOf[EnumValueDescriptor].getName case MESSAGE => messageToRow(obj.asInstanceOf[AbstractMessage], selectCols, fieldPrefix, separator) case _ => obj } } val fieldDescriptors = message.getDescriptorForType.getFields val fields = message.getAllFields Row( fieldDescriptors.flatMap { fd => { val flatName = if (prefix == null) fd.getName else s"$prefix$separator${fd.getName}" if (fields.containsKey(fd) && selectCols.exists( _.startsWith(flatName))) { val obj = fields.get(fd) if (fd.isRepeated) { List( obj .asInstanceOf[java.util.List[Object]] .map(toRowData(fd, flatName, _))) } else { List(toRowData(fd, flatName, obj)) } } else if (selectCols.exists(_.startsWith(flatName))) { if (fd.isRepeated) { List(Seq()) } else List(null) } else { None } } }: _* ) }ProtoReflection的schemaFor方法的改造:
/** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */ def schemaFor[T <: AbstractMessage](clazz: Class[T], selectCols: Seq[String], separator: String = "_"): Schema = { ScalaReflectionLock.synchronized { val descriptor = clazz.getMethod("getDescriptor").invoke(null).asInstanceOf[Descriptor] import collection.JavaConversions._ Schema(StructType( descriptor.getFields.flatMap( structFieldFor(_, selectCols, separator))), nullable = true) } } private def structFieldFor(fd: FieldDescriptor, selectCols: Seq[String], separator: String = "_", prefix: String = null): Option[StructField] = { val flatName = if (prefix == null) fd.getName else s"$prefix$separator${fd.getName}" if (!selectCols.exists(_.startsWith(flatName))) { return None } import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._ val dataType = fd.getJavaType match { case INT => Some(IntegerType) case LONG => Some(LongType) case FLOAT => Some(FloatType) case DOUBLE => Some(DoubleType) case BOOLEAN => Some(BooleanType) case STRING => Some(StringType) case BYTE_STRING => Some(BinaryType) case ENUM => Some(StringType) case MESSAGE => import collection.JavaConversions._ Option( fd.getMessageType.getFields.flatMap( structFieldFor(_, selectCols, separator, flatName))) .filter(_.nonEmpty) .map(StructType.apply) } dataType.map( dt => StructField( fd.getName, if (fd.isRepeated) ArrayType(dt, containsNull = false) else dt, nullable = !fd.isRequired && !fd.isRepeated )) }由于协议数据往往是嵌套结构,虽然现在已经得到原始日志对应的表结构数据,但使用SQL处理嵌套结构数据是比较麻烦的(SQL处理最方便的是二维表结构数据)。那有没有方式将嵌套结构数据自动转换为二维表结构数据呢?
最常规的方法是使用lateral view和explode来将数据进行展平处理。比如需要统计每个兴趣爱好对应的人数:
personsDF.createOrReplaceTempView("person") val result = sparkSession.sql( "select interest, count(name) as cnt from person lateral view explode(interests) t as interest group by interest") result.show() +----------+---+ | interest|cnt| +----------+---+ | reading| 3| |basketball| 1| | dancing| 1| | badminton| 1| | song| 1| | swim| 2| | running| 2| | game| 1| +----------+---+常规方法是可行的,但此方法的SQL往往非常复杂,非常难以理解。有没有方法可以实现嵌套数据自动展平?
如果能将嵌套结构的数据自动展平,这样无论什么时候对于用户来说看到的都是一张二维表结构数据,理解起来更加简单自然。参考了这里的代码,并对其进行小的改进,即可实现自动展平。
注意: 这里必须根据需要的列进行展平,如果不管实际的需要默认就将所有嵌套数据展平,可能会导致数量的重复,以及数据行数的激增。比如我不想分析jobs中的industries,但却将industries展平成多行,则会导致统计数变多,统计数据错误。
这里分隔符不能使用".",因为Spark SQL在解析你的查询sql时会无法解析。比如"jobs.company",那你的列就是"jobs.company"还是说你是想引用"jobs"结构下的"company"列,这会造成歧义。
def flatten(df: DataFrame, selectCols: Seq[String], separator: String = "_"): DataFrame = { def mustFlatten(structType: StructType): Boolean = structType.fields.exists( f => f.dataType.isInstanceOf[ArrayType] || f.dataType .isInstanceOf[StructType]) def flattenAndExplodeOne(structType: StructType, parent: Column = null, prefix: String = null, cols: Array[(DataType, Column)] = Array[(DataType, Column)]()) : Array[(DataType, Column)] = { val res = structType.fields.foldLeft(cols)((columns, f) => { val myCol = if (parent == null) col(f.name) else parent.getItem(f.name) val flatName = if (prefix == null) f.name else s"$prefix$separator${f.name}" if (!selectCols.exists(_.startsWith(flatName))) { columns } else { f.dataType match { case st: StructType => flattenAndExplodeOne(st, myCol, flatName, columns) case dt: ArrayType => { if (columns.exists(_._1.isInstanceOf[ArrayType])) { columns :+ ((dt, myCol.as(flatName))) } else { columns :+ ((dt, explode(myCol).as(flatName))) } } case dt => columns :+ ((dt, myCol.as(flatName))) } } }) res } var flatDf = df while (mustFlatten(flatDf.schema)) { val newColumns = flattenAndExplodeOne(flatDf.schema, null, null).map(_._2) flatDf = flatDf.select(newColumns: _*) } flatDf }数据展平后,我们就可以像对待普通二维表一样对其进行分析查询。比如需要统计每个行业对应的人数:
val flattenDF = flatten(personsDF, Seq("name", "sex", "jobs_company", "jobs_industries")) flattenDF.printSchema() flattenDF.createOrReplaceTempView("person") val result = sparkSession.sql( "select jobs_industries, count(*) as cnt from person group by jobs_industries") result.show() root |-- name: string (nullable = false) |-- sex: string (nullable = false) |-- jobs_company: string (nullable = false) |-- jobs_industries: string (nullable = false) +---------------+---+ |jobs_industries|cnt| +---------------+---+ | Internet| 4| | E-commerce| 1| | Game| 2| +---------------+---+为了能自动根据需要来展平嵌套结构数据,需要指定要用到的数据列。我们可以使用Spark框架中的org.apache.spark.sql.catalyst.parser包来自动解析查询SQL。
根据SQL的结构,使用到的列主要来自三个部分:select,aggregate和filter,将这三部分用到的列都拿到即可得到SQL中使用到的所有列。
这样就可以根据查询的SQL自动的进行数据展平,让用户像查询二维表一样查询分析嵌套结构日志数据。比如需要统计每个行业对应的人数:
val sql = "select jobs_industries, count(name) as cnt from person group by jobs_industries" val cols = (getSelectedColumns(sparkSession, sql) ++ getFilterColumns(sparkSession, sql)).distinct val flattenDF = flatten(personsDF, cols) flattenDF.printSchema() flattenDF.createOrReplaceTempView("person") val result = sparkSession.sql(sql) result.show() root |-- name: string (nullable = false) |-- jobs_industries: string (nullable = false) +---------------+---+ |jobs_industries|cnt| +---------------+---+ | Internet| 4| | E-commerce| 1| | Game| 2| +---------------+---+为了能自动加载数据,我们需要根据查询SQL自动加载用户需要的数据。同样我们可以通过解析SQL中用到的数据源表名,并根据预先定义好的表名到数据源的映射,自动获取要分析的数据并进行加载。
def getTables(sparkSession: SparkSession, sqlToParse: String): Seq[String] = { sparkSession.sessionState.sqlParser .parsePlan(sqlToParse) .collect { case r: UnresolvedRelation => r.tableName } .distinct }获取到数据源后,还需要自动加载对应时间的数据(总不能每次都加载全量数据)。简单点可在sql语法中加入特殊的数据时间定义格式,从而获取数据时间。比如select * from tableName partition(start:end),通过强制sql的格式来获取数据时间。但同样也可以从sql的filter中自动解析数据时间:
def getTimeConditionRange(sparkSession: SparkSession, sqlToParse: String, timeColumn: String): Option[(Long, Long)] = { val conditions = sparkSession.sessionState.sqlParser .parsePlan(sqlToParse) .collect { case r: Filter => r.condition } val timeRanges = conditions .filter(_.toString().contains(timeColumn)) .flatMap(_.collect { case r: BinaryComparison => getTimeComparisonRange(r, timeColumn) }) val startTime = timeRanges.map(_._1).max val endTime = timeRanges.map(_._2).min if (startTime == Long.MinValue || endTime == Long.MaxValue || startTime > endTime) { None } else { Some(startTime, endTime) } } private def isUnresolvedAttribute(expression: Expression): Boolean = { expression.isInstanceOf[UnresolvedAttribute] } private def isLeafUnresolvedAttribute(expression: Expression): Boolean = { isUnresolvedAttribute(expression) && expression .asInstanceOf[UnresolvedAttribute] .nameParts .size == 1 } private def getTimeComparisonRange(comparison: BinaryComparison, timeColumn: String): (Long, Long) = { try { val defaultRange = (Long.MinValue, Long.MaxValue) def equalTimeColumn(expression: Expression): Boolean = { expression.isInstanceOf[UnresolvedAttribute] && expression .asInstanceOf[UnresolvedAttribute] .name .equalsIgnoreCase(timeColumn) } comparison match { case r: EqualTo => if (equalTimeColumn(r.left) && r.right.isInstanceOf[Literal]) { val value = r.right.asInstanceOf[Literal].value.toString.toLong (value, value) } else if (equalTimeColumn(r.right) && r.left.isInstanceOf[Literal]) { val value = r.left.asInstanceOf[Literal].value.toString.toLong (value, value) } else { defaultRange } case r: LessThan => if (equalTimeColumn(r.left) && r.right.isInstanceOf[Literal]) { val value = r.right.asInstanceOf[Literal].value.toString.toLong (Long.MinValue, value - 1) } else if (equalTimeColumn(r.right) && r.left.isInstanceOf[Literal]) { val value = r.left.asInstanceOf[Literal].value.toString.toLong (Long.MinValue, value - 1) } else { defaultRange } case r: LessThanOrEqual => if (equalTimeColumn(r.left) && r.right.isInstanceOf[Literal]) { val value = r.right.asInstanceOf[Literal].value.toString.toLong (Long.MinValue, value) } else if (equalTimeColumn(r.right) && r.left.isInstanceOf[Literal]) { val value = r.left.asInstanceOf[Literal].value.toString.toLong (Long.MinValue, value) } else { defaultRange } case r: GreaterThan => if (equalTimeColumn(r.left) && r.right.isInstanceOf[Literal]) { val value = r.right.asInstanceOf[Literal].value.toString.toLong (value + 1, Long.MaxValue) } else if (equalTimeColumn(r.right) && r.left.isInstanceOf[Literal]) { val value = r.left.asInstanceOf[Literal].value.toString.toLong (value + 1, Long.MaxValue) } else { defaultRange } case r: GreaterThanOrEqual => if (equalTimeColumn(r.left) && r.right.isInstanceOf[Literal]) { val value = r.right.asInstanceOf[Literal].value.toString.toLong (value, Long.MaxValue) } else if (equalTimeColumn(r.right) && r.left.isInstanceOf[Literal]) { val value = r.left.asInstanceOf[Literal].value.toString.toLong (value, Long.MaxValue) } else { defaultRange } case _ => defaultRange } } catch { case e: NumberFormatException => { val msg = s"The value for $timeColumn must be number: \n${ExceptionUtils.getStackTrace(e)}" log.error(msg) throw e } } }本文介绍了使用Spark SQL分析处理嵌套结构数据的方案。首先利用Spark框架的org.apache.spark.sql.catalyst.parser包自动解析查询SQL,从而得到数据源、数据时间范围和使用列。然后根据解析的数据源和数据时间加载原始数据,并转换为RDD[Protobuf]。接着使用saurfang的sparksql-protobuf包将RDD[Protobuf]转换为DataFrame结构化数据。最后根据解析的查询SQL使用列来自动将嵌套数据展平为普通二维表,从而达到像使用普通二维表一样用SQL语法来灵活地基于原始数据进行分析处理的目的。