Ngone51 commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r768379127
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2068,71 +2129,149 @@ private[spark] class DAGScheduler(
}
/**
- * Schedules shuffle merge finalize.
+ *
+ * Schedules shuffle merge finalization.
+ *
+ * @param stage the stage to finalize shuffle merge
+ * @param delay how long to wait before finalizing shuffle merge
+ * @param registerMergeResults indicate whether DAGScheduler would register
the received
+ * MergeStatus with MapOutputTracker and wait to
schedule the reduce
+ * stage until MergeStatus have been received
from all mergers or
+ * reaches timeout. For very small shuffle, this
could be set to
+ * false to avoid impact to job runtime.
+ * @return whether the caller is able to schedule a finalize task
*/
- private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage):
Unit = {
- // TODO: SPARK-33701: Instead of waiting for a constant amount of time for
finalization
- // TODO: for all the stages, adaptively tune timeout for merge finalization
- logInfo(("%s (%s) scheduled for finalizing" +
- " shuffle merge in %s s").format(stage, stage.name,
shuffleMergeFinalizeWaitSec))
- shuffleMergeFinalizeScheduler.schedule(
- new Runnable {
- override def run(): Unit = finalizeShuffleMerge(stage)
- },
- shuffleMergeFinalizeWaitSec,
- TimeUnit.SECONDS
- )
+ private[scheduler] def scheduleShuffleMergeFinalize(
+ stage: ShuffleMapStage,
+ delay: Long,
+ registerMergeResults: Boolean = true): Boolean = {
+ val shuffleDep = stage.shuffleDep
+ val scheduledTask: Option[ScheduledFuture[_]] = shuffleDep.getFinalizeTask
+ scheduledTask match {
+ case Some(task) =>
+ // If we find an already scheduled task, check if the task has been
triggered yet.
+ // If it's already triggered, do nothing. Otherwise, cancel it and
schedule a new
+ // one for immediate execution. Note that we should get here only when
+ // handleShufflePushCompleted schedules a finalize task after the
shuffle map stage
+ // completed earlier and scheduled a task with default delay.
+ // The current task should be coming from handleShufflePushCompleted,
thus the
+ // delay should be 0 and registerMergeResults should be true.
+ assert(delay == 0 && registerMergeResults)
+ if (task.getDelay(TimeUnit.NANOSECONDS) > 0 && task.cancel(false)) {
+ logInfo(s"$stage (${stage.name}) scheduled for finalizing shuffle
merge immediately " +
+ s"after cancelling previously scheduled task.")
+ shuffleDep.setFinalizeTask(
+ shuffleMergeFinalizeScheduler.schedule(
+ new Runnable {
+ override def run(): Unit = finalizeShuffleMerge(stage,
registerMergeResults)
+ },
+ 0,
+ TimeUnit.SECONDS
+ )
+ )
+ true
+ } else {
+ logInfo(s"$stage (${stage.name}) existing scheduled task for
finalizing shuffle merge" +
+ s"would either be in-progress or finished. No need to schedule
shuffle merge" +
+ s" finalization again.")
+ false
+ }
+ case None =>
+ // If no previous finalization task is scheduled, schedule the
finalization task.
+ logInfo(s"$stage (${stage.name}) scheduled for finalizing shuffle
merge in $delay s")
+ shuffleDep.setFinalizeTask(
+ shuffleMergeFinalizeScheduler.schedule(
+ new Runnable {
+ override def run(): Unit = finalizeShuffleMerge(stage,
registerMergeResults)
+ },
+ delay,
+ TimeUnit.SECONDS
+ )
+ )
+ true
+ }
}
/**
* DAGScheduler notifies all the remote shuffle services chosen to serve
shuffle merge request for
* the given shuffle map stage to finalize the shuffle merge process for
this shuffle. This is
* invoked in a separate thread to reduce the impact on the DAGScheduler
main thread, as the
* scheduler might need to talk to 1000s of shuffle services to finalize
shuffle merge.
+ *
+ * @param stage ShuffleMapStage to finalize shuffle merge for
+ * @param registerMergeResults indicate whether DAGScheduler would register
the received
+ * MergeStatus with MapOutputTracker and wait to
schedule the reduce
+ * stage until MergeStatus have been received
from all mergers or
+ * reaches timeout. For very small shuffle, this
could be set to
+ * false to avoid impact to job runtime.
*/
- private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
- logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+ private[scheduler] def finalizeShuffleMerge(
+ stage: ShuffleMapStage,
+ registerMergeResults: Boolean = true): Unit = {
+ logInfo(s"$stage (${stage.name}) finalizing the shuffle merge with
registering merge " +
+ s"results set to $registerMergeResults")
+ val shuffleId = stage.shuffleDep.shuffleId
+ val shuffleMergeId = stage.shuffleDep.shuffleMergeId
+ val numMergers = stage.shuffleDep.getMergerLocs.length
+ val results = (0 until numMergers).map(_ =>
SettableFuture.create[Boolean]())
externalShuffleClient.foreach { shuffleClient =>
- val shuffleId = stage.shuffleDep.shuffleId
- val numMergers = stage.shuffleDep.getMergerLocs.length
- val results = (0 until numMergers).map(_ =>
SettableFuture.create[Boolean]())
-
- stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
- case (shuffleServiceLoc, index) =>
- // Sends async request to shuffle service to finalize shuffle merge
on that host
- // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is
cancelled
- // TODO: during shuffleMergeFinalizeWaitSec
- shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
- shuffleServiceLoc.port, shuffleId, stage.shuffleDep.shuffleMergeId,
- new MergeFinalizerListener {
- override def onShuffleMergeSuccess(statuses: MergeStatuses):
Unit = {
- assert(shuffleId == statuses.shuffleId)
- eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
- convertMergeStatusesToMergeStatusArr(statuses,
shuffleServiceLoc)))
- results(index).set(true)
- }
+ if (!registerMergeResults) {
+ results.foreach(_.set(true))
+ // Finalize in separate thread as shuffle merge is a no-op in this case
+ ThreadUtils.runInNewThread("no-op-finalize-shuffle-merge", false) {
Review comment:
It looks like `runInNewThread` is indeed a blocking call.
BTW, I don't see anything here that could stop the thread in the error case
if we configure it as a non-daemon thread, so I think a daemon thread may be
better.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1761,7 +1822,7 @@ private[spark] class DAGScheduler(
if (runningStages.contains(shuffleStage) &&
shuffleStage.pendingPartitions.isEmpty) {
if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
Review comment:
Shall we add the condition `getMergerLocs.nonEmpty` into
`shuffleDep.shuffleMergeFinalized`? e.g.,
```scala
def shuffleMergeFinalized: Boolean = {
// Empty RDD won't be computed therefore shuffle merge finalized should
be true by default.
if (shuffleMergeEnabled && numPartitions > 0 && mergerLocs.nonEmpty) {
_shuffleMergedFinalized
} else {
true
}
}
```
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2068,71 +2129,149 @@ private[spark] class DAGScheduler(
}
/**
- * Schedules shuffle merge finalize.
+ *
+ * Schedules shuffle merge finalization.
+ *
+ * @param stage the stage to finalize shuffle merge
+ * @param delay how long to wait before finalizing shuffle merge
+ * @param registerMergeResults indicate whether DAGScheduler would register
the received
+ * MergeStatus with MapOutputTracker and wait to
schedule the reduce
+ * stage until MergeStatus have been received
from all mergers or
+ * reaches timeout. For very small shuffle, this
could be set to
+ * false to avoid impact to job runtime.
+ * @return whether the caller is able to schedule a finalize task
*/
- private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage):
Unit = {
- // TODO: SPARK-33701: Instead of waiting for a constant amount of time for
finalization
- // TODO: for all the stages, adaptively tune timeout for merge finalization
- logInfo(("%s (%s) scheduled for finalizing" +
- " shuffle merge in %s s").format(stage, stage.name,
shuffleMergeFinalizeWaitSec))
- shuffleMergeFinalizeScheduler.schedule(
- new Runnable {
- override def run(): Unit = finalizeShuffleMerge(stage)
- },
- shuffleMergeFinalizeWaitSec,
- TimeUnit.SECONDS
- )
+ private[scheduler] def scheduleShuffleMergeFinalize(
+ stage: ShuffleMapStage,
+ delay: Long,
+ registerMergeResults: Boolean = true): Boolean = {
+ val shuffleDep = stage.shuffleDep
+ val scheduledTask: Option[ScheduledFuture[_]] = shuffleDep.getFinalizeTask
+ scheduledTask match {
+ case Some(task) =>
+ // If we find an already scheduled task, check if the task has been
triggered yet.
+ // If it's already triggered, do nothing. Otherwise, cancel it and
schedule a new
+ // one for immediate execution. Note that we should get here only when
+ // handleShufflePushCompleted schedules a finalize task after the
shuffle map stage
+ // completed earlier and scheduled a task with default delay.
+ // The current task should be coming from handleShufflePushCompleted,
thus the
+ // delay should be 0 and registerMergeResults should be true.
+ assert(delay == 0 && registerMergeResults)
+ if (task.getDelay(TimeUnit.NANOSECONDS) > 0 && task.cancel(false)) {
+ logInfo(s"$stage (${stage.name}) scheduled for finalizing shuffle
merge immediately " +
+ s"after cancelling previously scheduled task.")
+ shuffleDep.setFinalizeTask(
+ shuffleMergeFinalizeScheduler.schedule(
+ new Runnable {
+ override def run(): Unit = finalizeShuffleMerge(stage,
registerMergeResults)
+ },
+ 0,
+ TimeUnit.SECONDS
+ )
+ )
+ true
+ } else {
+ logInfo(s"$stage (${stage.name}) existing scheduled task for
finalizing shuffle merge" +
+ s"would either be in-progress or finished. No need to schedule
shuffle merge" +
+ s" finalization again.")
+ false
+ }
+ case None =>
+ // If no previous finalization task is scheduled, schedule the
finalization task.
+ logInfo(s"$stage (${stage.name}) scheduled for finalizing shuffle
merge in $delay s")
+ shuffleDep.setFinalizeTask(
+ shuffleMergeFinalizeScheduler.schedule(
+ new Runnable {
+ override def run(): Unit = finalizeShuffleMerge(stage,
registerMergeResults)
+ },
+ delay,
+ TimeUnit.SECONDS
+ )
+ )
+ true
+ }
}
/**
* DAGScheduler notifies all the remote shuffle services chosen to serve
shuffle merge request for
* the given shuffle map stage to finalize the shuffle merge process for
this shuffle. This is
* invoked in a separate thread to reduce the impact on the DAGScheduler
main thread, as the
* scheduler might need to talk to 1000s of shuffle services to finalize
shuffle merge.
+ *
+ * @param stage ShuffleMapStage to finalize shuffle merge for
+ * @param registerMergeResults indicate whether DAGScheduler would register
the received
+ * MergeStatus with MapOutputTracker and wait to
schedule the reduce
+ * stage until MergeStatus have been received
from all mergers or
+ * reaches timeout. For very small shuffle, this
could be set to
+ * false to avoid impact to job runtime.
*/
- private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
- logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+ private[scheduler] def finalizeShuffleMerge(
+ stage: ShuffleMapStage,
+ registerMergeResults: Boolean = true): Unit = {
+ logInfo(s"$stage (${stage.name}) finalizing the shuffle merge with
registering merge " +
+ s"results set to $registerMergeResults")
+ val shuffleId = stage.shuffleDep.shuffleId
+ val shuffleMergeId = stage.shuffleDep.shuffleMergeId
+ val numMergers = stage.shuffleDep.getMergerLocs.length
+ val results = (0 until numMergers).map(_ =>
SettableFuture.create[Boolean]())
externalShuffleClient.foreach { shuffleClient =>
- val shuffleId = stage.shuffleDep.shuffleId
- val numMergers = stage.shuffleDep.getMergerLocs.length
- val results = (0 until numMergers).map(_ =>
SettableFuture.create[Boolean]())
-
- stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
- case (shuffleServiceLoc, index) =>
- // Sends async request to shuffle service to finalize shuffle merge
on that host
- // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is
cancelled
- // TODO: during shuffleMergeFinalizeWaitSec
- shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
- shuffleServiceLoc.port, shuffleId, stage.shuffleDep.shuffleMergeId,
- new MergeFinalizerListener {
- override def onShuffleMergeSuccess(statuses: MergeStatuses):
Unit = {
- assert(shuffleId == statuses.shuffleId)
- eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
- convertMergeStatusesToMergeStatusArr(statuses,
shuffleServiceLoc)))
- results(index).set(true)
- }
+ if (!registerMergeResults) {
+ results.foreach(_.set(true))
+ // Finalize in separate thread as shuffle merge is a no-op in this case
+ ThreadUtils.runInNewThread("no-op-finalize-shuffle-merge", false) {
+ stage.shuffleDep.getMergerLocs.foreach {
+ case shuffleServiceLoc =>
+ // Sends async request to shuffle service to finalize shuffle
merge on that host.
+ // Since merge statuses will not be registered in this case, we
pass a no-op listener.
+ shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+ shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+ new MergeFinalizerListener {
+ override def onShuffleMergeSuccess(statuses: MergeStatuses):
Unit = {
+ }
- override def onShuffleMergeFailure(e: Throwable): Unit = {
- logWarning(s"Exception encountered when trying to finalize
shuffle " +
- s"merge on ${shuffleServiceLoc.host} for shuffle
$shuffleId", e)
- // Do not fail the future as this would cause dag scheduler to
prematurely
- // give up on waiting for merge results from the remaining
shuffle services
- // if one fails
- results(index).set(false)
- }
- })
- }
- // DAGScheduler only waits for a limited amount of time for the merge
results.
- // It will attempt to submit the next stage(s) irrespective of whether
merge results
- // from all shuffle services are received or not.
- try {
- Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec,
TimeUnit.SECONDS)
- } catch {
- case _: TimeoutException =>
- logInfo(s"Timed out on waiting for merge results from all " +
- s"$numMergers mergers for shuffle $shuffleId")
- } finally {
- eventProcessLoop.post(ShuffleMergeFinalized(stage))
+ override def onShuffleMergeFailure(e: Throwable): Unit = {
+ }
+ })
+ }
+ }
+ } else {
+ stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
+ case (shuffleServiceLoc, index) =>
+ // Sends async request to shuffle service to finalize shuffle
merge on that host
+ // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is
cancelled
+ // TODO: during shuffleMergeFinalizeWaitSec
+ shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+ shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+ new MergeFinalizerListener {
+ override def onShuffleMergeSuccess(statuses: MergeStatuses):
Unit = {
+ assert(shuffleId == statuses.shuffleId)
+ eventProcessLoop.post(RegisterMergeStatuses(stage,
MergeStatus.
+ convertMergeStatusesToMergeStatusArr(statuses,
shuffleServiceLoc)))
+ results(index).set(true)
+ }
+
+ override def onShuffleMergeFailure(e: Throwable): Unit = {
+ logWarning(s"Exception encountered when trying to finalize
shuffle " +
+ s"merge on ${shuffleServiceLoc.host} for shuffle
$shuffleId", e)
+ // Do not fail the future as this would cause dag scheduler
to prematurely
+ // give up on waiting for merge results from the remaining
shuffle services
+ // if one fails
+ results(index).set(false)
+ }
+ })
+ }
+ // DAGScheduler only waits for a limited amount of time for the merge
results.
+ // It will attempt to submit the next stage(s) irrespective of whether
merge results
+ // from all shuffle services are received or not.
+ try {
+ Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec,
TimeUnit.SECONDS)
+ } catch {
+ case _: TimeoutException =>
+ logInfo(s"Timed out on waiting for merge results from all " +
+ s"$numMergers mergers for shuffle $shuffleId")
+ } finally {
+ eventProcessLoop.post(ShuffleMergeFinalized(stage))
+ }
Review comment:
Should this be put outside of the else branch?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]