GitHub user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22112#discussion_r212379326
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
 failedStages += failedStage
 failedStages += mapStage
 if (noResubmitEnqueued) {
+  // If the map stage is INDETERMINATE, which means the map tasks may return
+  // different result when re-try, we need to re-try all the tasks of the failed
+  // stage and its succeeding stages, because the input data will be changed after the
+  // map tasks are re-tried.
+  // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+  // guaranteed to be idempotent, so the input data of the reducers will not change even
+  // if the map tasks are re-tried.
+  if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+    def rollBackStage(stage: Stage): Unit = stage match {
+      case mapStage: ShuffleMapStage =>
+        val numMissingPartitions = mapStage.findMissingPartitions().length
+        if (numMissingPartitions < mapStage.numTasks) {
+          markStageAsFinished(
+            mapStage,
+            Some("preceding shuffle map stage with random output gets retried."),
+            willRetry = true)
+          mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
--- End diff --
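We should also kill all running tasks for this map stage, since they are still computing on input that is about to change once the preceding map tasks are re-tried.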
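A minimal, self-contained sketch of the rollback with the suggested change, on a toy stage graph rather than the real scheduler. Every name here (`RandomLevel`, `ToyStage`, `rollBackSuccessors`) is hypothetical and is not Spark's `DAGScheduler` API. It only models shuffle map stages. When an INDETERMINATE map stage is re-run, it discards the registered output of every downstream stage and also drops that stage's running attempts, as suggested above. The toy check `finishedPartitions.nonEmpty` stands in for the diff's `numMissingPartitions < mapStage.numTasks`.

```scala
import scala.collection.mutable

// Toy model only: none of these types correspond to real Spark classes.
object RollbackSketch {
  sealed trait RandomLevel
  case object Determinate extends RandomLevel
  case object Unordered extends RandomLevel
  case object Indeterminate extends RandomLevel

  final class ToyStage(val id: Int, val level: RandomLevel) {
    // Partitions whose map output is currently registered.
    val finishedPartitions = mutable.Set.empty[Int]
    // Partitions with a task attempt that is still running.
    val runningPartitions = mutable.Set.empty[Int]
    // Stages that read this stage's shuffle output.
    val children = mutable.ListBuffer.empty[ToyStage]
  }

  /** Called when `mapStage` must be re-run; returns the ids of the stages rolled back. */
  def rollBackSuccessors(mapStage: ToyStage): List[Int] = {
    val rolledBack = mutable.ListBuffer.empty[Int]
    // Re-running a DETERMINATE or UNORDERED map stage reproduces the same reducer
    // input, so only an INDETERMINATE stage forces its consumers to start over.
    if (mapStage.level == Indeterminate) {
      val visited = mutable.Set.empty[Int]
      def rollBack(stage: ToyStage): Unit =
        if (visited.add(stage.id)) {
          val hadOutput = stage.finishedPartitions.nonEmpty // ~ numMissing < numTasks
          val hadRunning = stage.runningPartitions.nonEmpty
          // Like unregisterAllMapOutput: all registered output was built on the old input.
          stage.finishedPartitions.clear()
          // The suggested addition: running attempts read the same old input, so kill them too.
          stage.runningPartitions.clear()
          if (hadOutput || hadRunning) rolledBack += stage.id
          // A recomputed stage feeds different data downstream, so recurse.
          stage.children.foreach(rollBack)
        }
      mapStage.children.foreach(rollBack)
    }
    rolledBack.toList
  }
}
```

Without the `runningPartitions.clear()` step, an attempt launched before the rollback could still finish and register output computed from the old input, which is the inconsistency the comment points out.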
---