Ngone51 commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r768293190



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1630,6 +1652,50 @@ private[spark] class DAGScheduler(
     }
   }
 
+  private[scheduler] def checkAndScheduleShuffleMergeFinalize(
+      shuffleStage: ShuffleMapStage): Unit = {
+    // Check if a finalize task has already been scheduled. This is to ensure we don't
+    // schedule multiple shuffle merge finalizations, which can happen due to stage
+    // retry or shufflePushMinRatio already being hit, etc.
+    if (shuffleStage.shuffleDep.getFinalizeTask.isEmpty) {
+      // 1. Stage indeterminate and some map outputs are not available - finalize
+      // immediately without registering shuffle merge results.
+      // 2. Stage determinate and some map outputs are not available - decide to
+      // register merge results based on map outputs size available and
+      // shuffleMergeWaitMinSizeThreshold.
+      // 3. All shuffle outputs available - decide to register merge results based
+      // on map outputs size available and shuffleMergeWaitMinSizeThreshold.
+      val totalSize = {
+        lazy val computedTotalSize =
+          mapOutputTracker.getStatistics(shuffleStage.shuffleDep).
+            bytesByPartitionId.filter(_ > 0).sum
+        if (shuffleStage.isAvailable) {
+          computedTotalSize
+        } else {
+          if (shuffleStage.isIndeterminate) {
+            0L
+          } else {
+            computedTotalSize
+          }
+        }
+      }
+
+      if (totalSize < shuffleMergeWaitMinSizeThreshold) {
+        // Check if we can process map stage completion. If shuffle merge finalization
+        // is already triggered because push completion ratio was reached earlier,
+        // we cannot process map stage completion, but have to wait for the finalization
+        // to finish. This is because it's not straightforward to interrupt the
+        // finalization task and undo what it might have already done.
+        if (scheduleShuffleMergeFinalize(shuffleStage, delay = 0,
+          registerMergeResults = false)) {
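
The three numbered cases in the hunk reduce to one lazily computed size and a single threshold check. The following is a minimal sketch that restates only the branch visible here; `FinalizeDecision`, `totalSize`, `sumPositive`, and `finalizeWithoutMergeResults` are hypothetical names, and the stage flags and size computation are passed in directly instead of coming from Spark's `ShuffleMapStage` and `MapOutputTracker`.

```scala
// Hypothetical, simplified restatement of the finalize decision in the hunk.
// Stage flags and the size computation are parameters; in Spark they come from
// ShuffleMapStage and MapOutputTracker.getStatistics.
object FinalizeDecision {

  /** Mirrors the `totalSize` block: the size is only computed on branches that use it. */
  def totalSize(isAvailable: Boolean, isIndeterminate: Boolean, computeSize: () => Long): Long = {
    lazy val computedTotalSize: Long = computeSize()
    if (isAvailable) {
      computedTotalSize // case 3: all map outputs available
    } else if (isIndeterminate) {
      0L // case 1: indeterminate with missing outputs, size is never computed
    } else {
      computedTotalSize // case 2: determinate with missing outputs
    }
  }

  /** Same reduction as `bytesByPartitionId.filter(_ > 0).sum`. */
  def sumPositive(bytesByPartitionId: Array[Long]): Long =
    bytesByPartitionId.filter(_ > 0).sum

  /**
   * True when the hunk's `totalSize < shuffleMergeWaitMinSizeThreshold` branch is taken,
   * i.e. the one that calls scheduleShuffleMergeFinalize(delay = 0, registerMergeResults = false).
   */
  def finalizeWithoutMergeResults(totalSize: Long, minSizeThreshold: Long): Boolean =
    totalSize < minSizeThreshold
}
```

Making `computedTotalSize` a `lazy val` means an indeterminate stage with missing map outputs never asks for statistics at all; with a plain `val` they would be fetched on every call, even when the result is discarded.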

Review comment:
       Ok, I got your point.
   
   I took another look at this part of the code and realized I had misread it. When `registerMergeResults=false`, `finalizeShuffleMerge` doesn't call `handleShuffleMergeFinalized` at the end (I thought it did), so `handleShuffleMergeFinalized` isn't actually invoked in this case.
   
   But shall we refactor `finalizeShuffleMerge` so that it behaves consistently for both `registerMergeResults=true` and `registerMergeResults=false`? For example:
   ```scala
   ...
   // One result future per merger, regardless of registerMergeResults
   val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
   if (!registerMergeResults) {
     // No merge results to register: mark every result as done up front
     results.foreach(_.set(true))
     // finalize in a separate thread
     ...
   } else {
     ...
   }
   try {
     // Wait for all mergers, but no longer than the configured timeout
     Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec, TimeUnit.SECONDS)
   } catch {
     case _: TimeoutException =>
       logInfo(s"Timed out on waiting for merge results from all " +
         s"$numMergers mergers for shuffle $shuffleId")
   } finally {
     // Always notify the scheduler, in both modes
     eventProcessLoop.post(ShuffleMergeFinalized(stage))
   }
   ...
   ```
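
To make the suggested control flow concrete, here is a self-contained sketch that swaps Guava's `SettableFuture`/`Futures.allAsList` for the JDK's `CompletableFuture`/`CompletableFuture.allOf`. `ConsistentFinalize`, `requestMerge`, and `onFinalized` are hypothetical stand-ins for the merger RPCs and for `eventProcessLoop.post(ShuffleMergeFinalized(stage))`; it illustrates the idea under those assumptions and is not Spark's implementation.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}

object ConsistentFinalize {
  type Result = CompletableFuture[java.lang.Boolean]

  /**
   * Hypothetical sketch of the suggested flow. Returns true if every merger
   * result completed before the timeout, false on timeout. In both modes
   * `onFinalized` runs exactly once, from `finally`.
   *
   * @param requestMerge stand-in for sending the finalize request to merger i
   * @param onFinalized  stand-in for posting ShuffleMergeFinalized(stage)
   */
  def finalizeMerge(
      numMergers: Int,
      registerMergeResults: Boolean,
      timeout: Long,
      unit: TimeUnit,
      requestMerge: Int => Result,
      onFinalized: () => Unit): Boolean = {
    val results: IndexedSeq[Result] =
      if (!registerMergeResults) {
        // Send the finalize requests but ignore their replies...
        (0 until numMergers).foreach(i => requestMerge(i))
        // ...and treat every result as already done, so the wait returns at once.
        (0 until numMergers).map(_ => CompletableFuture.completedFuture(java.lang.Boolean.TRUE))
      } else {
        // Wait on the real replies from each merger.
        (0 until numMergers).map(requestMerge)
      }
    try {
      CompletableFuture.allOf(results: _*).get(timeout, unit)
      true
    } catch {
      case _: TimeoutException => false
    } finally {
      onFinalized()
    }
  }
}
```

With `registerMergeResults = false` the call returns `true` immediately even if no merger ever replies; with `registerMergeResults = true` and silent mergers it returns `false` after the timeout. Because the callback sits in `finally`, `onFinalized` runs once in every case, which is the consistency the comment asks for.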
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
