Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22112#discussion_r212653282
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
                 failedStages += failedStage
                 failedStages += mapStage
                 if (noResubmitEnqueued) {
    +              // If the map stage is INDETERMINATE, which means the map tasks may return
    +              // different result when re-try, we need to re-try all the tasks of the failed
    +              // stage and its succeeding stages, because the input data will be changed after the
    +              // map tasks are re-tried.
    +              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
    +              // guaranteed to be idempotent, so the input data of the reducers will not change even
    +              // if the map tasks are re-tried.
    +              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
    +                def rollBackStage(stage: Stage): Unit = stage match {
    +                  case mapStage: ShuffleMapStage =>
    +                    val numMissingPartitions = mapStage.findMissingPartitions().length
    +                    if (numMissingPartitions < mapStage.numTasks) {
    +                      markStageAsFinished(
    +                        mapStage,
    +                        Some("preceding shuffle map stage with random output gets retried."),
    +                        willRetry = true)
    +                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
    --- End diff --
    
    IIRC we didn't cancel running tasks for failed stage attempts because we 
    still expect them to finish and write outputs, but that's not the case when 
    you decide to retry all the tasks in a stage. You can call 
    `taskScheduler.killAllTaskAttempts()` to kill all running tasks for a 
    specific stage without failing the stage.
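    
    To make the suggestion concrete, here is a minimal, self-contained Scala 
    sketch of the idea (not the actual DAGScheduler change): kill the stage's 
    in-flight task attempts first, then unregister its map outputs so every task 
    is recomputed on the next attempt. `SimpleTaskScheduler`, 
    `SimpleMapOutputTracker`, and `rollBackShuffleMapStage` are hypothetical 
    stand-ins for illustration; the real call site would go through 
    `TaskScheduler.killAllTaskAttempts(stageId, interruptThread, reason)` and 
    `MapOutputTrackerMaster.unregisterAllMapOutput(shuffleId)`.
    
    ```scala
    // Hypothetical stand-in for the relevant slice of TaskScheduler.
    trait SimpleTaskScheduler {
      def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit
    }
    
    // Hypothetical stand-in for the relevant slice of MapOutputTrackerMaster.
    trait SimpleMapOutputTracker {
      def unregisterAllMapOutput(shuffleId: Int): Unit
    }
    
    object RollbackSketch {
      // Roll back one shuffle map stage: kill its running attempts so no stale
      // output gets written, then drop its registered map outputs so the whole
      // stage is recomputed on the next attempt.
      def rollBackShuffleMapStage(
          stageId: Int,
          shuffleId: Int,
          taskScheduler: SimpleTaskScheduler,
          mapOutputTracker: SimpleMapOutputTracker): Unit = {
        taskScheduler.killAllTaskAttempts(
          stageId,
          interruptThread = false,
          reason = "rolling back indeterminate shuffle map stage")
        mapOutputTracker.unregisterAllMapOutput(shuffleId)
      }
    
      def main(args: Array[String]): Unit = {
        // Tiny in-memory fakes so the sketch runs on its own.
        val scheduler = new SimpleTaskScheduler {
          def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit =
            println(s"kill all attempts of stage $stageId: $reason")
        }
        val tracker = new SimpleMapOutputTracker {
          def unregisterAllMapOutput(shuffleId: Int): Unit =
            println(s"unregister map outputs of shuffle $shuffleId")
        }
        rollBackShuffleMapStage(stageId = 3, shuffleId = 1, scheduler, tracker)
      }
    }
    ```
    
    `interruptThread = false` is the conservative choice here, since interrupting 
    a running task thread can be unsafe for some data sources.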


---
