otterc commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r961896092
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2309,7 +2309,16 @@ package object config {
" shuffle is enabled.")
.version("3.3.0")
.intConf
- .createWithDefault(3)
+ .createWithDefault(8)
+
+ private[spark] val PUSH_BASED_SHUFFLE_SEND_FINALIZE_RPC_THREADS =
+ ConfigBuilder("spark.shuffle.push.sendFinalizeRPCThreads")
+ .doc("Number of threads used by driver to send finalize shuffle RPC to
the merger" +
Review Comment:
Nit-> Number of threads used by the driver to send finalize shuffle RPC to
mergers.
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2309,7 +2309,16 @@ package object config {
" shuffle is enabled.")
.version("3.3.0")
.intConf
- .createWithDefault(3)
+ .createWithDefault(8)
+
+ private[spark] val PUSH_BASED_SHUFFLE_SEND_FINALIZE_RPC_THREADS =
Review Comment:
Nit-> can this be renamed to `PUSH_SHUFFLE_FINALIZE_RPC_THREADS`?
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -282,13 +286,19 @@ private[spark] class DAGScheduler(
None
}
- // Use multi-threaded scheduled executor. The merge finalization task could
take some time,
- // depending on the time to establish connections to mergers, and the time
to get MergeStatuses
- // from all the mergers.
+ // Use multi-threaded scheduled executor. The merge finalization task (per
stage) takes up to
+ // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
Review Comment:
Please change the comment to explain what this scheduler is used for.
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2242,70 +2252,110 @@ private[spark] class DAGScheduler(
val numMergers = stage.shuffleDep.getMergerLocs.length
val results = (0 until numMergers).map(_ =>
SettableFuture.create[Boolean]())
externalShuffleClient.foreach { shuffleClient =>
- if (!registerMergeResults) {
- results.foreach(_.set(true))
- // Finalize in separate thread as shuffle merge is a no-op in this case
- shuffleMergeFinalizeScheduler.schedule(new Runnable {
- override def run(): Unit = {
- stage.shuffleDep.getMergerLocs.foreach {
- case shuffleServiceLoc =>
- // Sends async request to shuffle service to finalize shuffle
merge on that host.
- // Since merge statuses will not be registered in this case,
- // we pass a no-op listener.
- shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
- shuffleServiceLoc.port, shuffleId, shuffleMergeId,
- new MergeFinalizerListener {
- override def onShuffleMergeSuccess(statuses:
MergeStatuses): Unit = {
- }
+ val scheduledFutures =
+ if (!registerMergeResults) {
+ results.foreach(_.set(true))
+ // Finalize in separate thread as shuffle merge is a no-op in this
case
+ stage.shuffleDep.getMergerLocs.map {
+ case shuffleServiceLoc =>
+ // Sends async request to shuffle service to finalize shuffle
merge on that host.
+ // Since merge statuses will not be registered in this case,
+ // we pass a no-op listener.
+ shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+ override def run(): Unit = {
+ shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+ shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+ new MergeFinalizerListener {
+ override def onShuffleMergeSuccess(statuses:
MergeStatuses): Unit = {
+ }
- override def onShuffleMergeFailure(e: Throwable): Unit = {
- }
- })
- }
- }
- }, 0, TimeUnit.SECONDS)
- } else {
- stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
- case (shuffleServiceLoc, index) =>
- // Sends async request to shuffle service to finalize shuffle
merge on that host
- // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is
cancelled
- // TODO: during shuffleMergeFinalizeWaitSec
- shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
- shuffleServiceLoc.port, shuffleId, shuffleMergeId,
- new MergeFinalizerListener {
- override def onShuffleMergeSuccess(statuses: MergeStatuses):
Unit = {
- assert(shuffleId == statuses.shuffleId)
- eventProcessLoop.post(RegisterMergeStatuses(stage,
MergeStatus.
- convertMergeStatusesToMergeStatusArr(statuses,
shuffleServiceLoc)))
- results(index).set(true)
+ override def onShuffleMergeFailure(e: Throwable): Unit =
{
+ if (e.isInstanceOf[IOException]) {
+ logInfo(s"Failed to connect external shuffle service
" +
+ s"${shuffleServiceLoc.hostPort}")
+
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+ }
+ }
+ })
}
+ })
+ }
+ } else {
+ stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+ case (shuffleServiceLoc, index) =>
+ // Sends async request to shuffle service to finalize shuffle
merge on that host
+ // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage
is cancelled
+ // TODO: during shuffleMergeFinalizeWaitSec
+ shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+ override def run(): Unit = {
+ shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+ shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+ new MergeFinalizerListener {
+ override def onShuffleMergeSuccess(statuses:
MergeStatuses): Unit = {
+ assert(shuffleId == statuses.shuffleId)
+ eventProcessLoop.post(RegisterMergeStatuses(stage,
MergeStatus.
+ convertMergeStatusesToMergeStatusArr(statuses,
shuffleServiceLoc)))
+ results(index).set(true)
+ }
- override def onShuffleMergeFailure(e: Throwable): Unit = {
- logWarning(s"Exception encountered when trying to finalize
shuffle " +
- s"merge on ${shuffleServiceLoc.host} for shuffle
$shuffleId", e)
- // Do not fail the future as this would cause dag scheduler
to prematurely
- // give up on waiting for merge results from the remaining
shuffle services
- // if one fails
- results(index).set(false)
+ override def onShuffleMergeFailure(e: Throwable): Unit =
{
+ logWarning(s"Exception encountered when trying to
finalize shuffle " +
+ s"merge on ${shuffleServiceLoc.host} for shuffle
$shuffleId", e)
+ // Do not fail the future as this would cause dag
scheduler to prematurely
+ // give up on waiting for merge results from the
remaining shuffle services
+ // if one fails
+ if (e.isInstanceOf[IOException]) {
+ logInfo(s"Failed to connect external shuffle service
" +
+ s"${shuffleServiceLoc.hostPort}")
+
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+ results(index).set(false)
+ }
+ }
+ })
}
})
+ }
}
- }
- // DAGScheduler only waits for a limited amount of time for the merge
results.
- // It will attempt to submit the next stage(s) irrespective of whether
merge results
- // from all shuffle services are received or not.
+ var timedOut = false
try {
Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec,
TimeUnit.SECONDS)
} catch {
case _: TimeoutException =>
+ timedOut = true
logInfo(s"Timed out on waiting for merge results from all " +
s"$numMergers mergers for shuffle $shuffleId")
} finally {
+ if (timedOut || !registerMergeResults) {
+ cancelFinalizeShuffleMergeFutures(scheduledFutures,
Review Comment:
I don't think we should do this. Do we get any benefit from cancelling any
pending send RPC tasks? I understand that when the timeout has elapsed, the
driver will not use the results from the shuffle service for that shuffle, but
at least the shuffle service can process the finalize request and close all the
files for that shuffle.
At the least, I think we shouldn't be clubbing this behavior change with the
bug fix.
I know there was a discussion about this and sorry for commenting late on
this.
cc @mridulm
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2242,70 +2252,110 @@ private[spark] class DAGScheduler(
val numMergers = stage.shuffleDep.getMergerLocs.length
val results = (0 until numMergers).map(_ =>
SettableFuture.create[Boolean]())
externalShuffleClient.foreach { shuffleClient =>
- if (!registerMergeResults) {
- results.foreach(_.set(true))
- // Finalize in separate thread as shuffle merge is a no-op in this case
- shuffleMergeFinalizeScheduler.schedule(new Runnable {
- override def run(): Unit = {
- stage.shuffleDep.getMergerLocs.foreach {
- case shuffleServiceLoc =>
- // Sends async request to shuffle service to finalize shuffle
merge on that host.
- // Since merge statuses will not be registered in this case,
- // we pass a no-op listener.
- shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
- shuffleServiceLoc.port, shuffleId, shuffleMergeId,
- new MergeFinalizerListener {
- override def onShuffleMergeSuccess(statuses:
MergeStatuses): Unit = {
- }
+ val scheduledFutures =
+ if (!registerMergeResults) {
+ results.foreach(_.set(true))
+ // Finalize in separate thread as shuffle merge is a no-op in this
case
+ stage.shuffleDep.getMergerLocs.map {
+ case shuffleServiceLoc =>
+ // Sends async request to shuffle service to finalize shuffle
merge on that host.
+ // Since merge statuses will not be registered in this case,
+ // we pass a no-op listener.
+ shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+ override def run(): Unit = {
+ shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+ shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+ new MergeFinalizerListener {
+ override def onShuffleMergeSuccess(statuses:
MergeStatuses): Unit = {
+ }
- override def onShuffleMergeFailure(e: Throwable): Unit = {
- }
- })
- }
- }
- }, 0, TimeUnit.SECONDS)
- } else {
- stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
- case (shuffleServiceLoc, index) =>
- // Sends async request to shuffle service to finalize shuffle
merge on that host
- // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is
cancelled
- // TODO: during shuffleMergeFinalizeWaitSec
- shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
- shuffleServiceLoc.port, shuffleId, shuffleMergeId,
- new MergeFinalizerListener {
- override def onShuffleMergeSuccess(statuses: MergeStatuses):
Unit = {
- assert(shuffleId == statuses.shuffleId)
- eventProcessLoop.post(RegisterMergeStatuses(stage,
MergeStatus.
- convertMergeStatusesToMergeStatusArr(statuses,
shuffleServiceLoc)))
- results(index).set(true)
+ override def onShuffleMergeFailure(e: Throwable): Unit =
{
+ if (e.isInstanceOf[IOException]) {
+ logInfo(s"Failed to connect external shuffle service
" +
+ s"${shuffleServiceLoc.hostPort}")
+
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
Review Comment:
Why are we doing this? Network issues can be transient. We don't have
retries for sending the finalize RPC, which is fine because the push/merge
operation is best effort. However, why do we decide to remove the host from the
shuffle push merger locations? I don't recommend clubbing these changes in the
bug fix.
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -282,13 +286,19 @@ private[spark] class DAGScheduler(
None
}
- // Use multi-threaded scheduled executor. The merge finalization task could
take some time,
- // depending on the time to establish connections to mergers, and the time
to get MergeStatuses
- // from all the mergers.
+ // Use multi-threaded scheduled executor. The merge finalization task (per
stage) takes up to
+ // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
private val shuffleMergeFinalizeScheduler =
ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer",
shuffleMergeFinalizeNumThreads)
+ // Send finalize RPC tasks to merger ESS, one thread per RPC and will be
cancelled after
+ // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT. Please close the opened files
in the merger ESS
Review Comment:
`Please close the opened files in the merger ESS...` why is that added as a
comment here?
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -273,6 +274,9 @@ private[spark] class DAGScheduler(
private val shuffleMergeFinalizeNumThreads =
sc.getConf.get(config.PUSH_BASED_SHUFFLE_MERGE_FINALIZE_THREADS)
+ private val shuffleSendFinalizeRpcThreads =
Review Comment:
Nit-> rename to `shuffleFinalizeRpcThreads`
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2242,70 +2252,110 @@ private[spark] class DAGScheduler(
val numMergers = stage.shuffleDep.getMergerLocs.length
val results = (0 until numMergers).map(_ =>
SettableFuture.create[Boolean]())
externalShuffleClient.foreach { shuffleClient =>
- if (!registerMergeResults) {
- results.foreach(_.set(true))
- // Finalize in separate thread as shuffle merge is a no-op in this case
- shuffleMergeFinalizeScheduler.schedule(new Runnable {
- override def run(): Unit = {
- stage.shuffleDep.getMergerLocs.foreach {
- case shuffleServiceLoc =>
- // Sends async request to shuffle service to finalize shuffle
merge on that host.
- // Since merge statuses will not be registered in this case,
- // we pass a no-op listener.
- shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
- shuffleServiceLoc.port, shuffleId, shuffleMergeId,
- new MergeFinalizerListener {
- override def onShuffleMergeSuccess(statuses:
MergeStatuses): Unit = {
- }
+ val scheduledFutures =
+ if (!registerMergeResults) {
+ results.foreach(_.set(true))
+ // Finalize in separate thread as shuffle merge is a no-op in this
case
+ stage.shuffleDep.getMergerLocs.map {
+ case shuffleServiceLoc =>
+ // Sends async request to shuffle service to finalize shuffle
merge on that host.
+ // Since merge statuses will not be registered in this case,
+ // we pass a no-op listener.
+ shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+ override def run(): Unit = {
+ shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+ shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+ new MergeFinalizerListener {
+ override def onShuffleMergeSuccess(statuses:
MergeStatuses): Unit = {
+ }
- override def onShuffleMergeFailure(e: Throwable): Unit = {
- }
- })
- }
- }
- }, 0, TimeUnit.SECONDS)
- } else {
- stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
- case (shuffleServiceLoc, index) =>
- // Sends async request to shuffle service to finalize shuffle
merge on that host
- // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is
cancelled
- // TODO: during shuffleMergeFinalizeWaitSec
- shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
- shuffleServiceLoc.port, shuffleId, shuffleMergeId,
- new MergeFinalizerListener {
- override def onShuffleMergeSuccess(statuses: MergeStatuses):
Unit = {
- assert(shuffleId == statuses.shuffleId)
- eventProcessLoop.post(RegisterMergeStatuses(stage,
MergeStatus.
- convertMergeStatusesToMergeStatusArr(statuses,
shuffleServiceLoc)))
- results(index).set(true)
+ override def onShuffleMergeFailure(e: Throwable): Unit =
{
+ if (e.isInstanceOf[IOException]) {
+ logInfo(s"Failed to connect external shuffle service
" +
+ s"${shuffleServiceLoc.hostPort}")
+
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+ }
+ }
+ })
}
+ })
+ }
+ } else {
+ stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+ case (shuffleServiceLoc, index) =>
+ // Sends async request to shuffle service to finalize shuffle
merge on that host
+ // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage
is cancelled
+ // TODO: during shuffleMergeFinalizeWaitSec
+ shuffleSendFinalizeRpcExecutor.submit(new Runnable() {
+ override def run(): Unit = {
+ shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+ shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+ new MergeFinalizerListener {
+ override def onShuffleMergeSuccess(statuses:
MergeStatuses): Unit = {
+ assert(shuffleId == statuses.shuffleId)
+ eventProcessLoop.post(RegisterMergeStatuses(stage,
MergeStatus.
+ convertMergeStatusesToMergeStatusArr(statuses,
shuffleServiceLoc)))
+ results(index).set(true)
+ }
- override def onShuffleMergeFailure(e: Throwable): Unit = {
- logWarning(s"Exception encountered when trying to finalize
shuffle " +
- s"merge on ${shuffleServiceLoc.host} for shuffle
$shuffleId", e)
- // Do not fail the future as this would cause dag scheduler
to prematurely
- // give up on waiting for merge results from the remaining
shuffle services
- // if one fails
- results(index).set(false)
+ override def onShuffleMergeFailure(e: Throwable): Unit =
{
+ logWarning(s"Exception encountered when trying to
finalize shuffle " +
+ s"merge on ${shuffleServiceLoc.host} for shuffle
$shuffleId", e)
+ // Do not fail the future as this would cause dag
scheduler to prematurely
+ // give up on waiting for merge results from the
remaining shuffle services
+ // if one fails
+ if (e.isInstanceOf[IOException]) {
+ logInfo(s"Failed to connect external shuffle service
" +
+ s"${shuffleServiceLoc.hostPort}")
+
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
Review Comment:
Same here. PTAL at the above comment
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]