cloud-fan commented on code in PR #53868:
URL: https://github.com/apache/spark/pull/53868#discussion_r2730815167
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1898,22 +1940,29 @@ private[spark] class DAGScheduler(
* stages will be resubmitted and re-try all the tasks, as the input data may be changed after
* the map tasks are re-tried. For stages where rollback and retry all tasks are not possible,
* we will need to abort the stages.
+ *
+ * @return true if jobs are not aborted and will continue to run after rollback, otherwise false
*/
private[scheduler] def rollbackSucceedingStages(
mapStage: ShuffleMapStage,
- rollbackCurrentStage: Boolean = false): Unit = {
+ rollbackCurrentStage: Boolean = false): Boolean = {
val stagesToRollback = if (rollbackCurrentStage) {
collectSucceedingStages(mapStage)
} else {
collectSucceedingStages(mapStage).filterNot(_ == mapStage)
}
- val stagesCanRollback = filterAndAbortUnrollbackableStages(stagesToRollback)
+ logInfo(log"Found succeeding stages ${MDC(STAGES, stagesToRollback)} of " +
+ log"shuffle checksum mismatch stage ${MDC(STAGE, mapStage)} in active jobs")
+ val stagesCanRollback =
+ filterAndAbortUnrollbackableStages(stagesToRollback)
Review Comment:
```suggestion
val stagesCanRollback = filterAndAbortUnrollbackableStages(stagesToRollback)
```