Github user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22112#discussion_r212381036
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
failedStages += failedStage
failedStages += mapStage
if (noResubmitEnqueued) {
+ // If the map stage is INDETERMINATE, which means the map
tasks may return
+ // different result when re-try, we need to re-try all the
tasks of the failed
+ // stage and its succeeding stages, because the input data
will be changed after the
+ // map tasks are re-tried.
+ // Note that, if map stage is UNORDERED, we are fine. The
shuffle partitioner is
+ // guaranteed to be idempotent, so the input data of the
reducers will not change even
+ // if the map tasks are re-tried.
+ if (mapStage.rdd.computingRandomLevel ==
RDD.RandomLevel.INDETERMINATE) {
+ def rollBackStage(stage: Stage): Unit = stage match {
+ case mapStage: ShuffleMapStage =>
+ val numMissingPartitions =
mapStage.findMissingPartitions().length
+ if (numMissingPartitions < mapStage.numTasks) {
+ markStageAsFinished(
+ mapStage,
+ Some("preceding shuffle map stage with random
output gets retried."),
+ willRetry = true)
+
mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+ failedStages += mapStage
+ }
+
+ case resultStage =>
+ val numMissingPartitions =
resultStage.findMissingPartitions().length
+ if (numMissingPartitions < resultStage.numTasks) {
+ // TODO: support to rollback result tasks.
+ val errorMessage = "A shuffle map stage with random
output was failed and " +
+ s"retried. However, Spark cannot rollback the
result stage $resultStage " +
+ "to re-process the input data, and has to fail
this job. Please " +
+ "eliminate the randomness by checkpointing the RDD
before " +
+ "repartition/zip and try again."
+ abortStage(failedStage, errorMessage, None)
+ }
+ }
+
+ def rollbackSucceedingStages(stageChain: List[Stage]):
Unit = {
+ if (stageChain.head.id == failedStage.id) {
+ stageChain.foreach { stage =>
+ if (!failedStages.contains(stage))
rollBackStage(stage)
+ }
+ } else {
+ stageChain.head.parents.foreach(s =>
rollbackSucceedingStages(s :: stageChain))
+ }
+ }
+
+ rollBackStage(failedStage)
--- End diff --
We may need to add a comment here to explain this tricky corner case.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]