cloud-fan commented on code in PR #53868:
URL: https://github.com/apache/spark/pull/53868#discussion_r2730868254


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1940,6 +1989,94 @@ private[spark] class DAGScheduler(
         log"was retried, we will roll back and rerun its succeeding " +
         log"stages: ${MDC(STAGES, stagesCanRollback)}")
     }
+    // Whether there are still active jobs which need to be processed
+    hasActiveJobs
+  }
+
+  private def getCompletedJobsFromSameQuery(mapStage: ShuffleMapStage): 
Array[ActiveJob] = {
+    import scala.jdk.CollectionConverters._
+    val executionIds = mapStage
+      .jobIds
+      .flatMap(jobId => Option(jobIdToQueryExecutionId.get(jobId)))
+    if (executionIds.size > 1) {
+      logWarning(log"There are multiple queries reuse the same stage: 
${MDC(STAGE, mapStage)}")
+    }
+    executionIds
+      .flatMap(qeId => 
Option(activeQueryToJobs.get(qeId)).map(_.asScala).getOrElse(Set.empty))
+      .diff(activeJobs)
+      .toArray
+      .sortBy(_.jobId)
+  }
+
+  private def abortSucceedingFinalStages(mapStage: ShuffleMapStage, reason: 
String): Unit = {
+    val jobFinalStages = activeJobs.map(_.finalStage).toSet
+
+    collectSucceedingStages(mapStage)
+      .intersect(jobFinalStages)
+      .foreach { stage =>
+        // Abort stage will fail the jobs depending on it, cleaning up the 
stages for these jobs:
+        // 1. cancel running stages
+        // 2. clean up the stages from DAGScheduler if no active jobs 
depending on them, waiting
+        //    stages will be removed and won't be submitted.
+        // As we abort all the succeeding active jobs, all the succeeding 
stages should be
+        // cleaned up.
+        abortStage(stage, reason, exception = None)
+      }
+  }
+
+  // In rollbackSucceedingStages, we assume that the jobs are independent even 
if they reuse
+  // the same shuffle. If an active job hits fetch failure and keeps retrying 
upstream stages
+  // cascadingly, and a shuffle checksum mismatch is detected, these retried 
stages will be
+  // fully rolled back if possible, or the job will be aborted. However, we 
won't do anything for
+  // the other active jobs.
+  //
+  // For SQL query execution, all the jobs from the same query are related as 
they all contribute
+  // to the query result in some ways. For example, the jobs for subquery 
expressions are not part
+  // of the main query RDD, but they are still related. We need to guarantee 
the consistency of all
+  // the jobs, which means if a shuffle stage hits checksum mismatch, all its 
downstream stages in
+  // all the jobs of the same query execution should be rolled back if 
possible.
+  //
+  // This method does:
+  // 1. Find all the jobs triggered by the same query execution including the 
completed ones.
+  // 2. Find all the succeeding stages of the checksum mismatch shuffle stage 
based on shuffle id
+  //    as stages can be different in different jobs even if they share the same 
shuffle.
+  // 3. Abort succeeding stages if their leaf result stages have started 
running.
+  // 4. Clean up shuffle outputs of the checksum mismatched shuffle stages 
which are available to
+  //    make sure all these stages would be resubmitted and fully retried.
+  // 5. Cancel running shuffle map stages and resubmit.
+  private[scheduler] def rollbackSucceedingStagesForQuery(mapStage: 
ShuffleMapStage): Unit = {
+    // Find the completed jobs triggered by the same query execution.
+    val completedJobs = getCompletedJobsFromSameQuery(mapStage)
+    val succeedingStagesInCompletedJobs =
+      collectSucceedingStagesByShuffleId(mapStage, completedJobs)
+    logInfo(log"Found succeeding stages ${MDC(STAGES, 
succeedingStagesInCompletedJobs)} of " +
+      log"shuffle checksum mismatch stage ${MDC(STAGE, mapStage)} in completed 
jobs: (" +
+      log"${MDC(JOB_IDS, completedJobs.map(_.jobId).mkString(","))})")
+
+    // Abort all the succeeding final stages in active jobs to fail fast and 
avoid wasting
+    // resources if there are succeeding result stages in completed jobs.
+    val completedResultStages =
+      succeedingStagesInCompletedJobs.collect { case r: ResultStage => r }
+    if (completedResultStages.nonEmpty) {
+      val reason = s"cannot rollback completed result stages 
${completedResultStages}, " +
+        s"please re-run the query to ensure data correctness"
+      abortSucceedingFinalStages(mapStage, reason)
+      return
+    }
+
+    // Rollback the succeeding stages in active jobs. If there are no active 
jobs left after
+    // rollback, we can skip the rollback for completed jobs.
+    val hasActiveJobs = rollbackSucceedingStages(mapStage)
+    if (hasActiveJobs) {
+      // Rollback the shuffle map stages in completed jobs to make sure the 
completed shuffle
+      // map stages would be re-submitted and fully retried.

Review Comment:
   If there are succeeding stages in completed jobs, we should have aborted the 
stages already. Why do we need to roll back here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to