Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/19805#discussion_r157374081
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -540,9 +540,52 @@ class Dataset[T] private[sql](
*/
@Experimental
@InterfaceStability.Evolving
- def checkpoint(eager: Boolean): Dataset[T] = {
+ def checkpoint(eager: Boolean): Dataset[T] = checkpoint(eager = eager,
reliableCheckpoint = true)
+
+ /**
+ * Eagerly locally checkpoints a Dataset and returns the new Dataset.
Checkpointing can be
+ * used to truncate the logical plan of this Dataset, which is
especially useful in iterative
+ * algorithms where the plan may grow exponentially. Local checkpoints
are written to executor
+ * storage and, despite being potentially faster, they are unreliable and may
compromise job completion.
+ *
+ * @group basic
+ * @since 2.3.0
+ */
+ @Experimental
+ @InterfaceStability.Evolving
+ def localCheckpoint(): Dataset[T] = checkpoint(eager = true,
reliableCheckpoint = false)
+
+ /**
+ * Locally checkpoints a Dataset and returns the new Dataset.
Checkpointing can be used to truncate
+ * the logical plan of this Dataset, which is especially useful in
iterative algorithms where the
+ * plan may grow exponentially. Local checkpoints are written to
executor storage and, despite being
+ * potentially faster, they are unreliable and may compromise job
completion.
+ *
+ * @group basic
+ * @since 2.3.0
+ */
+ @Experimental
+ @InterfaceStability.Evolving
+ def localCheckpoint(eager: Boolean): Dataset[T] = checkpoint(
+ eager = eager,
+ reliableCheckpoint = false
+ )
+
+ /**
+ * Returns a checkpointed version of this Dataset.
+ *
+ * @param eager Whether to checkpoint this dataframe immediately
+ * @param reliableCheckpoint Whether to create a reliable checkpoint
saved to files inside the
+ * checkpoint directory. If false, creates a
local checkpoint using
+ * the caching subsystem
+ */
+ private def checkpoint(eager: Boolean, reliableCheckpoint: Boolean):
Dataset[T] = {
val internalRdd = queryExecution.toRdd.map(_.copy())
- internalRdd.checkpoint()
+ if (reliableCheckpoint) {
+ internalRdd.checkpoint()
+ } else {
+ internalRdd.localCheckpoint()
--- End diff --
cc @zsxwing
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]