venkata91 commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r768081977



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1630,6 +1652,50 @@ private[spark] class DAGScheduler(
     }
   }
 
+  private[scheduler] def checkAndScheduleShuffleMergeFinalize(
+      shuffleStage: ShuffleMapStage): Unit = {
+    // Check if a finalize task has already been scheduled. This prevents scheduling
+    // multiple shuffle merge finalizations, which can happen due to a stage retry or
+    // because shufflePushMinRatio has already been reached, etc.
+    if (shuffleStage.shuffleDep.getFinalizeTask.isEmpty) {
+      // 1. Stage indeterminate and some map outputs are not available - finalize
+      // immediately without registering shuffle merge results.
+      // 2. Stage determinate and some map outputs are not available - decide to
+      // register merge results based on map outputs size available and
+      // shuffleMergeWaitMinSizeThreshold.
+      // 3. All shuffle outputs available - decide to register merge results based
+      // on map outputs size available and shuffleMergeWaitMinSizeThreshold.
+      val totalSize = {
+        lazy val computedTotalSize =
+          mapOutputTracker.getStatistics(shuffleStage.shuffleDep).
+            bytesByPartitionId.filter(_ > 0).sum
+        if (shuffleStage.isAvailable) {
+          computedTotalSize
+        } else {
+          if (shuffleStage.isIndeterminate) {
+            0L
+          } else {
+            computedTotalSize
+          }
+        }
+      }
+
+      if (totalSize < shuffleMergeWaitMinSizeThreshold) {
+        // Check if we can process map stage completion. If shuffle merge finalization
+        // is already triggered because push completion ratio was reached earlier,
+        // we cannot process map stage completion, but have to wait for the finalization
+        // to finish. This is because it's not straightforward to interrupt the
+        // finalization task and undo what it might have already done.
+        if (scheduleShuffleMergeFinalize(shuffleStage, delay = 0,
+          registerMergeResults = false)) {

Review comment:
       Yes, that is true, but the advantage here is that, since the size of the shuffle data generated is below `shuffleMergeWaitMinSizeThreshold`, we want to mark the shuffle merge as finalized immediately. The next stage can then start right away instead of waiting for all the shuffle mergers (external shuffle services) to finalize the shuffle merge. This is just an optimization to reduce the overall delay before the next stage proceeds.
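The following minimal sketch models only the size check in the quoted hunk and the reasoning above. It is not Spark code. `StageSnapshot`, `FinalizeDecision`, `totalSize` and `decide` are hypothetical names introduced for illustration. The branch taken when the threshold is met is cut off in the quoted diff, so the sketch just labels it `ThresholdMet` without modeling what the scheduler does there.

```scala
object ShuffleMergeFinalizeSketch {

  // Hypothetical snapshot of the stage state consulted by the check in the
  // quoted diff. This is NOT a Spark class; field names are illustrative.
  final case class StageSnapshot(
      isAvailable: Boolean,            // every map output is registered
      isIndeterminate: Boolean,        // a retry may produce different output
      bytesByPartitionId: Array[Long]) // map output bytes per reduce partition

  // Hypothetical outcome type used only by this sketch.
  sealed trait FinalizeDecision
  // Stands in for scheduleShuffleMergeFinalize(stage, delay = 0, registerMergeResults = false).
  case object FinalizeImmediately extends FinalizeDecision
  // Branch not shown in the quoted hunk; deliberately left unmodeled.
  case object ThresholdMet extends FinalizeDecision

  /** Size used for the threshold check: 0 for an indeterminate stage with missing outputs. */
  def totalSize(stage: StageSnapshot): Long = {
    // lazy, as in the diff: the sizes are only summed if the chosen branch needs them
    lazy val computed = stage.bytesByPartitionId.filter(_ > 0).sum
    // Equivalent to the nested if/else in the diff:
    // available -> computed; missing and indeterminate -> 0; missing and determinate -> computed
    if (stage.isAvailable || !stage.isIndeterminate) computed else 0L
  }

  /** Finalize right away (without merge results) when too little data was produced. */
  def decide(stage: StageSnapshot, minSizeThreshold: Long): FinalizeDecision =
    if (totalSize(stage) < minSizeThreshold) FinalizeImmediately else ThresholdMet
}
```

For example, `decide(StageSnapshot(isAvailable = false, isIndeterminate = true, Array(500L)), 100L)` yields `FinalizeImmediately`, because `totalSize` is forced to 0 for case 1. The sketch collapses the nested `if` into a single condition. As in the quoted hunk, case 1 only leads to immediate finalization when the threshold is positive, since `0 < 0` is false.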




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


