Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212209237 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler( failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { + // If the map stage is INDETERMINATE, which means the map tasks may return + // different result when re-try, we need to re-try all the tasks of the failed + // stage and its succeeding stages, because the input data will be changed after the + // map tasks are re-tried. + // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is + // guaranteed to be idempotent, so the input data of the reducers will not change even + // if the map tasks are re-tried. + if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { + def rollBackStage(stage: Stage): Unit = stage match { + case mapStage: ShuffleMapStage => + val numMissingPartitions = mapStage.findMissingPartitions().length + if (numMissingPartitions < mapStage.numTasks) { + markStageAsFinished( + mapStage, + Some("preceding shuffle map stage with random output gets retried."), + willRetry = true) + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + failedStages += mapStage + } + + case resultStage => + val numMissingPartitions = resultStage.findMissingPartitions().length + if (numMissingPartitions < resultStage.numTasks) { + // TODO: support to rollback result tasks. + val errorMessage = "A shuffle map stage with random output was failed and " + + s"retried. However, Spark cannot rollback the result stage $resultStage " + + "to re-process the input data, and has to fail this job. Please " + + "eliminate the randomness by checkpointing the RDD before " + + "repartition/zip and try again." 
+ abortStage(failedStage, errorMessage, None) + } + } + + def rollbackSucceedingStages(stageChain: List[Stage]): Unit = { + if (stageChain.head.id == failedStage.id) { + stageChain.foreach { stage => + if (!failedStages.contains(stage)) rollBackStage(stage) + } + } else { + stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain)) + } + } --- End diff -- On the other hand, this branch will not be hit very often. We only hit it when a FetchFailure happens and the map stage has random output.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org