Victsm commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r647743143



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2000,6 +2023,133 @@ private[spark] class DAGScheduler(
     }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+    logInfo(("%s (%s) scheduled for finalizing" +
+      " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
+    shuffleMergeFinalizeScheduler.schedule(
+      new Runnable {
+        override def run(): Unit = finalizeShuffleMerge(stage)
+      },
+      shuffleMergeFinalizeWaitSec,
+      TimeUnit.SECONDS
+    )
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+    logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+    externalShuffleClient.foreach { shuffleClient =>
+      val shuffleId = stage.shuffleDep.shuffleId
+      val numMergers = stage.shuffleDep.getMergerLocs.length
+      val numResponses = new AtomicInteger()
+      val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
+
+      stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
+        case (shuffleServiceLoc, index) =>
+          // Sends async request to shuffle service to finalize shuffle merge on that host
+          // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled
+          // TODO: during shuffleMergeFinalizeWaitSec
+          shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+            shuffleServiceLoc.port, shuffleId,
+            new MergeFinalizerListener {
+              override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
+                assert(shuffleId == statuses.shuffleId)
+                eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
+                  convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
+                numResponses.incrementAndGet()
+                results(index).set(true)
+              }
+
+              override def onShuffleMergeFailure(e: Throwable): Unit = {
+                logWarning(s"Exception encountered when trying to finalize shuffle " +
+                  s"merge on ${shuffleServiceLoc.host} for shuffle $shuffleId", e)
+                numResponses.incrementAndGet()
+                // Do not fail the future as this would cause dag scheduler to prematurely
+                // give up on waiting for merge results from the remaining shuffle services
+                // if one fails
+                results(index).set(false)
+              }
+            })
+      }
+      // DAGScheduler only waits for a limited amount of time for the merge results.
+      // It will attempt to submit the next stage(s) irrespective of whether merge results
+      // from all shuffle services are received or not.
+      // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization
+      // TODO: for all the stages, adaptively tune timeout for merge finalization

Review comment:
       Nit: SPARK-33701 is not meant to address the wait time here, which is already adaptive. It targets the wait time in `scheduleShuffleMergeFinalize`.
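
       To illustrate the distinction for other readers, here is a minimal standalone sketch, not the PR's code, contrasting the two waits. It reuses Guava's `SettableFuture`, which the diff already depends on; the object name, timeout values, and log message are purely illustrative assumptions.

       ```scala
       import java.util.concurrent.TimeUnit
       import com.google.common.util.concurrent.SettableFuture

       object MergeFinalizeWaitSketch {
         def main(args: Array[String]): Unit = {
           // (1) scheduleShuffleMergeFinalize-style wait: a fixed delay before finalization
           // even starts, always shuffleMergeFinalizeWaitSec regardless of stage progress.
           // This fixed delay is the part SPARK-33701 proposes to tune adaptively.
           val shuffleMergeFinalizeWaitSec = 10L

           // (2) finalizeShuffleMerge-style wait: bounded by a timeout, but each get()
           // returns as soon as the corresponding shuffle service responds, so the actual
           // wait already adapts to how quickly merge results come back.
           val results = (0 until 3).map(_ => SettableFuture.create[Boolean]())
           results.foreach(_.set(true)) // pretend every shuffle service responded promptly
           val timeoutSec = 5L
           val collected = results.map(_.get(timeoutSec, TimeUnit.SECONDS))
           println(s"collected ${collected.count(identity)} merge results well before " +
             s"the ${timeoutSec}s cap; the fixed ${shuffleMergeFinalizeWaitSec}s delay " +
             "before finalization is what SPARK-33701 is about")
         }
       }
       ```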



