Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212701804 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1865,6 +1871,57 @@ abstract class RDD[T: ClassTag]( // RDD chain. @transient protected lazy val isBarrier_ : Boolean = dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier()) + + /** + * Returns the random level of this RDD's output. Please refer to [[RandomLevel]] for the + * definition. + * + * By default, a reliably checkpointed RDD, or an RDD without parents (root RDD), is IDEMPOTENT. + * For RDDs with parents, we will generate a random level candidate per parent according to the + * dependency. The random level of the current RDD is the random level candidate that is the + * most random. Please override [[getOutputRandomLevel]] to provide custom logic for calculating + * the output random level. + */ + // TODO: make it public so users can set random level to their custom RDDs. + // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different + // partitions. + private[spark] final lazy val outputRandomLevel: RandomLevel.Value = { + if (checkpointData.exists(_.isInstanceOf[ReliableRDDCheckpointData[_]])) { + RandomLevel.IDEMPOTENT + } else { + getOutputRandomLevel + } + } + + @DeveloperApi + protected def getOutputRandomLevel: RandomLevel.Value = { + val randomLevelCandidates = dependencies.map { + case dep: ShuffleDependency[_, _, _] => --- End diff -- @mridulm I didn't add ``` // if same partitioner, then shuffle not done. case dep: ShuffleDependency[_, _, _] if dep.partitioner == partitioner => dep.rdd.computingRandomLevel ``` IIUC this condition is always true for `ShuffledRDD`?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org