记一次spark源码的bug排查,顺便简单看一下spark sql底层引擎catalyst处理的流程

    xiaoxiao2023-10-15  174

    场景:

    hive中事先创建好分区表test_table_name,然后通过spark streaming任务处理数据,将rdd转为dataframe后写hive。

    具体出错代码

    // Build a DataFrame from the RDD, then append it to the existing Hive table, partitioned by dt.
    val result = sparkSession.createDataFrame(rdd, schema)
    result.write.mode("append").format("hive").partitionBy("dt").saveAsTable("test_table_name")

    异常信息:

    org.apache.spark.SparkException: Requested partitioning does not match the test_table_name table: Requested partitions: Table partitions: dt at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:141) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99) at org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand.run(CreateHiveTableAsSelectCommand.scala:66) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:458) at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:437) at 
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:393) //用户类UserMainClass at com.jd.union.bonus.spark.streaming.UserMainClass$$anonfun$createContext$2.apply(UserMainClass.scala:202) at com.jd.union.bonus.spark.streaming.UserMainClass$$anonfun$createContext$2.apply(UserMainClass.scala:167) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

    解决办法

    遇到这个异常,我们要学会查看异常栈,从第一行抛出的异常代码中我们可以发现

    org.apache.spark.SparkException: Requested partitioning does not match the test_table_name table: Requested partitions: Table partitions: dt

    在写入数据时候,发现找不到这个数据按照哪个分区写入,我们可以看到Requested partitions后面的值为空,但是去hive元数据里面查发现要插入的表是按照dt字段分区的,Table partitions: dt。这样造成了不一致匹配问题,从而抛出异常。

    那么问题来了,我们代码中明明指定了分区字段 partitionBy("dt"),为什么异常里却提示 Requested partitions 为空呢?

    那么我们下面根据异常栈里翻阅源码,来寻找答案吧!

    异常栈中可以定位出问题的代码在用户类这一行:

    result.write.mode("append").format("hive").partitionBy("dt").saveAsTable("test_table_name")

    那么详细分析一下每个方法:

    // Excerpt from DataFrameWriter.
    // The methods below only assign the private fields declared at the bottom of this excerpt.
    // They record the settings for the write that follows: here Hive as the format, append as
    // the save mode and dt as the partition column.

    /** Stores the save mode and returns this writer so that calls can be chained. */
    def mode(saveMode: SaveMode): DataFrameWriter[T] = {
      this.mode = saveMode
      this
    }

    /** Stores the data source name and returns this writer so that calls can be chained. */
    def format(source: String): DataFrameWriter[T] = {
      this.source = source
      this
    }

    /** Stores the partition column names and returns this writer so that calls can be chained. */
    def partitionBy(colNames: String*): DataFrameWriter[T] = {
      this.partitioningColumns = Option(colNames)
      this
    }

    // Initial values: the session's default data source, ErrorIfExists, and no partition columns.
    private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName

    private var mode: SaveMode = SaveMode.ErrorIfExists

    private var partitioningColumns: Option[Seq[String]] = None

    那么重要的方法来了

    /** Public entry point: parses the table name and delegates to the private overload. */
    def saveAsTable(tableName: String): Unit = {
      // Turn the tableName string into a TableIdentifier object.
      saveAsTable(df.sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName))
    }

    /**
     * Decides what to do from the pair (does the table exist?, save mode).
     *
     * Throws AnalysisException when the table exists and the mode is ErrorIfExists, or when an
     * Overwrite would replace a table that the query itself reads from.
     */
    private def saveAsTable(tableIdent: TableIdentifier): Unit = {
      val catalog = df.sparkSession.sessionState.catalog
      // Check whether the table already exists in the catalog.
      val tableExists = catalog.tableExists(tableIdent)
      val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
      val tableIdentWithDB = tableIdent.copy(database = Some(db))
      val tableName = tableIdentWithDB.unquotedString

      // Pick the action from table existence and save mode.
      (tableExists, mode) match {
        case (true, SaveMode.Ignore) =>
          // Do nothing

        case (true, SaveMode.ErrorIfExists) =>
          throw new AnalysisException(s"Table $tableIdent already exists.")

        // The table exists and the mode is Overwrite.
        case (true, SaveMode.Overwrite) =>
          // Get all input data source or hive relations of the query.
          val srcRelations = df.logicalPlan.collect {
            case LogicalRelation(src: BaseRelation, _, _, _) => src
            case relation: HiveTableRelation => relation.tableMeta.identifier
          }

          val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
          EliminateSubqueryAliases(tableRelation) match {
            // check if the table is a data source table (the relation is a BaseRelation).
            case LogicalRelation(dest: BaseRelation, _, _, _) if srcRelations.contains(dest) =>
              throw new AnalysisException(
                s"Cannot overwrite table $tableName that is also being read from")
            // check hive table relation when overwrite mode
            case relation: HiveTableRelation
                if srcRelations.contains(relation.tableMeta.identifier) =>
              throw new AnalysisException(
                s"Cannot overwrite table $tableName that is also being read from")
            case _ => // OK
          }

          // Drop the existing table first.
          catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
          // Overwrite drops the table and then calls createTable; the append path (the final
          // case below) calls createTable only.
          createTable(tableIdentWithDB)
          // Refresh the table cached in the catalog.
          catalog.refreshTable(tableIdentWithDB)

        // Append mode with an already existing table lands here; see createTable.
        case _ => createTable(tableIdent)
      }
    }

    下面我们详细看createTable逻辑

    /** Builds a CatalogTable description from the writer settings and runs a CreateTable plan. */
    private def createTable(tableIdent: TableIdentifier): Unit = {
      val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
      // EXTERNAL when the storage format carries a location URI, MANAGED otherwise.
      val tableType = if (storage.locationUri.isDefined) {
        CatalogTableType.EXTERNAL
      } else {
        CatalogTableType.MANAGED
      }

      // Pack the partitionBy columns, the format (source), the table type and so on into a
      // CatalogTable. Note that the schema is left empty here.
      val tableDesc = CatalogTable(
        identifier = tableIdent,
        tableType = tableType,
        storage = storage,
        schema = new StructType,
        provider = Some(source),
        partitionColumnNames = partitioningColumns.getOrElse(Nil),
        bucketSpec = getBucketSpec)

      runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
    }

    /**
     * Wraps a DataFrameWriter action to track the QueryExecution and the time it takes, then
     * reports success or failure to the session's listener manager.
     */
    private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
      // Create the QueryExecution that will process the command (a logical plan).
      val qe = session.sessionState.executePlan(command)
      try {
        val start = System.nanoTime()
        // Call `QueryExecution.toRdd` to trigger the execution of the command.
        SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
        val end = System.nanoTime()
        session.listenerManager.onSuccess(name, qe, end - start)
      } catch {
        case e: Exception =>
          session.listenerManager.onFailure(name, qe, e)
          throw e
      }
    }

    // ---- The members below are quoted from QueryExecution. ----
    // Every stage is a lazy val, so nothing is computed until toRdd is evaluated.

    // Run the analyzer (and its checks) over the incoming logical plan.
    lazy val analyzed: LogicalPlan = {
      SparkSession.setActiveSession(sparkSession)
      sparkSession.sessionState.analyzer.executeAndCheck(logical)
    }

    lazy val withCachedData: LogicalPlan = {
      assertAnalyzed()
      assertSupported()
      sparkSession.sharedState.cacheManager.useCachedData(analyzed)
    }

    // Logical plan -> optimized logical plan.
    lazy val optimizedPlan: LogicalPlan =
      sparkSession.sessionState.optimizer.execute(withCachedData)

    // Optimized logical plan -> physical plan.
    lazy val sparkPlan: SparkPlan = {
      SparkSession.setActiveSession(sparkSession)
      // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
      //       but we will implement to choose the best plan.
      planner.plan(ReturnAnswer(optimizedPlan)).next()
    }

    // Physical plan -> executable physical plan.
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

    // This is what `qe.toRdd` in runCommand evaluates.
    lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

    // ---- Executing the executable physical plan (SparkPlan.execute in the stack trace). ----
    final def execute(): RDD[InternalRow] = executeQuery {
      if (isCanonicalizedPlan) {
        throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
      }
      // Delegate to the concrete plan's doExecute.
      doExecute()
    }

    我们是往hive写数据,这里调用的DataWritingCommandExec可执行物理计划的doExecute方法

    /** Physical plan node that runs a DataWritingCommand and exposes its result rows as an RDD. */
    case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
      extends SparkPlan {

      protected override def doExecute(): RDD[InternalRow] = {
        sqlContext.sparkContext.parallelize(sideEffectResult, 1)
      }

      // Lazy: the command runs the first time this value is read (from doExecute above).
      protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
        val converter = CatalystTypeConverters.createToCatalystConverter(schema)
        // The stack trace passes through this call; the failure happens further down inside it.
        val rows = cmd.run(sqlContext.sparkSession, child)

        rows.map(converter(_).asInstanceOf[InternalRow])
      }
    }

    我们知道是这一步调用出的问题

    val rows = cmd.run(sqlContext.sparkSession, child)

    但是cmd:DataWritingCommand是个接口,有很多实现类,我们怎么确定走的是哪一个呢?第一,我们写的是hive,可以据此猜测;第二,去异常栈里面查找。其实跟上面的doExecute方法一样:可执行的物理计划有一堆,我们怎么知道用的是哪个?答案都可以在异常栈里面找到。

    根据异常栈可以看到使用的是CreateHiveTableAsSelectCommand类

    org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand.run(CreateHiveTableAsSelectCommand.scala:66)

    调用其run方法

    /**
     * CreateHiveTableAsSelectCommand.run in the Spark version used by the job.
     *
     * When the table exists it inserts with an EMPTY partition map; when it does not exist it
     * creates the table and inserts with one entry per partition column of the created table.
     */
    override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
      val catalog = sparkSession.sessionState.catalog
      // Table exists -> if branch; table does not exist -> else branch.
      if (catalog.tableExists(tableIdentifier)) {
        // In Overwrite mode the table is expected to have been dropped before reaching here.
        assert(mode != SaveMode.Overwrite,
          s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")

        if (mode == SaveMode.ErrorIfExists) {
          throw new AnalysisException(s"$tableIdentifier already exists.")
        }
        if (mode == SaveMode.Ignore) {
          // Since the table already exists and the save mode is Ignore, we will just return.
          return Seq.empty
        }

        // Append mode with an existing table: build InsertIntoHiveTable and call its run method.
        InsertIntoHiveTable(
          tableDesc,
          Map.empty, // the partition argument is an empty map
          query,
          overwrite = false, // not an overwrite
          ifPartitionNotExists = false,
          outputColumns = outputColumns).run(sparkSession, child)
      } else {
        // The table does not exist.
        // The assertion fails if tableDesc carries a non-empty schema.
        assert(tableDesc.schema.isEmpty)
        // Create the table.
        catalog.createTable(tableDesc.copy(schema = query.schema), ignoreIfExists = false)

        try {
          // Read back the metadata of the table which was created just now.
          val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
          // For CTAS, there is no static partition values to insert.
          // Here the partition map has one key per partition column of the created table.
          val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
          InsertIntoHiveTable(
            createdTableMeta,
            partition,
            query,
            overwrite = true, // overwrite
            ifPartitionNotExists = false,
            outputColumns = outputColumns).run(sparkSession, child)
        } catch {
          case NonFatal(e) =>
            // drop the created table.
            catalog.dropTable(tableIdentifier, ignoreIfNotExists = true, purge = false)
            throw e
        }
      }

      Seq.empty[Row]
    }

    其实问题就出现了 上面代码中,我们走的是if分支,因为表存在,我们也没有drop操作,那么创建InsertIntoHiveTable对象时把分区字段搞成了一个空的map

    // The call made in the if branch of CreateHiveTableAsSelectCommand.run:
    InsertIntoHiveTable(
      tableDesc,
      Map.empty, // an empty map; it becomes the `partition` field of InsertIntoHiveTable
      query,
      overwrite = false, // not an overwrite
      ifPartitionNotExists = false,
      outputColumns = outputColumns).run(sparkSession, child)

    // Declaration of the command being constructed (body elided):
    case class InsertIntoHiveTable(
        table: CatalogTable,
        partition: Map[String, Option[String]],
        query: LogicalPlan,
        overwrite: Boolean,
        ifPartitionNotExists: Boolean,
        outputColumns: Seq[Attribute]) extends SaveAsHiveFile {...}

    我们看看InsertIntoHiveTable.run方法如何使用这个空map的

    /** InsertIntoHiveTable.run: prepares the Hive TableDesc and a temp path, then inserts. */
    override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
      val externalCatalog = sparkSession.sharedState.externalCatalog
      val hadoopConf = sparkSession.sessionState.newHadoopConf()

      val hiveQlTable = HiveClientImpl.toHiveTable(table)
      // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
      // instances within the closure, since Serializer is not serializable while TableDesc is.
      val tableDesc = new TableDesc(
        hiveQlTable.getInputFormatClass,
        // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
        // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
        // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
        // HiveSequenceFileOutputFormat.
        hiveQlTable.getOutputFormatClass,
        hiveQlTable.getMetadata
      )
      val tableLocation = hiveQlTable.getDataLocation
      val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)

      try {
        // The stack trace shows the failure inside this call; this is the part to study.
        processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
      } finally {
        deleteExternalTmpPath(hadoopConf)
      }

      sparkSession.catalog.uncacheTable(table.identifier.quotedString)
      sparkSession.sessionState.catalog.refreshTable(table.identifier)

      CommandUtils.updateTableStats(sparkSession, table)

      Seq.empty[Row]
    }

    /**
     * Start of InsertIntoHiveTable.processInsert (the rest of the method is omitted here).
     *
     * Throws SparkException when the keys of `partition` differ from the partition columns
     * recorded in the Hive table properties.
     */
    private def processInsert(
        sparkSession: SparkSession,
        externalCatalog: ExternalCatalog,
        hadoopConf: Configuration,
        tableDesc: TableDesc,
        tmpLocation: Path,
        child: SparkPlan): Unit = {
      val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)

      val numDynamicPartitions = partition.values.count(_.isEmpty)
      val numStaticPartitions = partition.values.count(_.nonEmpty)
      val partitionSpec = partition.map {
        case (key, Some(value)) => key -> value
        case (key, None) => key -> ""
      }

      // All partition column names in the format of "<column name 1>/<column name 2>/..."
      val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
      val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)

      // By this time, the partition map must match the table's partition columns
      // This is where the exception in the error log is thrown.
      if (partitionColumnNames.toSet != partition.keySet) {
        throw new SparkException(
          s"""Requested partitioning does not match the ${table.identifier.table} table:
             |Requested partitions: ${partition.keys.mkString(",")}
             |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
      }

      // ....... (remainder of processInsert omitted in this excerpt)
    }

    那么很明显,从table.partitionColumnNames里面取到的分区字段是dt,但是partition.keys里面却是空的。那么partition.keys是什么东西呢?其实partition就是上面创建InsertIntoHiveTable时传入的Map.empty,很明显是空的,所以两边不能匹配!到此,我们找到了出问题的地方!

    // A quick look at the inheritance chain:
    // InsertIntoHiveTable extends SaveAsHiveFile, which in turn extends DataWritingCommand.
    case class InsertIntoHiveTable(
        table: CatalogTable,
        partition: Map[String, Option[String]],
        query: LogicalPlan,
        overwrite: Boolean,
        ifPartitionNotExists: Boolean,
        outputColumns: Seq[Attribute]) extends SaveAsHiveFile

    private[hive] trait SaveAsHiveFile extends DataWritingCommand

    那么如何解决呢?

    看看你有没有机会提一个PR呢?我们可以看看spark master分支针对这里的代码如何处理的,发现master分支的代码跟我使用的spark代码不一样,做了改变,没有这个空Map的代码了,那就是已经解决了,很伤心,没有机会了!

    我们先看一看社区有没有针对这个问题做的处理吧!!

    搜出来了,就在这个pr里面,以及对应的jira如下 https://issues.apache.org/jira/browse/SPARK-26307

    https://github.com/apache/spark/pull/23255/files

    好吧 ,上一波master代码吧!

    // Master branch. CreateHiveTableAsSelectCommand extends CreateHiveTableAsSelectBase and
    // uses the run method of that parent, shown here.
    // Both branches obtain the command to run from getWritingCommand; only the table
    // description and the tableExists flag passed to it differ.
    override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
      val catalog = sparkSession.sessionState.catalog
      val tableExists = catalog.tableExists(tableIdentifier)

      if (tableExists) {
        assert(mode != SaveMode.Overwrite,
          s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")

        if (mode == SaveMode.ErrorIfExists) {
          throw new AnalysisException(s"$tableIdentifier already exists.")
        }
        if (mode == SaveMode.Ignore) {
          // Since the table already exists and the save mode is Ignore, we will just return.
          return Seq.empty
        }

        // Calls CreateHiveTableAsSelectCommand.getWritingCommand.
        val command = getWritingCommand(catalog, tableDesc, tableExists = true)
        command.run(sparkSession, child)
      } else {
        // TODO ideally, we should get the output data ready first and then
        // add the relation into catalog, just in case of failure occurs while data
        // processing.
        assert(tableDesc.schema.isEmpty)
        catalog.createTable(
          tableDesc.copy(schema = outputColumns.toStructType), ignoreIfExists = false)

        try {
          // Read back the metadata of the table which was created just now.
          val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
          val command = getWritingCommand(catalog, createdTableMeta, tableExists = false)
          command.run(sparkSession, child)
        } catch {
          case NonFatal(e) =>
            // drop the created table.
            catalog.dropTable(tableIdentifier, ignoreIfNotExists = true, purge = false)
            throw e
        }
      }

      Seq.empty[Row]
    }

    上面代码我们可以看到,无论是if还是else分支 都是调用这段代码来执行的,区别在于if分支tableExists字段是true,而else分支是false,我们走的if分支

    // The two lines executed in the if branch of run (the table exists):
    val command = getWritingCommand(catalog, tableDesc, tableExists = true)
    command.run(sparkSession, child)

    // Master-branch command class. getWritingCommand always derives the partition map from the
    // partition columns of the table description, whether or not the table already exists.
    case class CreateHiveTableAsSelectCommand(
        tableDesc: CatalogTable,
        query: LogicalPlan,
        outputColumnNames: Seq[String],
        mode: SaveMode)
      extends CreateHiveTableAsSelectBase {

      override def getWritingCommand(
          catalog: SessionCatalog,
          tableDesc: CatalogTable,
          tableExists: Boolean): DataWritingCommand = {
        // For CTAS, there is no static partition values to insert.
        val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
        InsertIntoHiveTable(
          tableDesc,
          partition,
          query,
          // Unchanged semantics: false when the table exists, true when it was just created.
          overwrite = if (tableExists) false else true,
          ifPartitionNotExists = false,
          outputColumnNames = outputColumnNames)
      }

      override def writingCommandClassName: String =
        Utils.getSimpleName(classOf[InsertIntoHiveTable])
    }

    上面代码可以看到getWritingCommand方法中创建InsertIntoHiveTable时候,不同于我使用的分支 ,这里面不会为partition字段设置Map.empty,不管表存在与否,都是使用

    val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap

    这样InsertIntoHiveTable调用run方法时,就不会再出现分区字段不一致的问题。但是overwrite字段的取值没有变:表存在时还是false,表不存在时还是true

    command.run(sparkSession, child)

    解决办法

    修改CreateHiveTableAsSelectCommand类里面的代码,将if分支中的Map.empty换成 tableDesc.partitionColumnNames.map(_ -> None).toMap(即上面master分支中的写法)

    最新回复(0)