cloud-fan commented on code in PR #53274:
URL: https://github.com/apache/spark/pull/53274#discussion_r2610064628
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1913,16 +1898,127 @@ private[spark] class DAGScheduler(
/**
* If a map stage is non-deterministic, the map tasks of the stage may return different result
- * when re-try. To make sure data correctness, we need to re-try all the tasks of its succeeding
- * stages, as the input data may be changed after the map tasks are re-tried. For stages where
- * rollback and retry all tasks are not possible, we will need to abort the stages.
+ * when re-try. To make sure data correctness, we need to clean up shuffles to make sure succeeding
+ * stages will be resubmitted and re-try all the tasks, as the input data may be changed after
+ * the map tasks are re-tried. For stages where rollback and retry all tasks are not possible,
+ * we will need to abort the stages.
+ */
+ private[scheduler] def rollbackSucceedingStages(mapStage: ShuffleMapStage): Unit = {
+ val stagesToRollback = collectSucceedingStages(mapStage).filterNot(_ == mapStage)
+ val stagesCanRollback = abortStagesUnableToRollback(stagesToRollback)
Review Comment:
```suggestion
val stagesRollingBack = abortStagesUnableToRollback(stagesToRollback)
```
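
One possible reading of the rename, as a toy sketch rather than the PR's actual code: if `abortStagesUnableToRollback` aborts the stages that cannot roll back and returns the remainder, then the returned value is the set of stages that are in fact being rolled back, which is what `stagesRollingBack` says. `ToyStage`, its `canRollback` flag, the `abort` callback, and the body of the helper below are all hypothetical stand-ins; the real method's signature and behavior in `DAGScheduler` are not shown in this hunk.

```scala
// Hypothetical stand-in for a scheduler stage; not Spark's Stage class.
final case class ToyStage(id: Int, canRollback: Boolean)

object RollbackSketch {
  // Assumed behavior only: abort every stage that cannot roll back,
  // and return the stages that remain (the ones that will roll back).
  def abortStagesUnableToRollback(
      stages: Seq[ToyStage],
      abort: ToyStage => Unit): Seq[ToyStage] = {
    val (rollingBack, unable) = stages.partition(_.canRollback)
    unable.foreach(abort)
    rollingBack
  }
}
```

Under that assumption, a call site such as `val stagesRollingBack = RollbackSketch.abortStagesUnableToRollback(stages, abort)` names the result for what it contains after the aborts have happened.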
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.