GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/7279#discussion_r36039659
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1448,33 +1468,92 @@ abstract class RDD[T: ClassTag](
/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
- * directory set with SparkContext.setCheckpointDir() and all references to its parent
+ * directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
- def checkpoint(): Unit = {
+ def checkpoint(): Unit = RDDCheckpointData.synchronized {
+ // NOTE: we use a global lock here due to complexities downstream with ensuring
+ // children RDD partitions point to the correct parent partitions. In the future
+ // we should revisit this consideration.
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
- // NOTE: we use a global lock here due to complexities downstream with ensuring
- // children RDD partitions point to the correct parent partitions. In the future
- // we should revisit this consideration.
- RDDCheckpointData.synchronized {
- checkpointData = Some(new RDDCheckpointData(this))
- }
+ checkpointData = Some(new ReliableRDDCheckpointData(this))
+ }
+ }
+
+ /**
+ * Mark this RDD for local checkpointing using Spark's existing caching layer.
+ *
+ * This method is for users who wish to truncate RDD lineages while skipping the expensive
+ * step of replicating the materialized data in a reliable distributed file system. This is
+ * useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
+ *
+ * Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
+ * data is written to ephemeral local storage in the executors instead of to a reliable,
+ * fault-tolerant storage. The effect is that if an executor fails during the computation,
+ * the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
+ *
+ * This is NOT safe to use with dynamic allocation, which removes executors along
+ * with their cached blocks. If you must use both features, you are advised to set
+ * `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
+ *
+ * The checkpoint directory set through `SparkContext#setCheckpointDir` is not used.
+ */
+ def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
+ if (conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
+ conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
--- End diff --
Ahh ok