Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22112#discussion_r212633788
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,60 @@ private[spark] class DAGScheduler(
failedStages += failedStage
failedStages += mapStage
if (noResubmitEnqueued) {
+ // If the map stage is INDETERMINATE, which means the map tasks may return
+ // different results when retried, we need to re-try all the tasks of the failed
+ // stage and its succeeding stages, because the input data will be changed after the
+ // map tasks are re-tried.
+ // Note that, if the map stage is UNORDERED, we are fine. The shuffle partitioner is
+ // guaranteed to be idempotent, so the input data of the reducers will not change even
+ // if the map tasks are re-tried.
+ if (mapStage.rdd.outputRandomLevel == RandomLevel.INDETERMINATE) {
+   // It's a little tricky to find all the succeeding stages of `failedStage`, because
+   // each stage only knows its parents, not its children. Here we traverse the stages from
+   // the leaf nodes (the result stages of active jobs), and roll back all the stages
+   // in the stage chains that connect to the `failedStage`. To speed up the stage
+   // traversal, we collect the stages to roll back first. If a stage needs to
+   // roll back, all its succeeding stages need to roll back too.
+   val stagesToRollback = scala.collection.mutable.HashSet(failedStage)
--- End diff --
@mridulm Thanks for your suggestion about memoization! I think this
approach should work as you expected. A rough standalone sketch of the
memoized traversal is below.
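
For reference, here is a minimal sketch of the leaf-to-root traversal with
memoization. It is standalone and uses a hypothetical, simplified `SimpleStage`
that only knows its parents, not the real DAGScheduler types, so treat it as an
illustration of the idea rather than the actual implementation:

```scala
import scala.collection.mutable

// Hypothetical, simplified stage graph: each node only knows its parents.
final case class SimpleStage(id: Int, parents: Seq[SimpleStage])

/**
 * Collect the failed stage and every stage that (transitively) depends on it,
 * starting from the leaf stages (the result stages of active jobs).
 * A memo map caches the per-stage answer so each stage is checked only once.
 */
def collectStagesToRollback(
    leafStages: Seq[SimpleStage],
    failedStage: SimpleStage): Set[SimpleStage] = {
  val memo = mutable.HashMap.empty[Int, Boolean]

  // True if `stage` is the failed stage or one of its ancestors is.
  def dependsOnFailed(stage: SimpleStage): Boolean = memo.get(stage.id) match {
    case Some(cached) => cached
    case None =>
      val result = stage.id == failedStage.id || stage.parents.exists(dependsOnFailed)
      memo(stage.id) = result
      result
  }

  // Walk the whole graph from the leaves, visiting each stage once.
  val allStages = mutable.HashSet.empty[SimpleStage]
  def walk(stage: SimpleStage): Unit = {
    if (allStages.add(stage)) stage.parents.foreach(walk)
  }
  leafStages.foreach(walk)

  // A stage rolls back iff its parent chain reaches the failed stage,
  // which covers the failed stage itself and all of its succeeding stages.
  allStages.filter(dependsOnFailed).toSet
}

// Tiny usage example: the result stage depends on the map stage,
// so if the map stage is the failed one, both roll back.
val mapStage = SimpleStage(1, Nil)
val resultStage = SimpleStage(2, Seq(mapStage))
assert(collectStagesToRollback(Seq(resultStage), mapStage) == Set(mapStage, resultStage))
```

Because the memo map caches the per-stage result, each stage's ancestry is
evaluated at most once, so the traversal stays roughly linear in the number of
stages and dependencies instead of re-walking shared parent chains.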
---