一般来说,在我们将数据读到DataFrame之后,会继续使用其他一些算子进行处理,如map,flatMap等,但是如果你直接对其调用这些算子时,可能会出现类似unable to find encoder for type stored in a dataset的错误,这种错误的产生一般是因为该DataFrame中的某些返回值的类型不能通过spark自身的的反射完成自动编码,如Map类型,它不属于基本类型,String,case Class和元组类型,因此会报错。
spark数据集需要编码器(encoder)来处理即将存储的数据类型。 对于常见类型(atomics, product types),有许多预定义的编码器可用,但我们必须首先从SparkSession.implicits导入它们才能使其工作,如
import org.apache.spark.sql.SparkSession case class SimpleTuple(id: Int, desc: String) val dataList = List( SimpleTuple(5, "abc"), SimpleTuple(6, "bcd") ) val spark: SparkSession = SparkSession .builder() .appName("test") .config("spark.sql.warehouse.dir", "/tmp") .enableHiveSupport() .getOrCreate() import spark.implicits._ val dataList = ??? val dataset = sparkSession.createDataset(dataList)sparkSession为我们创建的sparkSession的示例。
或者,我们也可以直接提供明确的编码器:
import org.apache.spark.sql.{Encoder, Encoders} val dataset = spark.createDataset(dataList)(Encoders.product[SimpleTuple])encoder是存储类型的编码器。
注意,Enocders还提供了许多用于原子类型的预定义编码器,以及用于复杂编码器的编码器,可以使用ExpressionEncoder进行派生。