otterc commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r639138475



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2060,17 +2060,18 @@ private[spark] class DAGScheduler(
       stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
         case (shuffleServiceLoc, index) =>
           // Sends async request to shuffle service to finalize shuffle merge on that host
+          // TODO: Cancel finalizeShuffleMerge if the stage is cancelled

Review comment:
      Can you create a JIRA for this TODO?
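   For reference, one rough shape such a follow-up could take (purely a sketch; `pendingFinalizeRequests` and `cancelFinalizeShuffleMerge` are hypothetical names and are not part of this PR):

```scala
import java.util.concurrent.Future
import scala.collection.mutable

// Hypothetical bookkeeping (not in this PR): in-flight finalize requests per shuffle id,
// so that a stage cancellation can abort them. This assumes updates happen on the
// single-threaded DAGScheduler event loop, so no extra synchronization is added here.
private val pendingFinalizeRequests = mutable.HashMap.empty[Int, Seq[Future[_]]]

private def cancelFinalizeShuffleMerge(shuffleId: Int): Unit = {
  // Best-effort cancellation of any finalize requests still in flight for this shuffle.
  pendingFinalizeRequests.remove(shuffleId).foreach(_.foreach(_.cancel(true)))
}
```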

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2136,9 +2137,24 @@ private[spark] class DAGScheduler(
     }
   }
 
-  private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = {
-    stage.shuffleDep.markShuffleMergeFinalized
-    processShuffleMapStageCompletion(stage)
+  private[scheduler] def handleRegisterMergeStatuses(
+      stage: ShuffleMapStage,
+      mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
+    // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
+    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+      mapOutputTracker.registerMergeResults(stage.shuffleDep.shuffleId, mergeStatuses)
+    }
+  }
+
+  private[scheduler] def handleShuffleMergeFinalized(
+      stage: ShuffleMapStage): Unit = {
+    // Only update MapOutputTracker metadata if the stage is still active. i.e not cancelled.
+    if (runningStages.contains(stage)) {
+      stage.shuffleDep.markShuffleMergeFinalized()
+      processShuffleMapStageCompletion(stage)
+    } else {
+      mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)

Review comment:
      If the stage is not part of runningStages, does that really mean it was cancelled?
   It seems like runningStages only contains active stages, so a stage that completed successfully will also not be part of runningStages. Why do we unregister all the merge results here?
   Also, please add a comment explaining why we need to unregister all the merge results here.
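   To illustrate the concern, here is a rough sketch of how the two outcomes could be made explicit (the `stageCancelled` helper and its condition are hypothetical; the real check would have to come from the scheduler's own bookkeeping):

```scala
// Sketch only: distinguish "cancelled" from "completed successfully" instead of
// inferring cancellation from absence in runningStages.
private def stageCancelled(stage: ShuffleMapStage): Boolean =
  !runningStages.contains(stage) && !stage.isAvailable  // hypothetical condition

private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = {
  if (runningStages.contains(stage)) {
    stage.shuffleDep.markShuffleMergeFinalized()
    processShuffleMapStageCompletion(stage)
  } else if (stageCancelled(stage)) {
    // Merge results of a cancelled attempt may be partial or stale, so drop them
    // rather than let a later attempt read an incomplete set.
    mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)
  }
  // If the stage already completed successfully, keep the registered merge results.
}
```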

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2136,9 +2137,24 @@ private[spark] class DAGScheduler(
     }
   }
 
-  private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = {
-    stage.shuffleDep.markShuffleMergeFinalized
-    processShuffleMapStageCompletion(stage)
+  private[scheduler] def handleRegisterMergeStatuses(
+      stage: ShuffleMapStage,
+      mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
+    // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
+    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {

Review comment:
      @mridulm mentioned that one of the benefits of this approach is that we can still accept and register merge statuses that are received after the shuffle merge gets finalized.
   But here, we are ignoring those statuses if shuffle merge is finalized. Why?
   In the case of deterministic stage retries, accepting merge statuses after finalization is still okay, isn't it?
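   For concreteness, a minimal sketch of that relaxation (illustrative only, not what the PR does; it assumes `Stage.isIndeterminate` is an acceptable way to detect a deterministic stage here):

```scala
private[scheduler] def handleRegisterMergeStatuses(
    stage: ShuffleMapStage,
    mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
  // For a deterministic stage, merge statuses arriving after finalization still describe
  // valid merged blocks, so they could be accepted for potential retries of the stage.
  val acceptAfterFinalize = !stage.isIndeterminate
  if (runningStages.contains(stage) &&
      (!stage.shuffleDep.shuffleMergeFinalized || acceptAfterFinalize)) {
    mapOutputTracker.registerMergeResults(stage.shuffleDep.shuffleId, mergeStatuses)
  }
}
```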




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


