Github user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22112#discussion_r212653282
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
failedStages += failedStage
failedStages += mapStage
if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return
+              // different result when re-try, we need to re-try all the tasks of the failed
+              // stage and its succeeding stages, because the input data will be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be idempotent, so the input data of the reducers will not change even
+              // if the map tasks are re-tried.
+              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      markStageAsFinished(
+                        mapStage,
+                        Some("preceding shuffle map stage with random output gets retried."),
+                        willRetry = true)
+                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
--- End diff --
IIRC we didn't cancel running tasks for failed stage attempts because we still expect them to finish and write their outputs; however, that's not the case when you decide to retry all the tasks in a stage. You can call `taskScheduler.killAllTaskAttempts()` to kill all running tasks for a specific stage without failing the stage.
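
For illustration, a minimal sketch of how the rollback branch quoted above could apply this suggestion, killing any still-running attempts before discarding the stage's map output. It assumes the existing `DAGScheduler` members (`taskScheduler`, `markStageAsFinished`, `mapOutputTracker`) and reuses the names from the diff; it is not the exact change proposed in the PR:

```scala
// Hypothetical sketch, not the PR's actual change: before unregistering the map
// output of a stage being rolled back, kill its running task attempts so that a
// late-finishing task cannot register stale output after the rollback.
def rollBackStage(stage: Stage): Unit = stage match {
  case mapStage: ShuffleMapStage =>
    val numMissingPartitions = mapStage.findMissingPartitions().length
    if (numMissingPartitions < mapStage.numTasks) {
      // Suggested addition: kill all running task attempts for this stage
      // without marking the stage itself as failed.
      taskScheduler.killAllTaskAttempts(
        mapStage.id,
        interruptThread = false,
        reason = "Stage rolled back because its indeterminate input will be recomputed")
      markStageAsFinished(
        mapStage,
        Some("preceding shuffle map stage with random output gets retried."),
        willRetry = true)
      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
    }
  case _ => // result stages would be handled separately in the real change
}
```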
---