mridulm commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r963274409


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -282,13 +285,18 @@ private[spark] class DAGScheduler(
       None
     }
 
-  // Use multi-threaded scheduled executor. The merge finalization task could 
take some time,
-  // depending on the time to establish connections to mergers, and the time 
to get MergeStatuses
-  // from all the mergers.
+  // When push-based shuffle is enabled, spark driver will submit a finalize 
task which will send
+  // a finalize rpc to each merger ESS after the shuffle map stage is 
complete. The merge
+  // finalization task takes up to PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.

Review Comment:
   ```suggestion
     // finalization takes up to PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2242,70 +2252,110 @@ private[spark] class DAGScheduler(
     val numMergers = stage.shuffleDep.getMergerLocs.length
     val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
     externalShuffleClient.foreach { shuffleClient =>
-      if (!registerMergeResults) {
-        results.foreach(_.set(true))
-        // Finalize in separate thread as shuffle merge is a no-op in this case
-        shuffleMergeFinalizeScheduler.schedule(new Runnable {
-          override def run(): Unit = {
-            stage.shuffleDep.getMergerLocs.foreach {
-              case shuffleServiceLoc =>
-                // Sends async request to shuffle service to finalize shuffle 
merge on that host.
-                // Since merge statuses will not be registered in this case,
-                // we pass a no-op listener.
-                shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-                  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-                  new MergeFinalizerListener {
-                    override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
-                    }
+      val scheduledFutures =
+        if (!registerMergeResults) {
+          results.foreach(_.set(true))
+          // Finalize in separate thread as shuffle merge is a no-op in this 
case
+          stage.shuffleDep.getMergerLocs.map {
+            case shuffleServiceLoc =>
+              // Sends async request to shuffle service to finalize shuffle 
merge on that host.
+              // Since merge statuses will not be registered in this case,
+              // we pass a no-op listener.
+              shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+                override def run(): Unit = {
+                  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+                    shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+                    new MergeFinalizerListener {
+                      override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
+                      }
 
-                    override def onShuffleMergeFailure(e: Throwable): Unit = {
-                    }
-                  })
-            }
-          }
-        }, 0, TimeUnit.SECONDS)
-      } else {
-        stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-          case (shuffleServiceLoc, index) =>
-            // Sends async request to shuffle service to finalize shuffle 
merge on that host
-            // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-            // TODO: during shuffleMergeFinalizeWaitSec
-            shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-              shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-              new MergeFinalizerListener {
-                override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-                  assert(shuffleId == statuses.shuffleId)
-                  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-                    convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-                  results(index).set(true)
+                      override def onShuffleMergeFailure(e: Throwable): Unit = 
{
+                        if (e.isInstanceOf[IOException]) {
+                          logInfo(s"Failed to connect external shuffle service 
" +
+                            s"${shuffleServiceLoc.hostPort}")
+                          
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+                        }
+                      }
+                    })
                 }
+              })
+          }
+        } else {
+          stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+            case (shuffleServiceLoc, index) =>
+              // Sends async request to shuffle service to finalize shuffle 
merge on that host
+              // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage 
is cancelled
+              // TODO: during shuffleMergeFinalizeWaitSec
+              shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+                override def run(): Unit = {
+                  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+                    shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+                    new MergeFinalizerListener {
+                      override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
+                        assert(shuffleId == statuses.shuffleId)
+                        eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
+                          convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
+                        results(index).set(true)
+                      }
 
-                override def onShuffleMergeFailure(e: Throwable): Unit = {
-                  logWarning(s"Exception encountered when trying to finalize 
shuffle " +
-                    s"merge on ${shuffleServiceLoc.host} for shuffle 
$shuffleId", e)
-                  // Do not fail the future as this would cause dag scheduler 
to prematurely
-                  // give up on waiting for merge results from the remaining 
shuffle services
-                  // if one fails
-                  results(index).set(false)
+                      override def onShuffleMergeFailure(e: Throwable): Unit = 
{
+                        logWarning(s"Exception encountered when trying to 
finalize shuffle " +
+                          s"merge on ${shuffleServiceLoc.host} for shuffle 
$shuffleId", e)
+                        // Do not fail the future as this would cause dag 
scheduler to prematurely
+                        // give up on waiting for merge results from the 
remaining shuffle services
+                        // if one fails
+                        if (e.isInstanceOf[IOException]) {
+                          logInfo(s"Failed to connect external shuffle service 
" +
+                            s"${shuffleServiceLoc.hostPort}")
+                          
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+                          results(index).set(false)
+                        }
+                      }
+                    })
                 }
               })
+          }
         }
-      }
-      // DAGScheduler only waits for a limited amount of time for the merge 
results.
-      // It will attempt to submit the next stage(s) irrespective of whether 
merge results
-      // from all shuffle services are received or not.
+      var timedOut = false
       try {
         Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec, 
TimeUnit.SECONDS)
       } catch {
         case _: TimeoutException =>
+          timedOut = true
           logInfo(s"Timed out on waiting for merge results from all " +
             s"$numMergers mergers for shuffle $shuffleId")
       } finally {
+        if (timedOut || !registerMergeResults) {
+          cancelFinalizeShuffleMergeFutures(scheduledFutures,

Review Comment:
   @otterc, this applies when we are unable to send the finalization request to 
ESS - and so we start having more and more threads blocked on 
`shuffleClient.finalizeShuffleMerge` over time - preventing all other merge 
finalizations from getting submitted - as @wankunde illustrated above.
   In almost all cases, `shuffleClient.finalizeShuffleMerge` should finish very 
quickly - and so `cancelFinalizeShuffleMergeFutures` is effectively a noop.



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4440,6 +4440,39 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     assert(mapStatuses.count(s => s != null && s.location.executorId == 
"hostB-exec") === 1)
   }
 

Review Comment:
   Can we add a test specifically for the behavior we have introduced ? Namely, 
we send finalize shuffle even if sending msg to "first" merger blocks 
indefinitely (and so does not block a) other sends, b) merge finalization 
completes within the timeout).
   



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -282,13 +285,18 @@ private[spark] class DAGScheduler(
       None
     }
 
-  // Use multi-threaded scheduled executor. The merge finalization task could 
take some time,
-  // depending on the time to establish connections to mergers, and the time 
to get MergeStatuses
-  // from all the mergers.
+  // When push-based shuffle is enabled, spark driver will submit a finalize 
task which will send
+  // a finalize rpc to each merger ESS after the shuffle map stage is 
complete. The merge
+  // finalization task takes up to PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
   private val shuffleMergeFinalizeScheduler =
     ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer",
       shuffleMergeFinalizeNumThreads)
 
+  // Send finalize RPC tasks to merger ESS, one thread per RPC and will be 
cancelled after
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.

Review Comment:
   ```suggestion
     // Send finalize RPC tasks to merger ESS
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2242,70 +2250,110 @@ private[spark] class DAGScheduler(
     val numMergers = stage.shuffleDep.getMergerLocs.length
     val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
     externalShuffleClient.foreach { shuffleClient =>
-      if (!registerMergeResults) {
-        results.foreach(_.set(true))
-        // Finalize in separate thread as shuffle merge is a no-op in this case
-        shuffleMergeFinalizeScheduler.schedule(new Runnable {
-          override def run(): Unit = {
-            stage.shuffleDep.getMergerLocs.foreach {
-              case shuffleServiceLoc =>
-                // Sends async request to shuffle service to finalize shuffle 
merge on that host.
-                // Since merge statuses will not be registered in this case,
-                // we pass a no-op listener.
-                shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-                  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-                  new MergeFinalizerListener {
-                    override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
-                    }
+      val scheduledFutures =
+        if (!registerMergeResults) {
+          results.foreach(_.set(true))
+          // Finalize in separate thread as shuffle merge is a no-op in this 
case
+          stage.shuffleDep.getMergerLocs.map {
+            case shuffleServiceLoc =>
+              // Sends async request to shuffle service to finalize shuffle 
merge on that host.
+              // Since merge statuses will not be registered in this case,
+              // we pass a no-op listener.
+              shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+                override def run(): Unit = {
+                  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+                    shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+                    new MergeFinalizerListener {
+                      override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
+                      }
 
-                    override def onShuffleMergeFailure(e: Throwable): Unit = {
-                    }
-                  })
-            }
-          }
-        }, 0, TimeUnit.SECONDS)
-      } else {
-        stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-          case (shuffleServiceLoc, index) =>
-            // Sends async request to shuffle service to finalize shuffle 
merge on that host
-            // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-            // TODO: during shuffleMergeFinalizeWaitSec
-            shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-              shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-              new MergeFinalizerListener {
-                override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-                  assert(shuffleId == statuses.shuffleId)
-                  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-                    convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-                  results(index).set(true)
+                      override def onShuffleMergeFailure(e: Throwable): Unit = 
{
+                        if (e.isInstanceOf[IOException]) {
+                          logInfo(s"Failed to connect external shuffle service 
" +
+                            s"${shuffleServiceLoc.hostPort}")
+                          
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+                        }
+                      }
+                    })
                 }
+              })
+          }
+        } else {
+          stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+            case (shuffleServiceLoc, index) =>
+              // Sends async request to shuffle service to finalize shuffle 
merge on that host
+              // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage 
is cancelled
+              // TODO: during shuffleMergeFinalizeWaitSec
+              shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+                override def run(): Unit = {
+                  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+                    shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+                    new MergeFinalizerListener {
+                      override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
+                        assert(shuffleId == statuses.shuffleId)
+                        eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
+                          convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
+                        results(index).set(true)
+                      }
 
-                override def onShuffleMergeFailure(e: Throwable): Unit = {
-                  logWarning(s"Exception encountered when trying to finalize 
shuffle " +
-                    s"merge on ${shuffleServiceLoc.host} for shuffle 
$shuffleId", e)
-                  // Do not fail the future as this would cause dag scheduler 
to prematurely
-                  // give up on waiting for merge results from the remaining 
shuffle services
-                  // if one fails
-                  results(index).set(false)
+                      override def onShuffleMergeFailure(e: Throwable): Unit = 
{
+                        logWarning(s"Exception encountered when trying to 
finalize shuffle " +
+                          s"merge on ${shuffleServiceLoc.host} for shuffle 
$shuffleId", e)
+                        // Do not fail the future as this would cause dag 
scheduler to prematurely
+                        // give up on waiting for merge results from the 
remaining shuffle services
+                        // if one fails
+                        if (e.isInstanceOf[IOException]) {
+                          logInfo(s"Failed to connect external shuffle service 
" +
+                            s"${shuffleServiceLoc.hostPort}")
+                          
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+                          results(index).set(false)
+                        }
+                      }
+                    })
                 }
               })
+          }
         }
-      }
-      // DAGScheduler only waits for a limited amount of time for the merge 
results.
-      // It will attempt to submit the next stage(s) irrespective of whether 
merge results
-      // from all shuffle services are received or not.
+      var timedOut = false
       try {
         Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec, 
TimeUnit.SECONDS)
       } catch {
         case _: TimeoutException =>
+          timedOut = true
           logInfo(s"Timed out on waiting for merge results from all " +
             s"$numMergers mergers for shuffle $shuffleId")
       } finally {
+        if (timedOut || !registerMergeResults) {
+          cancelFinalizeShuffleMergeFutures(scheduledFutures,
+            if (timedOut) 0L else shuffleMergeResultsTimeoutSec)
+        }
         eventProcessLoop.post(ShuffleMergeFinalized(stage))
       }
     }
   }
 
+  private def cancelFinalizeShuffleMergeFutures(
+      futures: Seq[JFutrue[_]],
+      delayInSecs: Long): Unit = {
+    def cancelFutures(): Unit = {
+      futures.map(future => {
+        if (!future.isDone) {
+          future.cancel(true)
+        }
+      })

Review Comment:
   ```suggestion
         futures.foreach(_.cancel(true))
   ```
   
   



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2309,7 +2309,17 @@ package object config {
         " shuffle is enabled.")
       .version("3.3.0")
       .intConf
-      .createWithDefault(3)
+      .createWithDefault(8)
+
+  private[spark] val PUSH_SHUFFLE_FINALIZE_RPC_THREADS =
+    ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads")
+      .doc("Number of threads used by the driver to send finalize shuffle RPC 
to mergers" +
+        " location and then get MergeStatus. The thread won't stop" +
+        " PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. The merger ESS may open 
too many files" +
+        " if the finalize rpc is not received.")

Review Comment:
   ```suggestion
       ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads")
         .doc("Number of threads used by the driver to send finalize shuffle 
RPC to mergers." +
           " External shuffle servers initiate merge finalization on receiving 
this request.")
   ```
   
   Also, mark it as `internal`



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2309,7 +2309,17 @@ package object config {
         " shuffle is enabled.")
       .version("3.3.0")
       .intConf
-      .createWithDefault(3)
+      .createWithDefault(8)
+
+  private[spark] val PUSH_SHUFFLE_FINALIZE_RPC_THREADS =
+    ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads")
+      .doc("Number of threads used by the driver to send finalize shuffle RPC 
to mergers" +
+        " location and then get MergeStatus. The thread won't stop" +
+        " PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. The merger ESS may open 
too many files" +
+        " if the finalize rpc is not received.")
+      .version("3.3.0")

Review Comment:
   ```suggestion
         .version("3.4.0")
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2242,70 +2252,110 @@ private[spark] class DAGScheduler(
     val numMergers = stage.shuffleDep.getMergerLocs.length
     val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
     externalShuffleClient.foreach { shuffleClient =>
-      if (!registerMergeResults) {
-        results.foreach(_.set(true))
-        // Finalize in separate thread as shuffle merge is a no-op in this case
-        shuffleMergeFinalizeScheduler.schedule(new Runnable {
-          override def run(): Unit = {
-            stage.shuffleDep.getMergerLocs.foreach {
-              case shuffleServiceLoc =>
-                // Sends async request to shuffle service to finalize shuffle 
merge on that host.
-                // Since merge statuses will not be registered in this case,
-                // we pass a no-op listener.
-                shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-                  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-                  new MergeFinalizerListener {
-                    override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
-                    }
+      val scheduledFutures =
+        if (!registerMergeResults) {
+          results.foreach(_.set(true))
+          // Finalize in separate thread as shuffle merge is a no-op in this 
case
+          stage.shuffleDep.getMergerLocs.map {
+            case shuffleServiceLoc =>
+              // Sends async request to shuffle service to finalize shuffle 
merge on that host.
+              // Since merge statuses will not be registered in this case,
+              // we pass a no-op listener.
+              shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+                override def run(): Unit = {
+                  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+                    shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+                    new MergeFinalizerListener {
+                      override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
+                      }
 
-                    override def onShuffleMergeFailure(e: Throwable): Unit = {
-                    }
-                  })
-            }
-          }
-        }, 0, TimeUnit.SECONDS)
-      } else {
-        stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-          case (shuffleServiceLoc, index) =>
-            // Sends async request to shuffle service to finalize shuffle 
merge on that host
-            // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-            // TODO: during shuffleMergeFinalizeWaitSec
-            shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-              shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-              new MergeFinalizerListener {
-                override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-                  assert(shuffleId == statuses.shuffleId)
-                  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-                    convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-                  results(index).set(true)
+                      override def onShuffleMergeFailure(e: Throwable): Unit = 
{
+                        if (e.isInstanceOf[IOException]) {
+                          logInfo(s"Failed to connect external shuffle service 
" +
+                            s"${shuffleServiceLoc.hostPort}")
+                          
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)

Review Comment:
   Agree with @otterc, let us revert this change.
   We can analyze the need for it as a follow up, if required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to