mridulm commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r739766622
##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -131,7 +135,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
def shuffleMergeId: Int = _shuffleMergeId
def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
- if (mergerLocs != null) {
+ if (mergerLocs != null && mergerLocs.nonEmpty) {
Review comment:
What was the reason for this change?
As an implementation detail, the current code does not make this call with an empty `mergerLocs` - trying to understand why we needed to do this.
##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -143,6 +143,7 @@ class SparkEnv (
object SparkEnv extends Logging {
@volatile private var env: SparkEnv = _
+ private[spark] var driverRpcEndpoint: Option[RpcEndpointRef] = None
Review comment:
Adding this as a field in `SparkEnv` does not look right.
How about:
* Add a `private[spark] var executorBackend: Option[ExecutorBackend]` to `SparkEnv` which defaults to `None` and is configured in `CoarseGrainedExecutorBackend.run` and in `SparkContext` (for local mode) after the executor backend is created.
* Add an explicit method (say `sendPushCompletion`) in `CoarseGrainedExecutorBackend` to notify the driver about push completion.
* In `ShuffleBlockPusher.notifyDriverAboutPushCompletion`, do
```
SparkEnv.get.executorBackend.foreach(_ match {
  case cb: CoarseGrainedExecutorBackend => cb.sendPushCompletion()
  case _ =>
})
```
Thoughts ?
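For concreteness, a minimal sketch of that wiring (the `executorBackend` field, the `sendPushCompletion` method and its parameter list are proposals, not existing APIs):
```
// In CoarseGrainedExecutorBackend (sketch): `driver` is the backend's existing
// Option[RpcEndpointRef]; the method and its parameters are hypothetical.
def sendPushCompletion(shuffleId: Int, mapIndex: Int): Unit = {
  driver match {
    case Some(ref) => ref.send(ShufflePushCompletion(shuffleId, mapIndex))
    case None => logWarning("Dropping push completion notification since driver is not connected yet")
  }
}

// In ShuffleBlockPusher.notifyDriverAboutPushCompletion (sketch):
SparkEnv.get.executorBackend.foreach {
  case cb: CoarseGrainedExecutorBackend => cb.sendPushCompletion(shuffleId, mapIndex)
  case _ => // other backends do not need to send this RPC
}
```
This keeps the RPC plumbing inside the executor backend instead of leaking a driver endpoint reference through `SparkEnv`.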
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1716,7 +1739,31 @@ private[spark] class DAGScheduler(
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
if (!shuffleStage.shuffleDep.shuffleMergeFinalized && shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
- scheduleShuffleMergeFinalize(shuffleStage)
+ // Check if a finalize task has already been scheduled. This is to prevent the
+ // following scenario: Stage A attempt 0 fails and gets retried. Stage A attempt 1
+ // succeeded, triggering the scheduling of shuffle merge finalization. However,
+ // tasks from Stage A attempt 0 might still be running and sending task completion
+ // events to DAGScheduler. This check prevents multiple attempts to schedule merge
+ // finalization get triggered due to this.
+ if (shuffleStage.shuffleDep.getFinalizeTask.isEmpty) {
+ // If total shuffle size is smaller than the threshold, attempt to immediately
+ // schedule shuffle merge finalization and process map stage completion.
+ val totalSize = Try(mapOutputTracker
+ .getStatistics(shuffleStage.shuffleDep).bytesByPartitionId.sum).getOrElse(0L)
Review comment:
Why/When does it fail ? (Why `Try` ?)
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1716,7 +1739,31 @@ private[spark] class DAGScheduler(
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
if (!shuffleStage.shuffleDep.shuffleMergeFinalized && shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
- scheduleShuffleMergeFinalize(shuffleStage)
+ // Check if a finalize task has already been scheduled. This is to prevent the
+ // following scenario: Stage A attempt 0 fails and gets retried. Stage A attempt 1
+ // succeeded, triggering the scheduling of shuffle merge finalization. However,
+ // tasks from Stage A attempt 0 might still be running and sending task completion
+ // events to DAGScheduler. This check prevents multiple attempts to schedule merge
+ // finalization get triggered due to this.
Review comment:
The more I look at this, the more confusing the finalize task becomes.
Instead, why not simply check state explicitly and avoid processing if there have been stage failures/resubmissions/etc.?
Rough sketch:
Pass the stage attempt id to `finalizeShuffleMerge`, etc.
Before finalizing a stage, check if the latest attempt id is the same as the input param - and if not, avoid finalization (simply return).
That way, we don't need to keep track of the task or cancel it (the task, when run, will be a no-op).
Thoughts?
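To illustrate (purely a sketch; the parameter name and its placement are assumptions), the attempt-id guard could look like:
```
// Hypothetical sketch: the attempt id that scheduled this finalization is passed along.
private[scheduler] def finalizeShuffleMerge(
    stage: ShuffleMapStage,
    stageAttemptId: Int): Unit = {
  // If the stage was resubmitted after this finalization was scheduled, the attempt
  // ids no longer match and the task simply becomes a no-op.
  if (stage.latestInfo.attemptNumber() != stageAttemptId) {
    logInfo(s"Skipping shuffle merge finalization for $stage from stale attempt $stageAttemptId")
    return
  }
  // ... proceed with finalization for the current attempt ...
}
```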
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2023,71 +2070,140 @@ private[spark] class DAGScheduler(
}
/**
- * Schedules shuffle merge finalize.
+ *
+ * Schedules shuffle merge finalization.
+ *
+ * @param stage the stage to finalize shuffle merge
+ * @param delay how long to wait before finalizing shuffle merge
+ * @param registerMergeResults whether to wait for merge results before scheduling the next stage
+ * @return whether the caller is able to schedule a finalize task
*/
- private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
- // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization
- // TODO: for all the stages, adaptively tune timeout for merge finalization
- logInfo(("%s (%s) scheduled for finalizing" +
- " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
- shuffleMergeFinalizeScheduler.schedule(
- new Runnable {
- override def run(): Unit = finalizeShuffleMerge(stage)
- },
- shuffleMergeFinalizeWaitSec,
- TimeUnit.SECONDS
- )
+ private[scheduler] def scheduleShuffleMergeFinalize(
+ stage: ShuffleMapStage,
+ delay: Long,
+ registerMergeResults: Boolean = true): Boolean = {
+ val shuffleDep = stage.shuffleDep
+ val scheduledTask: Option[ScheduledFuture[_]] = shuffleDep.getFinalizeTask
+ scheduledTask match {
+ case Some(task) =>
+ // If we find an already scheduled task, check if the task has been triggered yet.
+ // If it's already triggered, do nothing. Otherwise, cancel it and schedule a new
+ // one for immediate execution. Note that we should get here only when
+ // handleShufflePushCompleted schedules a finalize task after the shuffle map stage
+ // completed earlier and scheduled a task with default delay.
+ if (task.getDelay(TimeUnit.NANOSECONDS) > 0) {
+ task.cancel(false)
Review comment:
```suggestion
if (task.getDelay(TimeUnit.NANOSECONDS) > 0 && task.cancel(false)) {
```
Proceed only if task.cancel also worked.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1716,7 +1739,31 @@ private[spark] class DAGScheduler(
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
if (!shuffleStage.shuffleDep.shuffleMergeFinalized && shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
- scheduleShuffleMergeFinalize(shuffleStage)
+ // Check if a finalize task has already been scheduled. This is to prevent the
+ // following scenario: Stage A attempt 0 fails and gets retried. Stage A attempt 1
+ // succeeded, triggering the scheduling of shuffle merge finalization. However,
+ // tasks from Stage A attempt 0 might still be running and sending task completion
+ // events to DAGScheduler. This check prevents multiple attempts to schedule merge
+ // finalization get triggered due to this.
+ if (shuffleStage.shuffleDep.getFinalizeTask.isEmpty) {
+ // If total shuffle size is smaller than the threshold, attempt to immediately
+ // schedule shuffle merge finalization and process map stage completion.
+ val totalSize = Try(mapOutputTracker
+ .getStatistics(shuffleStage.shuffleDep).bytesByPartitionId.sum).getOrElse(0L)
+ if (totalSize < shuffleMergeWaitMinSizeThreshold) {
+ // Check if we can process map stage completion. If shuffle merge finalization
+ // is already triggered because push completion ratio was reached earlier,
+ // we cannot process map stage completion, but have to wait for the finalization
+ // to finish. This is because it's not straightforward to interrupt the
+ // finalization task and undo what it might have already done.
+ if (scheduleShuffleMergeFinalize(shuffleStage, delay = 0,
+ registerMergeResults = false)) {
Review comment:
I am not sure the semantics of this code path are ideal.
There are two cases:
a) We don't want to wait for executors to finish push, but want to trigger finalization at the ESS immediately - so that any pushed data already available is finalized.
b) We don't care about pushed data, discard the push entirely - move on to the next stage.
We are doing (b) when `totalSize` < `shuffleMergeWaitMinSizeThreshold`.
While waiting for `shuffleMergeResultsTimeoutSec` for (a) might not be optimal for quick shuffles (since there can be small and large shuffles in the same app), allowing some time for the ESS to complete and leveraging whatever subset of ESS instances could respond back to the driver in time (rather than discarding all the merge results entirely) can be useful (even if the timeout is aggressively small).
Thoughts?
##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -172,6 +177,29 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
!rdd.isBarrier()
}
+ @transient private[this] val shufflePushCompleted = new RoaringBitmap()
+
+ /**
+ * Mark a given map task as push completed in the tracking bitmap.
+ * Using the bitmap ensures that the same map task launched multiple times due to
+ * either speculation or stage retry is only counted once.
+ * @param mapIndex Map task index
+ * @return number of map tasks with block push completed
+ */
+ def incPushCompleted(mapIndex: Int): Int = {
+ shufflePushCompleted.add(mapIndex)
+ shufflePushCompleted.getCardinality
+ }
Review comment:
We need a `clear` as well - for indeterminate stages, when we regenerate the shuffle data, `shufflePushCompleted` has to be reset back to empty in `newShuffleMergeState`.
Similarly, we need to handle `finalizeTask` as well.
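i.e. something along these lines in `newShuffleMergeState` (a sketch; the existing body is elided, and cancelling a pending `finalizeTask` is my assumption about how it should be handled):
```
def newShuffleMergeState(): Unit = {
  // ... existing reset of merge state (merger locations, merge id, etc.) ...
  // Reset push-completion tracking so counts from the previous shuffle
  // generation are not carried over after an indeterminate stage retry.
  shufflePushCompleted.clear()
  // Drop (and cancel, if it has not run yet) any finalize task that was
  // scheduled for the previous shuffle generation.
  finalizeTask.foreach(_.cancel(false))
  finalizeTask = None
}
```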
##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -172,6 +177,29 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
!rdd.isBarrier()
}
+ @transient private[this] val shufflePushCompleted = new RoaringBitmap()
+
+ /**
+ * Mark a given map task as push completed in the tracking bitmap.
+ * Using the bitmap ensures that the same map task launched multiple times due to
+ * either speculation or stage retry is only counted once.
+ * @param mapIndex Map task index
+ * @return number of map tasks with block push completed
+ */
+ def incPushCompleted(mapIndex: Int): Int = {
+ shufflePushCompleted.add(mapIndex)
+ shufflePushCompleted.getCardinality
+ }
+
+ // Only used by DAGScheduler to coordinate shuffle merge finalization
+ @transient private[this] var finalizeTask: Option[ScheduledFuture[_]] = None
+
+ def getFinalizeTask: Option[ScheduledFuture[_]] = finalizeTask
+
+ def setFinalizeTask(task: ScheduledFuture[_]): Unit = {
+ finalizeTask = Some(task)
Review comment:
`finalizeTask = Option(task)`
##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
##########
@@ -328,10 +339,30 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
s"stop.")
return false
} else {
+ if (remainingBlocks.isEmpty && pushRequests.isEmpty && deferredPushRequests.isEmpty &&
+ !pushCompletionNotified) {
+ notifyDriverAboutPushCompletion()
+ pushCompletionNotified = true
+ }
remainingBlocks.isEmpty && (pushRequests.nonEmpty || deferredPushRequests.nonEmpty)
}
}
+ /**
+ * Notify the driver via the saved RPC endpoint about all the blocks generated by the current
+ * map task having been pushed. This enables the DAGScheduler to finalize shuffle merge as soon
+ * as sufficient map tasks have completed push instead of always waiting for a fixed amount of
+ * time.
+ */
+ private def notifyDriverAboutPushCompletion(): Unit = {
+ assert(shuffleId >= 0 && mapIndex >= 0)
+ val msg = ShufflePushCompletion(shuffleId, mapIndex)
+ SparkEnv.driverRpcEndpoint match {
+ case Some(driverRef) => driverRef.send(msg)
+ case None => logWarning(s"Drop $msg because executor has not yet connected to driver")
Review comment:
As mentioned above, this gets moved to `CoarseGrainedExecutorBackend`.
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2193,6 +2193,29 @@ package object config {
// with small MB sized chunk of data.
.createWithDefaultString("3m")
+ private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_THREADS =
+ ConfigBuilder("spark.shuffle.push.merge.finalizeThreads")
+ .doc("Specify the number of threads used by DAGScheduler to finalize
shuffle merge. " +
+ "Since it could potentially take seconds for a large shuffle to
finalize, having " +
+ "multiple threads helps DAGScheduler to handle multiple concurrent
shuffle merge " +
+ "finalize requests when push-based shuffle is enabled.")
+ .intConf
+ .createWithDefault(3)
+
+ private[spark] val PUSH_BASED_SHUFFLE_SIZE_MIN_SHUFFLE_SIZE_TO_WAIT =
+ ConfigBuilder("spark.shuffle.push.minShuffleSizeToWait")
+ .doc("The min size of total shuffle size for DAGScheduler to actually
wait for merge " +
Review comment:
We need to improve this doc - it should help users determine what the impact of the config is, how to override it, and what the impact of changing it is.
Something like:
`Driver will wait for merge finalization to complete only if total shuffle size is more than this threshold. If total shuffle size is less, driver will immediately finalize the shuffle output` ?
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -716,7 +729,9 @@ private[spark] class DAGScheduler(
// Mark mapStage as available with shuffle outputs only after shuffle merge is
// finalized with push based shuffle. If not, subsequent ShuffleMapStage won't
// read from merged output as the MergeStatuses are not available.
- if (!mapStage.isAvailable || !mapStage.shuffleDep.shuffleMergeFinalized) {
+ if (!mapStage.isAvailable ||
+ (mapStage.numPartitions > 0 && mapStage.shuffleDep.shuffleMergeEnabled &&
+ !mapStage.shuffleDep.shuffleMergeFinalized)) {
Review comment:
Do we want to move the `numPartitions > 0` check into `canShuffleMergeBeEnabled`?
There is an increasing number of places where we are doing the `numPartitions > 0` check.
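A rough sketch of what folding it in could look like (the surrounding checks are abbreviated and the exact predicate is illustrative, not the actual implementation):
```
// Sketch only: `pushBasedShuffleEnabled` stands in for the existing config checks
// in this method; the new part is the centralized partition-count guard.
private def canShuffleMergeBeEnabled(): Boolean = {
  pushBasedShuffleEnabled &&
    !rdd.isBarrier() &&
    // Centralizing the guard means call sites (e.g. DAGScheduler) no longer
    // need to repeat the `numPartitions > 0` check.
    rdd.partitions.length > 0
}
```
Call sites could then rely on `shuffleMergeEnabled` alone.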
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2193,6 +2193,29 @@ package object config {
// with small MB sized chunk of data.
.createWithDefaultString("3m")
+ private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_THREADS =
+ ConfigBuilder("spark.shuffle.push.merge.finalizeThreads")
+ .doc("Specify the number of threads used by DAGScheduler to finalize
shuffle merge. " +
+ "Since it could potentially take seconds for a large shuffle to
finalize, having " +
+ "multiple threads helps DAGScheduler to handle multiple concurrent
shuffle merge " +
+ "finalize requests when push-based shuffle is enabled.")
+ .intConf
+ .createWithDefault(3)
+
+ private[spark] val PUSH_BASED_SHUFFLE_SIZE_MIN_SHUFFLE_SIZE_TO_WAIT =
+ ConfigBuilder("spark.shuffle.push.minShuffleSizeToWait")
+ .doc("The min size of total shuffle size for DAGScheduler to actually
wait for merge " +
+ "finalization when push based shuffle is enabled.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("500m")
+
+ private[spark] val PUSH_BASED_SHUFFLE_MIN_PUSH_RATIO =
+ ConfigBuilder("spark.shuffle.push.minPushRatio")
+ .doc("The min percentage of map tasks that have completed pushing their
shuffle output " +
+ "for DAGScheduler to start merge finalization.")
Review comment:
doc -> `Fraction of map partitions that should complete before driver starts merge finalization during push based shuffle.`
Also, add a reference to other timeouts/configs which interact/are relevant here (and also add it to those configs as well).
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2193,6 +2193,29 @@ package object config {
// with small MB sized chunk of data.
.createWithDefaultString("3m")
+ private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_THREADS =
+ ConfigBuilder("spark.shuffle.push.merge.finalizeThreads")
+ .doc("Specify the number of threads used by DAGScheduler to finalize
shuffle merge. " +
Review comment:
`Specify the number of` -> `Number of`
Also, in the doc string (here and elsewhere), replace `DAGScheduler` with
`driver`
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2193,6 +2193,29 @@ package object config {
// with small MB sized chunk of data.
.createWithDefaultString("3m")
+ private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_THREADS =
+ ConfigBuilder("spark.shuffle.push.merge.finalizeThreads")
+ .doc("Specify the number of threads used by DAGScheduler to finalize
shuffle merge. " +
+ "Since it could potentially take seconds for a large shuffle to
finalize, having " +
+ "multiple threads helps DAGScheduler to handle multiple concurrent
shuffle merge " +
Review comment:
`multiple concurrent` -> `concurrent`
##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
##########
@@ -328,10 +339,30 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
s"stop.")
return false
} else {
+ if (remainingBlocks.isEmpty && pushRequests.isEmpty && deferredPushRequests.isEmpty &&
+ !pushCompletionNotified) {
+ notifyDriverAboutPushCompletion()
+ pushCompletionNotified = true
+ }
remainingBlocks.isEmpty && (pushRequests.nonEmpty || deferredPushRequests.nonEmpty)
}
}
+ /**
+ * Notify the driver via the saved RPC endpoint about all the blocks generated by the current
+ * map task having been pushed. This enables the DAGScheduler to finalize shuffle merge as soon
+ * as sufficient map tasks have completed push instead of always waiting for a fixed amount of
+ * time.
+ */
+ private def notifyDriverAboutPushCompletion(): Unit = {
Review comment:
Wrap this method's body within `if (!pushCompletionNotified)` and update `pushCompletionNotified = true` at the end.
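i.e. roughly (a sketch of the suggested restructuring; the backend-based send follows the `SparkEnv` comment above and `sendPushCompletion` is still a proposed method):
```
private def notifyDriverAboutPushCompletion(): Unit = {
  if (!pushCompletionNotified) {
    assert(shuffleId >= 0 && mapIndex >= 0)
    // Route the notification through the executor backend (proposed plumbing,
    // see the SparkEnv comment above).
    SparkEnv.get.executorBackend.foreach {
      case cb: CoarseGrainedExecutorBackend => cb.sendPushCompletion(shuffleId, mapIndex)
      case _ =>
    }
    pushCompletionNotified = true
  }
}
```
The call site then just invokes `notifyDriverAboutPushCompletion()` without repeating the flag check.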
##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
##########
@@ -328,10 +339,30 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
s"stop.")
return false
} else {
+ if (remainingBlocks.isEmpty && pushRequests.isEmpty && deferredPushRequests.isEmpty &&
+ !pushCompletionNotified) {
+ notifyDriverAboutPushCompletion()
+ pushCompletionNotified = true
Review comment:
The check for `pushCompletionNotified` and the update to `pushCompletionNotified` are better managed within `notifyDriverAboutPushCompletion`.
##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -106,6 +106,7 @@ private[spark] class CoarseGrainedExecutorBackend(
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
+ SparkEnv.driverRpcEndpoint = Some(ref)
Review comment:
Assuming above suggestion is fine, this will get removed.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2023,71 +2070,140 @@ private[spark] class DAGScheduler(
}
/**
- * Schedules shuffle merge finalize.
+ *
+ * Schedules shuffle merge finalization.
+ *
+ * @param stage the stage to finalize shuffle merge
+ * @param delay how long to wait before finalizing shuffle merge
+ * @param registerMergeResults whether to wait for merge results before scheduling the next stage
+ * @return whether the caller is able to schedule a finalize task
*/
- private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
- // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization
- // TODO: for all the stages, adaptively tune timeout for merge finalization
- logInfo(("%s (%s) scheduled for finalizing" +
- " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
- shuffleMergeFinalizeScheduler.schedule(
- new Runnable {
- override def run(): Unit = finalizeShuffleMerge(stage)
- },
- shuffleMergeFinalizeWaitSec,
- TimeUnit.SECONDS
- )
+ private[scheduler] def scheduleShuffleMergeFinalize(
+ stage: ShuffleMapStage,
+ delay: Long,
+ registerMergeResults: Boolean = true): Boolean = {
+ val shuffleDep = stage.shuffleDep
+ val scheduledTask: Option[ScheduledFuture[_]] = shuffleDep.getFinalizeTask
+ scheduledTask match {
+ case Some(task) =>
+ // If we find an already scheduled task, check if the task has been triggered yet.
+ // If it's already triggered, do nothing. Otherwise, cancel it and schedule a new
+ // one for immediate execution. Note that we should get here only when
+ // handleShufflePushCompleted schedules a finalize task after the shuffle map stage
+ // completed earlier and scheduled a task with default delay.
+ if (task.getDelay(TimeUnit.NANOSECONDS) > 0) {
+ task.cancel(false)
+ // The current task should be coming from handleShufflePushCompleted, thus the
+ // delay should be 0 and registerMergeResults should be true.
+ assert(delay == 0 && registerMergeResults)
Review comment:
Move this out of the `if` condition ?
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2023,71 +2070,140 @@ private[spark] class DAGScheduler(
}
/**
- * Schedules shuffle merge finalize.
+ *
+ * Schedules shuffle merge finalization.
+ *
+ * @param stage the stage to finalize shuffle merge
+ * @param delay how long to wait before finalizing shuffle merge
+ * @param registerMergeResults whether to wait for merge results before scheduling the next stage
+ * @return whether the caller is able to schedule a finalize task
*/
- private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
- // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization
- // TODO: for all the stages, adaptively tune timeout for merge finalization
- logInfo(("%s (%s) scheduled for finalizing" +
- " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
- shuffleMergeFinalizeScheduler.schedule(
- new Runnable {
- override def run(): Unit = finalizeShuffleMerge(stage)
- },
- shuffleMergeFinalizeWaitSec,
- TimeUnit.SECONDS
- )
+ private[scheduler] def scheduleShuffleMergeFinalize(
+ stage: ShuffleMapStage,
+ delay: Long,
+ registerMergeResults: Boolean = true): Boolean = {
+ val shuffleDep = stage.shuffleDep
+ val scheduledTask: Option[ScheduledFuture[_]] = shuffleDep.getFinalizeTask
+ scheduledTask match {
+ case Some(task) =>
+ // If we find an already scheduled task, check if the task has been triggered yet.
+ // If it's already triggered, do nothing. Otherwise, cancel it and schedule a new
+ // one for immediate execution. Note that we should get here only when
+ // handleShufflePushCompleted schedules a finalize task after the shuffle map stage
+ // completed earlier and scheduled a task with default delay.
+ if (task.getDelay(TimeUnit.NANOSECONDS) > 0) {
+ task.cancel(false)
+ // The current task should be coming from handleShufflePushCompleted, thus the
+ // delay should be 0 and registerMergeResults should be true.
+ assert(delay == 0 && registerMergeResults)
+ logInfo(s"$stage (${stage.name}) scheduled for finalizing shuffle merge immediately " +
+ s"after cancelling previously scheduled task.")
+ shuffleDep.setFinalizeTask(
+ shuffleMergeFinalizeScheduler.schedule(
+ new Runnable {
+ override def run(): Unit = finalizeShuffleMerge(stage, registerMergeResults)
+ },
+ 0,
+ TimeUnit.SECONDS
+ )
+ )
+ true
+ } else {
+ false
Review comment:
Log a message?
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1716,7 +1739,31 @@ private[spark] class DAGScheduler(
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
if (!shuffleStage.shuffleDep.shuffleMergeFinalized && shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
- scheduleShuffleMergeFinalize(shuffleStage)
+ // Check if a finalize task has already been scheduled. This is to prevent the
+ // following scenario: Stage A attempt 0 fails and gets retried. Stage A attempt 1
+ // succeeded, triggering the scheduling of shuffle merge finalization. However,
+ // tasks from Stage A attempt 0 might still be running and sending task completion
+ // events to DAGScheduler. This check prevents multiple attempts to schedule merge
+ // finalization get triggered due to this.
+ if (shuffleStage.shuffleDep.getFinalizeTask.isEmpty) {
+ // If total shuffle size is smaller than the threshold, attempt to immediately
+ // schedule shuffle merge finalization and process map stage completion.
+ val totalSize = Try(mapOutputTracker
+ .getStatistics(shuffleStage.shuffleDep).bytesByPartitionId.sum).getOrElse(0L)
+ if (totalSize < shuffleMergeWaitMinSizeThreshold) {
+ // Check if we can process map stage completion. If shuffle merge finalization
+ // is already triggered because push completion ratio was reached earlier,
+ // we cannot process map stage completion, but have to wait for the finalization
+ // to finish. This is because it's not straightforward to interrupt the
+ // finalization task and undo what it might have already done.
+ if (scheduleShuffleMergeFinalize(shuffleStage, delay = 0,
+ registerMergeResults = false)) {
Review comment:
Also +CC @Ngone51 on this.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2023,71 +2070,140 @@ private[spark] class DAGScheduler(
}
/**
- * Schedules shuffle merge finalize.
+ *
+ * Schedules shuffle merge finalization.
+ *
+ * @param stage the stage to finalize shuffle merge
+ * @param delay how long to wait before finalizing shuffle merge
+ * @param registerMergeResults whether to wait for merge results before scheduling the next stage
Review comment:
Fix the doc for `registerMergeResults` (`finalizeShuffleMerge` has a good description).
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1716,7 +1739,31 @@ private[spark] class DAGScheduler(
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
if (!shuffleStage.shuffleDep.shuffleMergeFinalized && shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
- scheduleShuffleMergeFinalize(shuffleStage)
+ // Check if a finalize task has already been scheduled. This is to prevent the
+ // following scenario: Stage A attempt 0 fails and gets retried. Stage A attempt 1
+ // succeeded, triggering the scheduling of shuffle merge finalization. However,
+ // tasks from Stage A attempt 0 might still be running and sending task completion
+ // events to DAGScheduler. This check prevents multiple attempts to schedule merge
+ // finalization get triggered due to this.
+ if (shuffleStage.shuffleDep.getFinalizeTask.isEmpty) {
+ // If total shuffle size is smaller than the threshold, attempt to immediately
+ // schedule shuffle merge finalization and process map stage completion.
+ val totalSize = Try(mapOutputTracker
+ .getStatistics(shuffleStage.shuffleDep).bytesByPartitionId.sum).getOrElse(0L)
+ if (totalSize < shuffleMergeWaitMinSizeThreshold) {
+ // Check if we can process map stage completion. If shuffle merge finalization
+ // is already triggered because push completion ratio was reached earlier,
+ // we cannot process map stage completion, but have to wait for the finalization
+ // to finish. This is because it's not straightforward to interrupt the
+ // finalization task and undo what it might have already done.
+ if (scheduleShuffleMergeFinalize(shuffleStage, delay = 0,
+ registerMergeResults = false)) {
+ handleShuffleMergeFinalized(shuffleStage)
Review comment:
In `handleShuffleMergeFinalized`, can we add a log message before `MOT.unregisterAllMergeResult`?
Without the `task.cancel` check detailed above, we could have potentially ended up with that issue.
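For example, something along these lines (a sketch; the message wording is illustrative and assumes the stage and its shuffle dependency are in scope at that point):
```
// In handleShuffleMergeFinalized, just before discarding the merge results:
logInfo(s"Unregistering merge results for shuffle ${stage.shuffleDep.shuffleId} " +
  s"($stage) before processing shuffle map stage completion")
mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)
```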
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]