GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7279#discussion_r36038738
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -1448,33 +1468,92 @@ abstract class RDD[T: ClassTag](
     
       /**
        * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
    -   * directory set with SparkContext.setCheckpointDir() and all references to its parent
    +   * directory set with `SparkContext#setCheckpointDir` and all references to its parent
        * RDDs will be removed. This function must be called before any job has been
        * executed on this RDD. It is strongly recommended that this RDD is persisted in
        * memory, otherwise saving it on a file will require recomputation.
        */
    -  def checkpoint(): Unit = {
    +  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    +    // NOTE: we use a global lock here due to complexities downstream with ensuring
    +    // children RDD partitions point to the correct parent partitions. In the future
    +    // we should revisit this consideration.
         if (context.checkpointDir.isEmpty) {
           throw new SparkException("Checkpoint directory has not been set in the SparkContext")
         } else if (checkpointData.isEmpty) {
    -      // NOTE: we use a global lock here due to complexities downstream with ensuring
    -      // children RDD partitions point to the correct parent partitions. In the future
    -      // we should revisit this consideration.
    -      RDDCheckpointData.synchronized {
    -        checkpointData = Some(new RDDCheckpointData(this))
    -      }
    +      checkpointData = Some(new ReliableRDDCheckpointData(this))
    +    }
    +  }
    +
    +  /**
    +   * Mark this RDD for local checkpointing using Spark's existing caching layer.
    +   *
    +   * This method is for users who wish to truncate RDD lineages while skipping the expensive
    +   * step of replicating the materialized data in a reliable distributed file system. This is
    +   * useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
    +   *
    +   * Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
    +   * data is written to ephemeral local storage in the executors instead of to a reliable,
    +   * fault-tolerant storage. The effect is that if an executor fails during the computation,
    +   * the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
    +   *
    +   * This is NOT safe to use with dynamic allocation, which removes executors along
    +   * with their cached blocks. If you must use both features, you are advised to set
    +   * `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
    +   *
    +   * The checkpoint directory set through `SparkContext#setCheckpointDir` is not used.
    +   */
    +  def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
    +    if (conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
    +        conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
    --- End diff --
    
    Shouldn't this be `!conf.contains`? I am guessing that you want to issue the warning when the user has enabled dynamic allocation but has not specified a timeout, right?
    
    But even if the idle timeout is set, you don't know whether it is sufficiently large for the user's purpose. So you should probably issue the warning even when the timeout is set.
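    
    To illustrate, here is a sketch of the alternative check I am suggesting (not the actual patch, and the warning text is made up; `logWarning` comes from the `Logging` trait that `RDD` already mixes in):
    
        // Warn whenever dynamic allocation is enabled, since even a
        // user-provided idle timeout may be too small to keep the blocks alive.
        if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
          logWarning("Local checkpointing is NOT safe to use with dynamic allocation, " +
            "which removes executors along with their cached blocks. If you must use " +
            "both features, set spark.dynamicAllocation.cachedExecutorIdleTimeout " +
            "to a high value.")
        }
    
    That way the warning covers both the missing-timeout case and the too-small-timeout case.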

