mridulm commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r960285819


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2242,60 +2251,57 @@ private[spark] class DAGScheduler(
     val numMergers = stage.shuffleDep.getMergerLocs.length
     val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
     externalShuffleClient.foreach { shuffleClient =>
-      if (!registerMergeResults) {
-        results.foreach(_.set(true))
-        // Finalize in separate thread as shuffle merge is a no-op in this case
-        shuffleMergeFinalizeScheduler.schedule(new Runnable {
-          override def run(): Unit = {
-            stage.shuffleDep.getMergerLocs.foreach {
-              case shuffleServiceLoc =>
-                // Sends async request to shuffle service to finalize shuffle 
merge on that host.
-                // Since merge statuses will not be registered in this case,
-                // we pass a no-op listener.
-                shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-                  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-                  new MergeFinalizerListener {
-                    override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
-                    }
+      val scheduledFutures = stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+        case (shuffleServiceLoc, index) =>
+          // Sends async request to shuffle service to finalize shuffle merge 
on that host
+          // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
+          // TODO: during shuffleMergeFinalizeWaitSec
+          shuffleSendFinalizeRPCExecutor.submit(new Runnable() {
+            override def run(): Unit = {
+              shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+                shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+                new MergeFinalizerListener {
+                  override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
+                    assert(shuffleId == statuses.shuffleId)
+                    eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
+                      convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
+                    results(index).set(true)
+                  }

Review Comment:
   > I have a confusion about the origin code. If there is small shuffle data, 
registerMergeResults will be false, so we don't wait for the merged statuses, 
but these merged statuses are still useful if they are available before the 
reduce tasks fetch them? This should often happen if the cluster is heavy.
   
   This is to balance the overhead of waiting for finalization to complete - so 
that really small stages dont spend most of their time waiting for finalization 
to complete (so that we minimize the overheads of finalization as the benefits 
at read wont be sufficient).
   You can tune `spark.shuffle.push.minCompletedPushRatio` and 
`spark.shuffle.push.minShuffleSizeToWait` as appropriate to your cluster env 
for modifying the behavior.
   
   > Another question, for your suggestion code, if all the finalize RPCs can 
be completed within shuffleMergeResultsTimeoutSec, does the 
cancelFinalizeShuffleMergeFutures task still need to wait for scheduling ?
   
   In the snippet I pasted above, we have two conditions:
   `timedOut || !registerMergeResults`
   
   If `timedOut == true`, not all `results` were obtained - now this would 
(almost always) mean the merger received finalization message but did not 
respond in time, or the finalize message never reached it (the issue we 
observed in this PR).
   
   If `registerMergeResults == false`, we have not had time to send any rpc (we 
just enqueued the tasks, set all `results` to true - so have not waited in the 
`Futures.allAsList`) - so we do need to wait for rpc's to complete.
   
   Note that if the task has already completed, cancelling futures is really 
cheap.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to