作用:Spark的容错机制,避免重新计算数据
1.如何进行checkpoint? SparkContext.setCheckpointDir() RDD.checkpoint()
具体转变: 对RDD调用checkpoint()方法之后,它就接受了RDDCheckpointData对象的管理
RDDCheckpointData对象会负责将调用了checkPoint()方法的RDD的状态设置为MarkedForCheckpoint
RDD所在Job运行结束后,会调用job中最后一个RDD的doCheckpoint()方法,沿着finalRDD的lineage向上查找,标记为MarkedForCheckpoint的RDD,并将其标记为CheckpointingProgress
启动一个单独的job,来讲lineage中,标记为CheckpointingProgress的RDD,进行checkpoint操作,也就是将其数据写入 SparkContext.setCheckpointDir()方法设置的文件系统中
将RDD的数据进行checkpoint之后,会改变RDD的lineage,也就是说,会清楚掉rdd所有的依赖,并强行将其父rdd设置为一个CheckpointRDD,而且RDD状态变成Checkpointed
2.Checkpoint与持久化的不同: (1)前者单独存放在高容错的HDFS文件系统,后者放在内存中 (2)前者改变了被调用RDD的lineage,后者没有 3.RDD.iterator()_也会读取checkpoint数据 4.给checkpoint的RDD,建议先进行persist(StorageLevel.DISK_ONLY)
源码: org.apache.spark.rdd.RDD.scala iterator() computeOrReadCheckpoint() CheckpointRDD.scala
