Github user mridulm commented on a diff in the pull request:
https://github.com/apache/spark/pull/22112#discussion_r212705976
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1871,57 @@ abstract class RDD[T: ClassTag](
// RDD chain.
@transient protected lazy val isBarrier_ : Boolean =
dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _,
_]]).exists(_.rdd.isBarrier())
+
+ /**
+ * Returns the random level of this RDD's output. Please refer to
[[RandomLevel]] for the
+ * definition.
+ *
+ * By default, a reliably checkpointed RDD, or an RDD without parents (root
RDD), is IDEMPOTENT. For
+ * RDDs with parents, we will generate a random level candidate per
parent according to the
+ * dependency. The random level of the current RDD is the random level
candidate that is the
+ * most random. Please override [[getOutputRandomLevel]] to provide custom
logic for calculating the
+ * output random level.
+ */
+ // TODO: make it public so users can set random level to their custom
RDDs.
+ // TODO: this can be per-partition. e.g. UnionRDD can have different
random level for different
+ // partitions.
+ private[spark] final lazy val outputRandomLevel: RandomLevel.Value = {
+ if
(checkpointData.exists(_.isInstanceOf[ReliableRDDCheckpointData[_]])) {
+ RandomLevel.IDEMPOTENT
+ } else {
+ getOutputRandomLevel
+ }
+ }
+
+ @DeveloperApi
+ protected def getOutputRandomLevel: RandomLevel.Value = {
+ val randomLevelCandidates = dependencies.map {
+ case dep: ShuffleDependency[_, _, _] =>
--- End diff --
`dep.rdd.partitioner` sorry
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]