[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-09-20 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r975069710


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2309,7 +2309,18 @@ package object config {
 " shuffle is enabled.")
   .version("3.3.0")
   .intConf
-  .createWithDefault(3)
+  .createWithDefault(8)
+
+  private[spark] val PUSH_SHUFFLE_FINALIZE_RPC_THREADS =
+ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads")
+  .internal()
+  .doc("Number of threads used by the driver to send finalize shuffle RPC 
to mergers" +
+" location and then get MergeStatus. The thread won't stop" +
+" PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. The merger ESS may open 
too many files" +

Review Comment:
   Sorry, I will update the doc to `The thread will run for up to PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT`.
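   For reference, a minimal usage sketch of the knob introduced in the diff above (note it is marked `.internal()` there); the value of 16 is only an illustration, not a recommendation:

```scala
import org.apache.spark.SparkConf

// Sketch only: raise the number of driver threads that send FinalizeShuffleMerge RPCs.
// The key comes from this PR's diff; 16 is an arbitrary example value.
val conf = new SparkConf()
  .set("spark.shuffle.push.sendFinalizeRPCThreads", "16")
```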






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-09-18 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r973869591


##
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##
@@ -4443,36 +4443,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
 assert(mapStatuses.count(s => s != null && s.location.executorId == 
"hostB-exec") === 1)
   }
 
-  test("SPARK-40096: Send finalize events even if shuffle merger blocks 
indefinitely") {
+  test("SPARK-40096: Send finalize events even if shuffle merger blocks 
indefinitely " +
+"with registerMergeResults is true") {
 initPushBasedShuffleConfs(conf)
 
+sc.conf.set("spark.shuffle.push.results.timeout", "1s")
+val myScheduler = new MyDAGScheduler(
+  sc,
+  taskScheduler,
+  sc.listenerBus,
+  mapOutputTracker,
+  blockManagerMaster,
+  sc.env,
+  shuffleMergeFinalize = false)
+
+val mergerLocs = Seq(makeBlockManagerId("hostA"), 
makeBlockManagerId("hostB"))
+val timeoutSecs = 1
+val sendRequestsLatch = new CountDownLatch(mergerLocs.size)
+val completeLatch = new CountDownLatch(mergerLocs.size)
+val canSendRequestLatch = new CountDownLatch(1)
+
 val blockStoreClient = mock(classOf[ExternalBlockStoreClient])
 val blockStoreClientField = 
classOf[BlockManager].getDeclaredField("blockStoreClient")
 blockStoreClientField.setAccessible(true)
 blockStoreClientField.set(sc.env.blockManager, blockStoreClient)
+
 val sentHosts = ArrayBuffer[String]()
+var hostAInterrupted = false
 doAnswer { (invoke: InvocationOnMock) =>
   val host = invoke.getArgument[String](0)
-  sentHosts += host
-  // Block FinalizeShuffleMerge rpc for 2 seconds
-  if (invoke.getArgument[String](0) == "hostA") {
-Thread.sleep(2000)
+  sendRequestsLatch.countDown()
+  try {
+if (host == "hostA") {
+  canSendRequestLatch.await(timeoutSecs * 2, TimeUnit.SECONDS)
+}
+sentHosts += host
+  } catch {
+case _: InterruptedException => hostAInterrupted = true
+  } finally {
+completeLatch.countDown()
   }
 }.when(blockStoreClient).finalizeShuffleMerge(any(), any(), any(), any(), 
any())
 
 val shuffleMapRdd = new MyRDD(sc, 1, Nil)
 val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(2))
-shuffleDep.setMergerLocs(Seq(makeBlockManagerId("hostA"), 
makeBlockManagerId("hostB")))
-val shuffleStage = scheduler.createShuffleMapStage(shuffleDep, 0)
-
-Seq(true, false).foreach { registerMergeResults =>
-  sentHosts.clear()
-  scheduler.finalizeShuffleMerge(shuffleStage, registerMergeResults)
-  verify(blockStoreClient, times(2))
-.finalizeShuffleMerge(any(), any(), any(), any(), any())
-  assert((sentHosts diff Seq("hostA", "hostB")).isEmpty)
-  reset(blockStoreClient)
-}
+shuffleDep.setMergerLocs(mergerLocs)
+val shuffleStage = myScheduler.createShuffleMapStage(shuffleDep, 0)
+
+myScheduler.finalizeShuffleMerge(shuffleStage, true)
+sendRequestsLatch.await()
+verify(blockStoreClient, times(2))
+  .finalizeShuffleMerge(any(), any(), any(), any(), any())
+assert(sentHosts === Seq("hostB"))
+completeLatch.await()
+assert(hostAInterrupted)
+  }
+
+  test("SPARK-40096: Send finalize events even if shuffle merger blocks 
indefinitely " +
+"with registerMergeResults is false") {

Review Comment:
   Thanks @mridulm, I have merged the UTs.
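   Outside the Spark test harness, the latch pattern the merged UTs rely on can be shown with a small self-contained sketch: one simulated merger ("hostA") blocks on a latch that is never released while the other host proceeds, so the caller can observe that only the non-blocked host has been reached. All names and timings here are illustrative only.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object LatchSketch {
  def main(args: Array[String]): Unit = {
    val hosts = Seq("hostA", "hostB")
    val sendRequestsLatch = new CountDownLatch(hosts.size) // every task has started
    val canSendRequestLatch = new CountDownLatch(1)        // never released: hostA stays stuck
    val completeLatch = new CountDownLatch(hosts.size)     // every task has finished
    val sentHosts = ArrayBuffer[String]()

    val pool = Executors.newFixedThreadPool(hosts.size)
    hosts.foreach { host =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          sendRequestsLatch.countDown()
          try {
            if (host == "hostA") {
              // Simulates a merger whose connection creation hangs for a while.
              canSendRequestLatch.await(2, TimeUnit.SECONDS)
            }
            sentHosts.synchronized { sentHosts += host }
          } finally {
            completeLatch.countDown()
          }
        }
      })
    }

    sendRequestsLatch.await()
    Thread.sleep(100) // give hostB a moment to record itself
    // While hostA is still blocked, only hostB has been "sent" to.
    println(sentHosts.synchronized(sentHosts.toList)) // expected: List(hostB)

    completeLatch.await()
    pool.shutdown()
  }
}
```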






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-09-12 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r969118491


##
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##
@@ -4440,6 +4443,37 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
 assert(mapStatuses.count(s => s != null && s.location.executorId == 
"hostB-exec") === 1)
   }
 
+  test("SPARK-40096: Send finalize events even if shuffle merger blocks 
indefinitely") {
+initPushBasedShuffleConfs(conf)
+
+val blockStoreClient = mock(classOf[ExternalBlockStoreClient])
+val blockStoreClientField = 
classOf[BlockManager].getDeclaredField("blockStoreClient")
+blockStoreClientField.setAccessible(true)
+blockStoreClientField.set(sc.env.blockManager, blockStoreClient)
+val sentHosts = ArrayBuffer[String]()
+doAnswer { (invoke: InvocationOnMock) =>
+  val host = invoke.getArgument[String](0)
+  sentHosts += host
+  // Block FinalizeShuffleMerge rpc for 2 seconds
+  if (invoke.getArgument[String](0) == "hostA") {
+Thread.sleep(2000)
+  }
+}.when(blockStoreClient).finalizeShuffleMerge(any(), any(), any(), any(), 
any())
+
+val shuffleMapRdd = new MyRDD(sc, 1, Nil)
+val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(2))
+shuffleDep.setMergerLocs(Seq(makeBlockManagerId("hostA"), 
makeBlockManagerId("hostB")))
+val shuffleStage = scheduler.createShuffleMapStage(shuffleDep, 0)
+
+Seq(true, false).foreach { registerMergeResults =>
+  sentHosts.clear()
+  scheduler.finalizeShuffleMerge(shuffleStage, registerMergeResults)
+  verify(blockStoreClient, times(2))
+.finalizeShuffleMerge(any(), any(), any(), any(), any())
+  assert((sentHosts diff Seq("hostA", "hostB")).isEmpty)
+  reset(blockStoreClient)
+}
+  }

Review Comment:
   Updated the UT. @mridulm could you help review it again? Thanks






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-09-12 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r969117695


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -2242,70 +2249,94 @@ private[spark] class DAGScheduler(
 val numMergers = stage.shuffleDep.getMergerLocs.length
 val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
 externalShuffleClient.foreach { shuffleClient =>
-  if (!registerMergeResults) {
-results.foreach(_.set(true))
-// Finalize in separate thread as shuffle merge is a no-op in this case
-shuffleMergeFinalizeScheduler.schedule(new Runnable {
-  override def run(): Unit = {
-stage.shuffleDep.getMergerLocs.foreach {
-  case shuffleServiceLoc =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host.
-// Since merge statuses will not be registered in this case,
-// we pass a no-op listener.
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
-}
+  val scheduledFutures =
+if (!registerMergeResults) {
+  results.foreach(_.set(true))
+  // Finalize in separate thread as shuffle merge is a no-op in this 
case
+  stage.shuffleDep.getMergerLocs.map {
+case shuffleServiceLoc =>
+  // Sends async request to shuffle service to finalize shuffle 
merge on that host.
+  // Since merge statuses will not be registered in this case,
+  // we pass a no-op listener.
+  shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+override def run(): Unit = {
+  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+new MergeFinalizerListener {
+  override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
+  }
 
-override def onShuffleMergeFailure(e: Throwable): Unit = {
-}
-  })
-}
-  }
-}, 0, TimeUnit.SECONDS)
-  } else {
-stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-  case (shuffleServiceLoc, index) =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host
-// TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-// TODO: during shuffleMergeFinalizeWaitSec
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-  assert(shuffleId == statuses.shuffleId)
-  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-  results(index).set(true)
+  override def onShuffleMergeFailure(e: Throwable): Unit = 
{
+  }
+})
 }
+  })
+  }
+} else {
+  stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+case (shuffleServiceLoc, index) =>
+  // Sends async request to shuffle service to finalize shuffle 
merge on that host
+  // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage 
is cancelled
+  // TODO: during shuffleMergeFinalizeWaitSec
+  shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+override def run(): Unit = {
+  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+new MergeFinalizerListener {
+  override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
+assert(shuffleId == statuses.shuffleId)
+eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
+  convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
+results(index).set(true)
+  }
 
-override def onShuffleMergeFailure(e: Throwable): Unit = {
-  logWarning(s"Exception encountered when trying to finalize 
shuffle " +
-s"merge on ${shuffleServiceLoc.host} for shuffle 
$shuffleId", e)
-  // Do not fail the future as this would cause dag 

[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-09-03 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r962112740


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -2242,70 +2252,110 @@ private[spark] class DAGScheduler(
 val numMergers = stage.shuffleDep.getMergerLocs.length
 val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
 externalShuffleClient.foreach { shuffleClient =>
-  if (!registerMergeResults) {
-results.foreach(_.set(true))
-// Finalize in separate thread as shuffle merge is a no-op in this case
-shuffleMergeFinalizeScheduler.schedule(new Runnable {
-  override def run(): Unit = {
-stage.shuffleDep.getMergerLocs.foreach {
-  case shuffleServiceLoc =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host.
-// Since merge statuses will not be registered in this case,
-// we pass a no-op listener.
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
-}
+  val scheduledFutures =
+if (!registerMergeResults) {
+  results.foreach(_.set(true))
+  // Finalize in separate thread as shuffle merge is a no-op in this 
case
+  stage.shuffleDep.getMergerLocs.map {
+case shuffleServiceLoc =>
+  // Sends async request to shuffle service to finalize shuffle 
merge on that host.
+  // Since merge statuses will not be registered in this case,
+  // we pass a no-op listener.
+  shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+override def run(): Unit = {
+  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+new MergeFinalizerListener {
+  override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
+  }
 
-override def onShuffleMergeFailure(e: Throwable): Unit = {
-}
-  })
-}
-  }
-}, 0, TimeUnit.SECONDS)
-  } else {
-stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-  case (shuffleServiceLoc, index) =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host
-// TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-// TODO: during shuffleMergeFinalizeWaitSec
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-  assert(shuffleId == statuses.shuffleId)
-  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-  results(index).set(true)
+  override def onShuffleMergeFailure(e: Throwable): Unit = 
{
+if (e.isInstanceOf[IOException]) {
+  logInfo(s"Failed to connect external shuffle service 
" +
+s"${shuffleServiceLoc.hostPort}")
+  
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+}
+  }
+})
 }
+  })
+  }
+} else {
+  stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+case (shuffleServiceLoc, index) =>
+  // Sends async request to shuffle service to finalize shuffle 
merge on that host
+  // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage 
is cancelled
+  // TODO: during shuffleMergeFinalizeWaitSec
+  shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+override def run(): Unit = {
+  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+new MergeFinalizerListener {
+  override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
+assert(shuffleId == statuses.shuffleId)
+eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
+  convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
+results(index).set(true)
+  

[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-09-03 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r962111737


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -2242,70 +2252,110 @@ private[spark] class DAGScheduler(
 val numMergers = stage.shuffleDep.getMergerLocs.length
 val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
 externalShuffleClient.foreach { shuffleClient =>
-  if (!registerMergeResults) {
-results.foreach(_.set(true))
-// Finalize in separate thread as shuffle merge is a no-op in this case
-shuffleMergeFinalizeScheduler.schedule(new Runnable {
-  override def run(): Unit = {
-stage.shuffleDep.getMergerLocs.foreach {
-  case shuffleServiceLoc =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host.
-// Since merge statuses will not be registered in this case,
-// we pass a no-op listener.
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
-}
+  val scheduledFutures =
+if (!registerMergeResults) {
+  results.foreach(_.set(true))
+  // Finalize in separate thread as shuffle merge is a no-op in this 
case
+  stage.shuffleDep.getMergerLocs.map {
+case shuffleServiceLoc =>
+  // Sends async request to shuffle service to finalize shuffle 
merge on that host.
+  // Since merge statuses will not be registered in this case,
+  // we pass a no-op listener.
+  shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+override def run(): Unit = {
+  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+new MergeFinalizerListener {
+  override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
+  }
 
-override def onShuffleMergeFailure(e: Throwable): Unit = {
-}
-  })
-}
-  }
-}, 0, TimeUnit.SECONDS)
-  } else {
-stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-  case (shuffleServiceLoc, index) =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host
-// TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-// TODO: during shuffleMergeFinalizeWaitSec
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-  assert(shuffleId == statuses.shuffleId)
-  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-  results(index).set(true)
+  override def onShuffleMergeFailure(e: Throwable): Unit = 
{
+if (e.isInstanceOf[IOException]) {
+  logInfo(s"Failed to connect external shuffle service 
" +
+s"${shuffleServiceLoc.hostPort}")
+  
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)

Review Comment:
   Because we don't want nodes that failed to connect to be selected as merge locations again.
   The merge locations come from two parts: the active block managers, and the shuffle services on which the current app registered an executor in the past. We only remove a node that failed to connect from the second part.
   So if the merger ESS is still active, it can still be selected as a merge location because it is in the first part; if it is down, it will no longer be selected.
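   A highly simplified sketch of that selection logic, with invented names (`activeBlockManagers`, `pastShufflePushMergers`) rather than the real `BlockManagerMaster` internals: removing a host only prunes the "past shuffle services" set, so a host that still has an active block manager stays eligible.

```scala
object MergerSelectionSketch {
  // First part: hosts with a currently active block manager.
  var activeBlockManagers: Set[String] = Set("host1", "host2")
  // Second part: hosts where this app registered an executor in the past.
  var pastShufflePushMergers: Set[String] = Set("host2", "host3")

  // Mirrors the effect described above for removeShufflePushMergerLocation:
  // a connect failure only prunes the second set.
  def removeShufflePushMergerLocation(host: String): Unit = {
    pastShufflePushMergers -= host
  }

  def candidateMergerLocations: Set[String] =
    activeBlockManagers ++ pastShufflePushMergers

  def main(args: Array[String]): Unit = {
    removeShufflePushMergerLocation("host2") // still active, so still a candidate
    removeShufflePushMergerLocation("host3") // down, no longer a candidate
    println(candidateMergerLocations)        // Set(host1, host2)
  }
}
```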






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-09-03 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r962110338


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -282,13 +286,19 @@ private[spark] class DAGScheduler(
   None
 }
 
-  // Use multi-threaded scheduled executor. The merge finalization task could 
take some time,
-  // depending on the time to establish connections to mergers, and the time 
to get MergeStatuses
-  // from all the mergers.
+  // Use multi-threaded scheduled executor. The merge finalization task (per 
stage) takes up to
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
   private val shuffleMergeFinalizeScheduler =
 ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer",
   shuffleMergeFinalizeNumThreads)
 
+  // Send finalize RPC tasks to merger ESS, one thread per RPC and will be 
cancelled after
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. Please close the opened files 
in the merger ESS

Review Comment:
   Remove this comment






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-09-03 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r962110217


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2309,7 +2309,16 @@ package object config {
 " shuffle is enabled.")
   .version("3.3.0")
   .intConf
-  .createWithDefault(3)
+  .createWithDefault(8)
+
+  private[spark] val PUSH_BASED_SHUFFLE_SEND_FINALIZE_RPC_THREADS =
+ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads")
+  .doc("Number of threads used by driver to send finalize shuffle RPC to 
the merger" +

Review Comment:
   Updated



##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2309,7 +2309,16 @@ package object config {
 " shuffle is enabled.")
   .version("3.3.0")
   .intConf
-  .createWithDefault(3)
+  .createWithDefault(8)
+
+  private[spark] val PUSH_BASED_SHUFFLE_SEND_FINALIZE_RPC_THREADS =

Review Comment:
   Updated



##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -273,6 +274,9 @@ private[spark] class DAGScheduler(
   private val shuffleMergeFinalizeNumThreads =
 sc.getConf.get(config.PUSH_BASED_SHUFFLE_MERGE_FINALIZE_THREADS)
 
+  private val shuffleSendFinalizeRpcThreads =

Review Comment:
   Updated






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-09-03 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r962110234


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -282,13 +286,19 @@ private[spark] class DAGScheduler(
   None
 }
 
-  // Use multi-threaded scheduled executor. The merge finalization task could 
take some time,
-  // depending on the time to establish connections to mergers, and the time 
to get MergeStatuses
-  // from all the mergers.
+  // Use multi-threaded scheduled executor. The merge finalization task (per 
stage) takes up to
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.

Review Comment:
   Updated






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-09-01 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r960777861


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -2242,60 +2251,57 @@ private[spark] class DAGScheduler(
 val numMergers = stage.shuffleDep.getMergerLocs.length
 val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
 externalShuffleClient.foreach { shuffleClient =>
-  if (!registerMergeResults) {
-results.foreach(_.set(true))
-// Finalize in separate thread as shuffle merge is a no-op in this case
-shuffleMergeFinalizeScheduler.schedule(new Runnable {
-  override def run(): Unit = {
-stage.shuffleDep.getMergerLocs.foreach {
-  case shuffleServiceLoc =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host.
-// Since merge statuses will not be registered in this case,
-// we pass a no-op listener.
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
-}
+  val scheduledFutures = stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+case (shuffleServiceLoc, index) =>
+  // Sends async request to shuffle service to finalize shuffle merge 
on that host
+  // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
+  // TODO: during shuffleMergeFinalizeWaitSec
+  shuffleSendFinalizeRPCExecutor.submit(new Runnable() {
+override def run(): Unit = {
+  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+new MergeFinalizerListener {
+  override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
+assert(shuffleId == statuses.shuffleId)
+eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
+  convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
+results(index).set(true)
+  }

Review Comment:
   Thanks for your explanation,  I have updated the PR.






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-08-31 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r959685431


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -2242,60 +2251,57 @@ private[spark] class DAGScheduler(
 val numMergers = stage.shuffleDep.getMergerLocs.length
 val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
 externalShuffleClient.foreach { shuffleClient =>
-  if (!registerMergeResults) {
-results.foreach(_.set(true))
-// Finalize in separate thread as shuffle merge is a no-op in this case
-shuffleMergeFinalizeScheduler.schedule(new Runnable {
-  override def run(): Unit = {
-stage.shuffleDep.getMergerLocs.foreach {
-  case shuffleServiceLoc =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host.
-// Since merge statuses will not be registered in this case,
-// we pass a no-op listener.
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
-}
+  val scheduledFutures = stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+case (shuffleServiceLoc, index) =>
+  // Sends async request to shuffle service to finalize shuffle merge 
on that host
+  // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
+  // TODO: during shuffleMergeFinalizeWaitSec
+  shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+override def run(): Unit = {
+  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+new MergeFinalizerListener {
+  override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
+assert(shuffleId == statuses.shuffleId)
+eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
+  convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
+results(index).set(true)
+  }
 
-override def onShuffleMergeFailure(e: Throwable): Unit = {
+  override def onShuffleMergeFailure(e: Throwable): Unit = {
+logWarning(s"Exception encountered when trying to finalize 
shuffle " +
+  s"merge on ${shuffleServiceLoc.host} for shuffle 
$shuffleId", e)
+// Do not fail the future as this would cause dag 
scheduler to prematurely
+// give up on waiting for merge results from the remaining 
shuffle services
+// if one fails
+if (e.isInstanceOf[IOException]) {
+  logInfo(s"Failed to connect external shuffle service " +
+s"${shuffleServiceLoc.hostPort}")
+  
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+  results(index).set(false)
 }
-  })
+  }
+})
 }
-  }
-}, 0, TimeUnit.SECONDS)
-  } else {
-stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-  case (shuffleServiceLoc, index) =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host
-// TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-// TODO: during shuffleMergeFinalizeWaitSec
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-  assert(shuffleId == statuses.shuffleId)
-  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-  results(index).set(true)
-}
-
-override def onShuffleMergeFailure(e: Throwable): Unit = {
-  logWarning(s"Exception encountered when trying to finalize 
shuffle " +
-s"merge on ${shuffleServiceLoc.host} for shuffle 
$shuffleId", e)
-  // Do not fail the future as this would cause dag scheduler 
to prematurely
-  // give up on waiting for merge results from the remaining 
shuffle services
-  // if one fails
-  results(index).set(false)
-}
-  })
-}
+  })
   }
-

[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-08-31 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r959664178


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -2242,60 +2251,57 @@ private[spark] class DAGScheduler(
 val numMergers = stage.shuffleDep.getMergerLocs.length
 val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
 externalShuffleClient.foreach { shuffleClient =>
-  if (!registerMergeResults) {
-results.foreach(_.set(true))
-// Finalize in separate thread as shuffle merge is a no-op in this case
-shuffleMergeFinalizeScheduler.schedule(new Runnable {
-  override def run(): Unit = {
-stage.shuffleDep.getMergerLocs.foreach {
-  case shuffleServiceLoc =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host.
-// Since merge statuses will not be registered in this case,
-// we pass a no-op listener.
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
-}
+  val scheduledFutures = stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+case (shuffleServiceLoc, index) =>
+  // Sends async request to shuffle service to finalize shuffle merge 
on that host
+  // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
+  // TODO: during shuffleMergeFinalizeWaitSec
+  shuffleSendFinalizeRPCExecutor.submit(new Runnable() {
+override def run(): Unit = {
+  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+new MergeFinalizerListener {
+  override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
+assert(shuffleId == statuses.shuffleId)
+eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
+  convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
+results(index).set(true)
+  }

Review Comment:
   Many thanks for your suggestion.
   I am confused by the original code. If the shuffle data is small, registerMergeResults will be false, so we don't wait for the merged statuses; but aren't those merged statuses still useful if they become available before the reduce tasks fetch the data? This should happen often when the cluster is under heavy load.
   
   Another question about your suggested code: if all the finalize RPCs complete within `shuffleMergeResultsTimeoutSec`, does the `cancelFinalizeShuffleMergeFutures` task still need to wait to be scheduled?
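   For the second question, one way the pattern under discussion could look (a sketch with assumed names; `cancelFinalizeShuffleMergeFutures` is the task referred to in this thread, everything else is a placeholder):

```scala
import java.util.concurrent.{Executors, Future, TimeUnit}

// Sketch: a cleanup task is scheduled to fire after the merge-results timeout,
// but if every finalize RPC completes earlier, the pending cleanup can itself
// be cancelled rather than left waiting to be scheduled.
object CancelAfterTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val rpcPool = Executors.newFixedThreadPool(2)
    val timeoutSecs = 10L

    // Placeholder stand-ins for the per-merger finalize RPC tasks.
    val scheduledFutures: Seq[Future[_]] =
      (1 to 2).map(_ => rpcPool.submit(new Runnable { override def run(): Unit = () }))

    // "cancelFinalizeShuffleMergeFutures": cancel any RPC task still running at timeout.
    val cleanup = scheduler.schedule(new Runnable {
      override def run(): Unit = scheduledFutures.foreach(_.cancel(true))
    }, timeoutSecs, TimeUnit.SECONDS)

    // All RPCs finished within the timeout, so drop the pending cleanup early.
    scheduledFutures.foreach(_.get(timeoutSecs, TimeUnit.SECONDS))
    cleanup.cancel(false)

    scheduler.shutdown()
    rpcPool.shutdown()
  }
}
```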
   






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-08-28 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r956839303


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -2259,37 +2259,51 @@ private[spark] class DAGScheduler(
 }
 
 override def onShuffleMergeFailure(e: Throwable): Unit = {
+  if (e.isInstanceOf[IOException]) {
+logInfo(s"Failed to connect external shuffle service " 
+
+  s"${shuffleServiceLoc.hostPort}")
+
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+  }
 }
   })
 }
   }
 }, 0, TimeUnit.SECONDS)
   } else {
-stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-  case (shuffleServiceLoc, index) =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host
-// TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-// TODO: during shuffleMergeFinalizeWaitSec
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-  assert(shuffleId == statuses.shuffleId)
-  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-  results(index).set(true)
-}
+shuffleMergeFinalizeScheduler.schedule(new Runnable {

Review Comment:
   Another idea: could we stop sending finalize RPCs after `shuffleMergeResultsTimeoutSec`, and start a thread in the ESS that periodically checks for and closes the open files of shuffles that have already been finalized?
   Should we do that in another PR?
   What do you think? @mridulm @otterc 
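   A rough sketch of what such an ESS-side cleanup could look like, purely to illustrate the proposal; `OpenMergedShuffleFiles`, `finalizedShuffleIds` and `closeFiles` are invented placeholders, not existing ESS APIs.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative only: a daemon that periodically closes the data/index/meta files
// of shuffles whose merge has already been finalized. All types are hypothetical.
object EssCleanupSketch {
  trait OpenMergedShuffleFiles {
    def finalizedShuffleIds(): Seq[Int]   // shuffles whose merge is already finalized
    def closeFiles(shuffleId: Int): Unit  // close that shuffle's open merged files
  }

  def startCleanup(openFiles: OpenMergedShuffleFiles): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit =
        openFiles.finalizedShuffleIds().foreach(openFiles.closeFiles)
    }, 1, 1, TimeUnit.MINUTES)
  }
}
```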






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-08-26 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r955858394


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -2259,37 +2259,51 @@ private[spark] class DAGScheduler(
 }
 
 override def onShuffleMergeFailure(e: Throwable): Unit = {
+  if (e.isInstanceOf[IOException]) {
+logInfo(s"Failed to connect external shuffle service " 
+
+  s"${shuffleServiceLoc.hostPort}")
+
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+  }
 }
   })
 }
   }
 }, 0, TimeUnit.SECONDS)
   } else {
-stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-  case (shuffleServiceLoc, index) =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host
-// TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-// TODO: during shuffleMergeFinalizeWaitSec
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-  assert(shuffleId == statuses.shuffleId)
-  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-  results(index).set(true)
-}
+shuffleMergeFinalizeScheduler.schedule(new Runnable {

Review Comment:
   For a long-running Spark application, if no finalize RPC is received, the ESS will not close the open data, meta, and index files until the application completes. Too many open files can be a potential risk.
   
   Or should we add a parameter to control whether to continue sending the finalize RPC to the remaining nodes?
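   In the style of the config entries earlier in this thread, the parameter suggested above could look roughly like the sketch below; the key name and default are invented here and are not part of the PR.

```scala
// Hypothetical config sketch for the proposed parameter (not in the PR).
private[spark] val PUSH_SHUFFLE_CONTINUE_FINALIZE_RPC_AFTER_TIMEOUT =
  ConfigBuilder("spark.shuffle.push.continueFinalizeRPCAfterTimeout")
    .internal()
    .doc("Whether the driver keeps sending FinalizeShuffleMerge RPCs to the remaining" +
      " mergers after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT has expired, so that" +
      " the merger ESS can close its open shuffle files.")
    .booleanConf
    .createWithDefault(true)
```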






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-08-25 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r955564176


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -282,13 +286,19 @@ private[spark] class DAGScheduler(
   None
 }
 
-  // Use multi-threaded scheduled executor. The merge finalization task could 
take some time,
-  // depending on the time to establish connections to mergers, and the time 
to get MergeStatuses
-  // from all the mergers.
+  // Use multi-threaded scheduled executor. The merge finalization task (per 
stage) takes up to
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
   private val shuffleMergeFinalizeScheduler =
 ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer",
   shuffleMergeFinalizeNumThreads)
 
+  // The merge finalization task (per stage) will submit a asynchronous thread 
to send finalize
+  // RPC to the merger location and then get MergeStatus from the merger. This 
thread won't stop
+  // after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
+  private val shuffleSendFinalizeRPCContext =
+ExecutionContext.fromExecutor(ThreadUtils.newDaemonFixedThreadPool(
+  shuffleSendFinalizeRPCThreads, "send-shuffle-merge-finalize-rpc"))

Review Comment:
   Because I think the number of send-RPC tasks and their total elapsed time in `shuffleSendFinalizeRPCContext` should be larger than those of the tasks in `shuffleMergeFinalizeScheduler`.
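   As a rough illustration of that sizing argument (all numbers hypothetical): each finalize task in `shuffleMergeFinalizeScheduler` fans out one RPC task per merger into the send-RPC pool, so the latter sees many more tasks.

```scala
// Hypothetical fan-out illustration for the sizing argument above.
val concurrentFinalizingStages = 3   // tasks in shuffleMergeFinalizeScheduler
val mergersPerStage = 50             // merger locations for one shuffle
val inflightFinalizeRpcTasks = concurrentFinalizingStages * mergersPerStage
println(inflightFinalizeRpcTasks)    // 150 RPC tasks vs. 3 finalize tasks
```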









[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-08-25 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r954566063


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -282,13 +286,19 @@ private[spark] class DAGScheduler(
   None
 }
 
-  // Use multi-threaded scheduled executor. The merge finalization task could 
take some time,
-  // depending on the time to establish connections to mergers, and the time 
to get MergeStatuses
-  // from all the mergers.
+  // Use multi-threaded scheduled executor. The merge finalization task (per 
stage) takes up to
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
   private val shuffleMergeFinalizeScheduler =
 ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer",
   shuffleMergeFinalizeNumThreads)
 
+  // The merge finalization task (per stage) will submit a asynchronous thread 
to send finalize
+  // RPC to the merger location and then get MergeStatus from the merger. This 
thread won't stop
+  // after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
+  private val shuffleSendFinalizeRPCContext =
+ExecutionContext.fromExecutor(ThreadUtils.newDaemonFixedThreadPool(
+  shuffleSendFinalizeRPCThreads, "send-shuffle-merge-finalize-rpc"))

Review Comment:
   According to @mridulm's comments, we will submit one task per finalize RPC to each merger node. If one task is stuck creating a connection to a merger node, the other tasks won't be blocked, so we can still get most of the merged statuses.
   If there are `shuffleSendFinalizeRPCThreads` tasks all stuck creating connections to merger nodes, the corresponding stages will be blocked for up to `SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT`.
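   A self-contained sketch of that idea, outside Spark: one task per merger submitted to a fixed pool, so a single slow connection delays only its own task. `connectAndFinalize` and the timings are invented for the illustration.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object PerMergerRpcSketch {
  def main(args: Array[String]): Unit = {
    val mergers = Seq("hostA", "hostB", "hostC")
    val pool = Executors.newFixedThreadPool(mergers.size) // one thread per finalize RPC

    // Invented stand-in for sending the FinalizeShuffleMerge RPC to one merger.
    def connectAndFinalize(host: String): Unit = {
      if (host == "hostA") Thread.sleep(2000) // hostA's connection creation is slow
      println(s"finalized merge on $host")
    }

    // One task per merger: hostB and hostC are not held up behind hostA.
    mergers.foreach { host =>
      pool.submit(new Runnable { override def run(): Unit = connectAndFinalize(host) })
    }

    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```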
   






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-08-24 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r954467609


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -2259,37 +2259,51 @@ private[spark] class DAGScheduler(
 }
 
 override def onShuffleMergeFailure(e: Throwable): Unit = {
+  if (e.isInstanceOf[IOException]) {
+logInfo(s"Failed to connect external shuffle service " 
+
+  s"${shuffleServiceLoc.hostPort}")
+
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+  }
 }
   })
 }
   }
 }, 0, TimeUnit.SECONDS)
   } else {
-stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-  case (shuffleServiceLoc, index) =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host
-// TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-// TODO: during shuffleMergeFinalizeWaitSec
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-  assert(shuffleId == statuses.shuffleId)
-  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-  results(index).set(true)
-}
+shuffleMergeFinalizeScheduler.schedule(new Runnable {

Review Comment:
   Hi @mridulm, I don't think we should cancel all the `scheduledFutures` if there is a `TimeoutException`. The merger ESS needs this finalize RPC to finish its merge process, so we should try our best to send all the RPCs even after `shuffleMergeResultsTimeoutSec`. Am I right?
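   A minimal sketch of that "best effort, no cancellation" behaviour, mirroring the `SettableFuture` results used in `finalizeShuffleMerge` (the timeout value and the pre-set result are examples only):

```scala
import java.util.concurrent.{TimeUnit, TimeoutException}
import com.google.common.util.concurrent.{Futures, SettableFuture}

object NoCancelOnTimeoutSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for the per-merger result futures.
    val results = (0 until 3).map(_ => SettableFuture.create[Boolean]())
    results.head.set(true) // pretend only one merger responded in time

    try {
      // Wait for all mergers, but only up to the merge-results timeout.
      Futures.allAsList(results: _*).get(1, TimeUnit.SECONDS)
    } catch {
      case _: TimeoutException =>
        // Deliberately do not cancel the in-flight finalize RPC tasks here:
        // the merger ESS still needs the RPC to finish its merge and close files.
    }
  }
}
```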






[GitHub] [spark] wankunde commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

2022-08-24 Thread GitBox


wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r953503909


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -2259,37 +2266,58 @@ private[spark] class DAGScheduler(
 }
 
 override def onShuffleMergeFailure(e: Throwable): Unit = {
+  if (e.isInstanceOf[IOException]) {
+logInfo(s"Failed to connect external shuffle service 
on " +
+  s"${shuffleServiceLoc.host} and add it to blacklist")
+
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+finalizeBlackNodes.put(shuffleServiceLoc.host, 
shuffleServiceLoc.host)
+  }
 }
   })
+
+  case _ =>
 }
   }
 }, 0, TimeUnit.SECONDS)
   } else {
-stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-  case (shuffleServiceLoc, index) =>
-// Sends async request to shuffle service to finalize shuffle 
merge on that host
-// TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-// TODO: during shuffleMergeFinalizeWaitSec
-shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-  new MergeFinalizerListener {
-override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-  assert(shuffleId == statuses.shuffleId)
-  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-  results(index).set(true)
-}
+shuffleMergeFinalizeScheduler.schedule(new Runnable {
+  override def run(): Unit = {
+stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
+  case (shuffleServiceLoc, index)
+if finalizeBlackNodes.getIfPresent(shuffleServiceLoc.host) == 
null =>
+// Sends async request to shuffle service to finalize shuffle 
merge on that host
+// TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage 
is cancelled
+// TODO: during shuffleMergeFinalizeWaitSec
+shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+  shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+  new MergeFinalizerListener {
+override def onShuffleMergeSuccess(statuses: 
MergeStatuses): Unit = {
+  assert(shuffleId == statuses.shuffleId)
+  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
+convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
+  results(index).set(true)
+}
 
-override def onShuffleMergeFailure(e: Throwable): Unit = {
-  logWarning(s"Exception encountered when trying to finalize 
shuffle " +
-s"merge on ${shuffleServiceLoc.host} for shuffle 
$shuffleId", e)
-  // Do not fail the future as this would cause dag scheduler 
to prematurely
-  // give up on waiting for merge results from the remaining 
shuffle services
-  // if one fails
-  results(index).set(false)
-}
-  })
-}
+override def onShuffleMergeFailure(e: Throwable): Unit = {
+  logWarning(s"Exception encountered when trying to 
finalize shuffle " +
+s"merge on ${shuffleServiceLoc.host} for shuffle 
$shuffleId", e)
+  // Do not fail the future as this would cause dag 
scheduler to prematurely
+  // give up on waiting for merge results from the 
remaining shuffle services
+  // if one fails
+  if (e.isInstanceOf[IOException]) {
+logInfo(s"Failed to connect external shuffle service 
on " +
+  s"${shuffleServiceLoc.host} and add it to blacklist")
+
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+finalizeBlackNodes.put(shuffleServiceLoc.host, 
shuffleServiceLoc.host)
+  }
+  results(index).set(false)
+}
+  })
+
+  case (_, index) => results(index).set(true)
+}
+  }
+}, 0, TimeUnit.SECONDS)

Review Comment:
   If `shuffleMergeFinalizeScheduler` has just one thread, the `finalizeShuffleMerge` method and the RPCs sent to the merger locations will run one by one, so we won't get any merged results, and stages will