Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19805#discussion_r157361558
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -540,9 +540,52 @@ class Dataset[T] private[sql](
        */
       @Experimental
       @InterfaceStability.Evolving
    -  def checkpoint(eager: Boolean): Dataset[T] = {
    +  def checkpoint(eager: Boolean): Dataset[T] = checkpoint(eager = eager, 
reliableCheckpoint = true)
    +
    +  /**
    +   * Eagerly locally checkpoints a Dataset and returns the new Dataset. 
Checkpointing can be
    +   * used to truncate the logical plan of this Dataset, which is 
especially useful in iterative
    +   * algorithms where the plan may grow exponentially. Local checkpoints 
are written to executor
    +   * storage and, although potentially faster, they are unreliable and may 
compromise job completion.
    +   *
    +   * @group basic
    +   * @since 2.3.0
    +   */
    +  @Experimental
    +  @InterfaceStability.Evolving
    +  def localCheckpoint(): Dataset[T] = checkpoint(eager = true, 
reliableCheckpoint = false)
    +
    +  /**
    +   * Locally checkpoints a Dataset and returns the new Dataset. 
Checkpointing can be used to truncate
    +   * the logical plan of this Dataset, which is especially useful in 
iterative algorithms where the
    +   * plan may grow exponentially. Local checkpoints are written to 
executor storage and, although
    +   * potentially faster, they are unreliable and may compromise job 
completion.
    +   *
    +   * @group basic
    +   * @since 2.3.0
    +   */
    +  @Experimental
    +  @InterfaceStability.Evolving
    +  def localCheckpoint(eager: Boolean): Dataset[T] = checkpoint(
    +    eager = eager,
    +    reliableCheckpoint = false
    +  )
    +
    +  /**
    +   * Returns a checkpointed version of this Dataset.
    +   *
    +   * @param eager Whether to checkpoint this dataframe immediately
    +   * @param reliableCheckpoint Whether to create a reliable checkpoint 
saved to files inside the
    +   *                           checkpoint directory. If false creates a 
local checkpoint using
    +   *                           the caching subsystem
    +   */
    +  private def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): 
Dataset[T] = {
         val internalRdd = queryExecution.toRdd.map(_.copy())
    -    internalRdd.checkpoint()
    +    if (reliableCheckpoint) {
    +      internalRdd.checkpoint()
    +    } else {
    +      internalRdd.localCheckpoint()
    --- End diff --
    
    Could you also issue a logWarning message here to indicate that a local 
checkpoint is not reliable?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to