mridulm commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r958010820
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -282,13 +285,19 @@ private[spark] class DAGScheduler(
None
}
- // Use multi-threaded scheduled executor. The merge finalization task could
take some time,
- // depending on the time to establish connections to mergers, and the time
to get MergeStatuses
- // from all the mergers.
+ // Use multi-threaded scheduled executor. The merge finalization task (per
stage) takes up to
+ // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
private val shuffleMergeFinalizeScheduler =
ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer",
shuffleMergeFinalizeNumThreads)
+ // Send finalize RPC tasks to merger ESS, one thread per RPC and will be
cancelled after
+ // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. Please close the opened files
in the merger ESS
+ // if finalize RPC is not received due to network issues.
+ private val shuffleSendFinalizeRPCExecutor: ExecutorService =
+ ThreadUtils.newDaemonFixedThreadPool(
+ shuffleSendFinalizeRPCThreads, "send-shuffle-merge-finalize-rpc")
Review Comment:
Fix indentation
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2242,60 +2251,57 @@ private[spark] class DAGScheduler(
val numMergers = stage.shuffleDep.getMergerLocs.length
val results = (0 until numMergers).map(_ =>
SettableFuture.create[Boolean]())
externalShuffleClient.foreach { shuffleClient =>
- if (!registerMergeResults) {
- results.foreach(_.set(true))
- // Finalize in separate thread as shuffle merge is a no-op in this case
- shuffleMergeFinalizeScheduler.schedule(new Runnable {
- override def run(): Unit = {
- stage.shuffleDep.getMergerLocs.foreach {
- case shuffleServiceLoc =>
- // Sends async request to shuffle service to finalize shuffle
merge on that host.
- // Since merge statuses will not be registered in this case,
- // we pass a no-op listener.
- shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
- shuffleServiceLoc.port, shuffleId, shuffleMergeId,
- new MergeFinalizerListener {
- override def onShuffleMergeSuccess(statuses:
MergeStatuses): Unit = {
- }
+ val scheduledFutures = stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+ case (shuffleServiceLoc, index) =>
+ // Sends async request to shuffle service to finalize shuffle merge
on that host
+ // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is
cancelled
+ // TODO: during shuffleMergeFinalizeWaitSec
+ shuffleSendFinalizeRPCExecutor.submit(new Runnable() {
+ override def run(): Unit = {
+ shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+ shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+ new MergeFinalizerListener {
+ override def onShuffleMergeSuccess(statuses: MergeStatuses):
Unit = {
+ assert(shuffleId == statuses.shuffleId)
+ eventProcessLoop.post(RegisterMergeStatuses(stage,
MergeStatus.
+ convertMergeStatusesToMergeStatusArr(statuses,
shuffleServiceLoc)))
+ results(index).set(true)
+ }
Review Comment:
`onShuffleMergeSuccess` and `onShuffleMergeFailure` are no-op's when
`registerMergeResults == false`
I would suggest to keep the code pretty much as-is, except with the
introduction of calling `shuffleClient.finalizeShuffleMerge` in
`shuffleSendFinalizeRPCExecutor`
Something like:
```
val scheduledFutures = {
if (!registerMergeResults) {
results.foreach(_.set(true))
stage.shuffleDep.getMergerLocs.map {
case shuffleServiceLoc =>
shuffleSendFinalizeRPCExecutor.submit(new Runnable() {
override def run(): Unit = {
// earlier code inside case shuffleServiceLoc =>
...
}
});
}
} else {
stage.shuffleDep.getMergerLocs.zipWithIndex.map {
case (shuffleServiceLoc, index) =>
shuffleSendFinalizeRPCExecutor.submit(new Runnable() {
override def run(): Unit = {
// earlier code inside case (shuffleServiceLoc, index) =>
...
}
});
}
}
var timedOut = false
try {
Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec,
TimeUnit.SECONDS)
} catch {
  case _: TimeoutException =>
    timedOut = true
    // earlier code ...
} finally {
if (timedOut || !registerMergeResults) {
cancelFinalizeShuffleMergeFutures(scheduledFutures,
if (timedOut) 0 else shuffleMergeResultsTimeoutSec)
}
eventProcessLoop.post(ShuffleMergeFinalized(stage))
}
private def cancelFinalizeShuffleMergeFutures(futures: Seq[Future[_]],
delayInSecs: Int): Unit = {
def cancelFutures(): Unit = futures.foreach(_.cancel(true))
if (delayInSecs > 0) {
shuffleMergeFinalizeScheduler.schedule(new Runnable {
override def run(): Unit = {
cancelFutures()
}
}, delayInSecs, TimeUnit.SECONDS)
} else {
cancelFutures()
}
}
```
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2311,6 +2311,15 @@ package object config {
.intConf
.createWithDefault(3)
Review Comment:
Perhaps bump this up from 3 as well to 6 or 8 (since now future cancel
happens in that threadpool - assuming my proposed change below is fine) - with
`PUSH_BASED_SHUFFLE_SEND_FINALIZE_RPC_THREADS` correspondingly increased.
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -282,13 +285,19 @@ private[spark] class DAGScheduler(
None
}
- // Use multi-threaded scheduled executor. The merge finalization task could
take some time,
- // depending on the time to establish connections to mergers, and the time
to get MergeStatuses
- // from all the mergers.
+ // Use multi-threaded scheduled executor. The merge finalization task (per
stage) takes up to
+ // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
private val shuffleMergeFinalizeScheduler =
ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer",
shuffleMergeFinalizeNumThreads)
+ // Send finalize RPC tasks to merger ESS, one thread per RPC and will be
cancelled after
+ // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. Please close the opened files
in the merger ESS
+ // if finalize RPC is not received due to network issues.
+ private val shuffleSendFinalizeRPCExecutor: ExecutorService =
Review Comment:
`RPC` -> `Rpc`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]