Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/13646#discussion_r66861452
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---
@@ -412,25 +390,48 @@ class DAGScheduler(
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
- def visit(r: RDD[_]) {
- if (!visited(r)) {
- visited += r
- for (dep <- r.dependencies) {
- dep match {
- case shufDep: ShuffleDependency[_, _, _] =>
- if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
- parents.push(shufDep)
- }
- case _ =>
- }
- waitingForVisit.push(dep.rdd)
+ waitingForVisit.push(rdd)
+ while (waitingForVisit.nonEmpty) {
+ val toVisit = waitingForVisit.pop()
+ if (!visited(toVisit)) {
+ visited += toVisit
+ getShuffleDependencies(toVisit).foreach { shuffleDep =>
+ if (!shuffleToMapStage.contains(shuffleDep.shuffleId)) {
+ parents.push(shuffleDep)
+ waitingForVisit.push(shuffleDep.rdd)
+ } // Otherwise, the dependency and it's ancestors have already been registered.
}
}
}
+ parents
+ }
+ /**
+ * Returns shuffle dependencies that are immediate parents of the given RDD.
+ *
+ * This function will not return more distant ancestors. For example, if C has a shuffle
+ * dependency on B which has a shuffle dependency on A:
+ *
+ * A <-- B <-- C
+ *
+ * calling this function with rdd C will only return the B <-- C dependency.
+ */
+ private def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
+ val parents = new HashSet[ShuffleDependency[_, _, _]]
+ val visited = new HashSet[RDD[_]]
+ val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
- visit(waitingForVisit.pop())
+ val toVisit = waitingForVisit.pop()
+ if (!visited(toVisit)) {
+ visited += toVisit
+ toVisit.dependencies.foreach {
+ case shuffleDep: ShuffleDependency[_, _, _] =>
+ parents += shuffleDep
+ case dependency: Any =>
--- End diff --
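nit: we can remove `: Any` — a plain `case dependency =>` already matches every remaining dependency, so the type ascription is redundant.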
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]