Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19805#discussion_r157361558 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -540,9 +540,52 @@ class Dataset[T] private[sql]( */ @Experimental @InterfaceStability.Evolving - def checkpoint(eager: Boolean): Dataset[T] = { + def checkpoint(eager: Boolean): Dataset[T] = checkpoint(eager = eager, reliableCheckpoint = true) + + /** + * Eagerly locally checkpoints a Dataset and return the new Dataset. Checkpointing can be + * used to truncate the logical plan of this Dataset, which is especially useful in iterative + * algorithms where the plan may grow exponentially. Local checkpoints are written to executor + * storage and despite potentially faster they are unreliable and may compromise job completion. + * + * @group basic + * @since 2.3.0 + */ + @Experimental + @InterfaceStability.Evolving + def localCheckpoint(): Dataset[T] = checkpoint(eager = true, reliableCheckpoint = false) + + /** + * Locally checkpoints a Dataset and return the new Dataset. Checkpointing can be used to truncate + * the logical plan of this Dataset, which is especially useful in iterative algorithms where the + * plan may grow exponentially. Local checkpoints are written to executor storage and despite + * potentially faster they are unreliable and may compromise job completion. + * + * @group basic + * @since 2.3.0 + */ + @Experimental + @InterfaceStability.Evolving + def localCheckpoint(eager: Boolean): Dataset[T] = checkpoint( + eager = eager, + reliableCheckpoint = false + ) + + /** + * Returns a checkpointed version of this Dataset. + * + * @param eager Whether to checkpoint this dataframe immediately + * @param reliableCheckpoint Whether to create a reliable checkpoint saved to files inside the + * checkpoint directory. If false creates a local checkpoint using + * the caching subsystem + */ + private def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = { val internalRdd = queryExecution.toRdd.map(_.copy()) - internalRdd.checkpoint() + if (reliableCheckpoint) { + internalRdd.checkpoint() + } else { + internalRdd.localCheckpoint() --- End diff -- Could you also issue a logWarning message here to indicate the checkpoint is not reliable?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org