Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9983#discussion_r45936910
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -100,10 +107,52 @@ private[spark] object ReliableCheckpointRDD extends Logging {
         "part-%05d".format(partitionIndex)
       }
     
    +  private def checkpointPartitionerFileName(): String = {
    +    "_partitioner"
    +  }
    +
       /**
    -   * Write this partition's values to a checkpoint file.
    +   * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
        */
    -  def writeCheckpointFile[T: ClassTag](
    +  def createCheckpointedRDD[T: ClassTag](
    +      originalRDD: RDD[T],
    +      checkpointDir: String,
    +      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    +
    +    val sc = originalRDD.sparkContext
    +
    +    // Create the output path for the checkpoint
    +    val checkpointDirPath = new Path(checkpointDir)
    +    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    +    if (!fs.mkdirs(checkpointDirPath)) {
    +      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    +    }
    +
    +    // Save to file, and reload it as an RDD
    +    val broadcastedConf = sc.broadcast(
    +      new SerializableConfiguration(sc.hadoopConfiguration))
    +    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    +    sc.runJob(originalRDD,
    +      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
    +
    +    if (originalRDD.partitioner.nonEmpty) {
    +      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    +    }
    +
    +    val newRDD = new ReliableCheckpointRDD[T](
    +      sc, checkpointDirPath.toString, originalRDD.partitioner)
    +    if (newRDD.partitions.length != originalRDD.partitions.length) {
    +      throw new SparkException(
    +        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
    +          s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
    +    }
    +    newRDD
    +  }
    +
    +  /**
    +   * Write a RDD partition's data to a checkpoint file.
    +   */
    +  def writePartitionToCheckpointFile[T: ClassTag](
    --- End diff --
    
    This method should be `private`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to