cloud-fan commented on code in PR #53782:
URL: https://github.com/apache/spark/pull/53782#discussion_r2693184813
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1555,34 +1555,24 @@ private[spark] class DAGScheduler(
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
- // Before find missing partition, do the intermediate state clean work
first.
- // The operation here can make sure for the partially completed
intermediate stage,
- // `findMissingPartitions()` returns all partitions every time.
+ // For statically indeterminate stages being retried, we trigger rollback
BEFORE task
+ // submission. This is more efficient than deferring to task completion
because:
+ // 1. It avoids submitting a partial stage that would need to be cancelled
+ // 2. It ensures findMissingPartitions() returns ALL partitions for the
retry
+ //
+ // For runtime detection (checksum mismatch), we must defer to task
completion because
+ // we don't know the stage is indeterminate until we see the checksum
differ.
stage match {
case sms: ShuffleMapStage if !sms.isAvailable =>
- if (!sms.shuffleDep.checksumMismatchFullRetryEnabled &&
stage.isIndeterminate) {
- // already executed at least once
- if (sms.getNextAttemptId > 0) {
- // While we previously validated possible rollbacks during the
handling of a FetchFailure,
- // where we were fetching from an indeterminate source map stages,
this later check
- // covers additional cases like recalculating an indeterminate
stage after an executor
- // loss. Moreover, because this check occurs later in the process,
if a result stage task
- // has successfully completed, we can detect this and abort the
job, as rolling back a
- // result stage is not possible.
- val stagesToRollback = collectSucceedingStages(sms)
- filterAndAbortUnrollbackableStages(stagesToRollback)
- // stages which cannot be rolled back were aborted which leads to
removing the
- // the dependant job(s) from the active jobs set
- val numActiveJobsWithStageAfterRollback =
- activeJobs.count(job =>
stagesToRollback.contains(job.finalStage))
- if (numActiveJobsWithStageAfterRollback == 0) {
- logInfo(log"All jobs depending on the indeterminate stage " +
- log"(${MDC(STAGE_ID, stage.id)}) were aborted so this stage is
not needed anymore.")
- return
- }
+ if (sms.isStaticallyIndeterminate &&
+ !sms.shuffleDep.checksumMismatchFullRetryEnabled &&
+ sms.getNextAttemptId > 0) {
+ rollbackSucceedingStages(sms, rollbackCurrentStage = true)
Review Comment:
let's add a bit more comment to explain it. We are not really rolling back
the current stage, as it's starting a new attempt and there is nothing to
rollback. What we really want is to have a fresh start for this attempt: clear
the shuffle output and ignore task results from the old stage attempts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]