Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/21927#discussion_r206593748
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---
@@ -451,6 +465,32 @@ class DAGScheduler(
parents
}
+ /**
+ * Traverse all the parent RDDs within the same stage with the given RDD, check whether all the
+ * parent RDDs satisfy a given predicate.
+ */
+ private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_]
=> Boolean): Boolean = {
+ val visited = new HashSet[RDD[_]]
+ val waitingForVisit = new ArrayStack[RDD[_]]
+ waitingForVisit.push(rdd)
+ while (waitingForVisit.nonEmpty) {
+ val toVisit = waitingForVisit.pop()
+ if (!visited(toVisit)) {
+ if (!predicate(toVisit)) {
+ return false
+ }
+ visited += toVisit
+ toVisit.dependencies.foreach {
+ case shuffleDep: ShuffleDependency[_, _, _] =>
--- End diff --
minor: `shuffleDep` is not used. You can replace the binding with `_`, i.e. `case _: ShuffleDependency[_, _, _] =>`.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]