cloud-fan commented on code in PR #53274:
URL: https://github.com/apache/spark/pull/53274#discussion_r2610064628


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1913,16 +1898,127 @@ private[spark] class DAGScheduler(
 
   /**
    * If a map stage is non-deterministic, the map tasks of the stage may return different result
-   * when re-try. To make sure data correctness, we need to re-try all the tasks of its succeeding
-   * stages, as the input data may be changed after the map tasks are re-tried. For stages where
-   * rollback and retry all tasks are not possible, we will need to abort the stages.
+   * when re-try. To make sure data correctness, we need to clean up shuffles to make sure succeeding
+   * stages will be resubmitted and re-try all the tasks, as the input data may be changed after
+   * the map tasks are re-tried. For stages where rollback and retry all tasks are not possible,
+   * we will need to abort the stages.
+   */
+  private[scheduler] def rollbackSucceedingStages(mapStage: ShuffleMapStage): Unit = {
+    val stagesToRollback = collectSucceedingStages(mapStage).filterNot(_ == mapStage)
+    val stagesCanRollback = abortStagesUnableToRollback(stagesToRollback)

Review Comment:
   ```suggestion
       val stagesRollingBack = abortStagesUnableToRollback(stagesToRollback)
   ```
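
The Scala sketch below models the control flow that `rollbackSucceedingStages` in the hunk appears to follow:

1. Collect every stage downstream of the non-deterministic map stage.
2. Drop the map stage itself.
3. Split the remainder into stages that roll back and stages that must be aborted.

The local name `stagesRollingBack` follows the suggestion above. Everything else in the sketch is hypothetical: `Stage`, `RollbackSketch`, `planRollback` and the `canRollback` flag are stand-ins, not Spark's `Stage`/`ShuffleMapStage` classes. The real `abortStagesUnableToRollback` decides eligibility by its own criteria, not a precomputed boolean.

```scala
// Hypothetical, simplified stage model for illustration only; these are not
// Spark's Stage / ShuffleMapStage classes, and `canRollback` is an assumed flag.
final case class Stage(id: Int, parentIds: Set[Int], canRollback: Boolean)

object RollbackSketch {
  // Returns `start` plus every stage that transitively consumes its output,
  // found by a breadth-first walk over parent -> child edges.
  def collectSucceedingStages(stages: Seq[Stage], start: Int): Set[Int] = {
    // Invert the parent links into a child adjacency map.
    val children: Map[Int, Seq[Int]] =
      stages
        .flatMap(s => s.parentIds.toSeq.map(p => p -> s.id))
        .groupBy(_._1)
        .map { case (parent, edges) => parent -> edges.map(_._2) }

    @annotation.tailrec
    def loop(frontier: List[Int], seen: Set[Int]): Set[Int] = frontier match {
      case Nil => seen
      case head :: tail =>
        val next = children.getOrElse(head, Seq.empty).filterNot(seen.contains)
        loop(tail ++ next, seen ++ next)
    }

    loop(List(start), Set(start))
  }

  // Mirrors the shape of the hunk: drop the map stage itself, then split the
  // rest into stages that roll back and stages that would be aborted.
  def planRollback(stages: Seq[Stage], mapStageId: Int): (Set[Int], Set[Int]) = {
    val byId = stages.map(s => s.id -> s).toMap
    val stagesToRollback =
      collectSucceedingStages(stages, mapStageId).filterNot(_ == mapStageId)
    val (stagesRollingBack, stagesToAbort) =
      stagesToRollback.partition(id => byId(id).canRollback)
    (stagesRollingBack, stagesToAbort)
  }
}
```

Consider a chain 0 -> 1 -> 2 where stage 2 cannot roll back, plus an unrelated stage 3. Here `planRollback(stages, 0)` returns `(Set(1), Set(2))` and leaves stage 3 alone. Once the abort step has removed the ineligible stages, what remains is the set actually being rolled back. That is presumably why `stagesRollingBack` reads more accurately than `stagesCanRollback`.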



-- 
This is an automated message from the Apache Git Service.