ivoson commented on code in PR #52336:
URL: https://github.com/apache/spark/pull/52336#discussion_r2638670964


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1551,29 +1551,46 @@ private[spark] class DAGScheduler(
     // The operation here can make sure for the partially completed 
intermediate stage,
     // `findMissingPartitions()` returns all partitions every time.
     stage match {
-      case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
-        // already executed at least once
-        if (sms.getNextAttemptId > 0) {
-          // While we previously validated possible rollbacks during the 
handling of a FetchFailure,
-          // where we were fetching from an indeterminate source map stages, 
this later check
-          // covers additional cases like recalculating an indeterminate stage 
after an executor
-          // loss. Moreover, because this check occurs later in the process, 
if a result stage task
-          // has successfully completed, we can detect this and abort the job, 
as rolling back a
-          // result stage is not possible.
-          val stagesToRollback = collectSucceedingStages(sms)
-          abortStageWithInvalidRollBack(stagesToRollback)
-          // stages which cannot be rolled back were aborted which leads to 
removing the
-          // the dependant job(s) from the active jobs set
-          val numActiveJobsWithStageAfterRollback =
-            activeJobs.count(job => stagesToRollback.contains(job.finalStage))
-          if (numActiveJobsWithStageAfterRollback == 0) {
-            logInfo(log"All jobs depending on the indeterminate stage " +
-              log"(${MDC(STAGE_ID, stage.id)}) were aborted so this stage is 
not needed anymore.")
-            return
+      case sms: ShuffleMapStage if !sms.isAvailable =>
+        val needFullStageRetry = if 
(sms.shuffleDep.checksumMismatchFullRetryEnabled) {

Review Comment:
   Hi @mridulm to recompute the indeterminate stages, we'll clean up all the 
shuffle outputs and shuffle merge state for push-based shuffle. Would that 
resolve your concern regarding to push-based shuffle?
   
   
https://github.com/apache/spark/blob/d0fbb15bbc98798473ba842dcfc2dec601a7a376/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1584-L1585
 
   ```
   mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
   sms.shuffleDep.newShuffleMergeState()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to