mridulm commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r958010820


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -282,13 +285,19 @@ private[spark] class DAGScheduler(
       None
     }
 
-  // Use multi-threaded scheduled executor. The merge finalization task could 
take some time,
-  // depending on the time to establish connections to mergers, and the time 
to get MergeStatuses
-  // from all the mergers.
+  // Use multi-threaded scheduled executor. The merge finalization task (per 
stage) takes up to
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
   private val shuffleMergeFinalizeScheduler =
     ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer",
       shuffleMergeFinalizeNumThreads)
 
+  // Send finalize RPC tasks to merger ESS, one thread per RPC and will be 
cancelled after
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. Please close the opened files 
in the merger ESS
+  // if finalize RPC is not received due to network issues.
+  private val shuffleSendFinalizeRPCExecutor: ExecutorService =
+  ThreadUtils.newDaemonFixedThreadPool(
+    shuffleSendFinalizeRPCThreads, "send-shuffle-merge-finalize-rpc")

Review Comment:
   Fix indentation



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2242,60 +2251,57 @@ private[spark] class DAGScheduler(
     val numMergers = stage.shuffleDep.getMergerLocs.length
     val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
     externalShuffleClient.foreach { shuffleClient =>
-      if (!registerMergeResults) {
-        results.foreach(_.set(true))
-        // Finalize in separate thread as shuffle merge is a no-op in this case
-        shuffleMergeFinalizeScheduler.schedule(new Runnable {
-          override def run(): Unit = {
-            stage.shuffleDep.getMergerLocs.foreach {
-              case shuffleServiceLoc =>
-                // Sends async request to shuffle service to finalize shuffle 
merge on that host.
-                // Since merge statuses will not be registered in this case,
-                // we pass a no-op listener.
-                shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-                  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-                  new MergeFinalizerListener {
-                    override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
-                    }
+      val scheduledFutures = stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+        case (shuffleServiceLoc, index) =>
+          // Sends async request to shuffle service to finalize shuffle merge 
on that host
+          // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
+          // TODO: during shuffleMergeFinalizeWaitSec
+          shuffleSendFinalizeRPCExecutor.submit(new Runnable() {
+            override def run(): Unit = {
+              shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+                shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+                new MergeFinalizerListener {
+                  override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
+                    assert(shuffleId == statuses.shuffleId)
+                    eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
+                      convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
+                    results(index).set(true)
+                  }

Review Comment:
   `onShuffleMergeSuccess` and `onShuffleMergeFailure` are no-ops when 
`registerMergeResults == false` 
   
   I would suggest to keep the code pretty much as-is, except with the 
introduction of calling `shuffleClient.finalizeShuffleMerge` in  
`shuffleSendFinalizeRPCExecutor`
   
   
   Something like:
   
   ```
   val scheduledFutures = {
     if (!registerMergeResults) {
       results.foreach(_.set(true))
       stage.shuffleDep.getMergerLocs.map {
         case shuffleServiceLoc =>
           shuffleSendFinalizeRPCExecutor.submit(new Runnable() {
             override def run(): Unit = {
                 // earlier code inside case shuffleServiceLoc =>
                 ...
             }
           });
       }
     } else {
       stage.shuffleDep.getMergerLocs.zipWithIndex.map {
         case (shuffleServiceLoc, index) =>
           shuffleSendFinalizeRPCExecutor.submit(new Runnable() {
             override def run(): Unit = {
               // earlier code inside case (shuffleServiceLoc, index) =>
               ...
             }
           });
       }
     }
   }
   
   
   var timedOut = false
   try {
     Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec, 
TimeUnit.SECONDS)
   } catch {
     case e: TimeoutException =>
       timedOut = true
       // earlier code ...
   } finally {
     if (timedOut || !registerMergeResults) {
       cancelFinalizeShuffleMergeFutures(scheduledFutures, 
         if (timedOut) 0 else shuffleMergeResultsTimeoutSec)
     }
     eventProcessLoop.post(ShuffleMergeFinalized(stage))
   }
   
   private def cancelFinalizeShuffleMergeFutures(futures: Seq[Future[_]], 
delayInSecs: Int): Unit = {
     def cancelFutures(): Unit = futures.foreach(_.cancel(true))
     if (delayInSecs > 0) {
       shuffleMergeFinalizeScheduler.schedule(new Runnable {
         override def run(): Unit = {
             cancelFutures()
         }
     }, delayInSecs, TimeUnit.SECONDS)
     } else {
       cancelFutures()
     }
   }
   
   
   ```



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2311,6 +2311,15 @@ package object config {
       .intConf
       .createWithDefault(3)

Review Comment:
   Perhaps bump this up from 3 to 6 or 8 as well (since future cancellation 
now happens in that thread pool — assuming my proposed change below is fine), 
with `PUSH_BASED_SHUFFLE_SEND_FINALIZE_RPC_THREADS` correspondingly increased.



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -282,13 +285,19 @@ private[spark] class DAGScheduler(
       None
     }
 
-  // Use multi-threaded scheduled executor. The merge finalization task could 
take some time,
-  // depending on the time to establish connections to mergers, and the time 
to get MergeStatuses
-  // from all the mergers.
+  // Use multi-threaded scheduled executor. The merge finalization task (per 
stage) takes up to
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
   private val shuffleMergeFinalizeScheduler =
     ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer",
       shuffleMergeFinalizeNumThreads)
 
+  // Send finalize RPC tasks to merger ESS, one thread per RPC and will be 
cancelled after
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. Please close the opened files 
in the merger ESS
+  // if finalize RPC is not received due to network issues.
+  private val shuffleSendFinalizeRPCExecutor: ExecutorService =

Review Comment:
   `RPC` -> `Rpc`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to