otterc commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r639138475
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2060,17 +2060,18 @@ private[spark] class DAGScheduler(
stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
case (shuffleServiceLoc, index) =>
       // Sends async request to shuffle service to finalize shuffle merge on that host
+ // TODO: Cancel finalizeShuffleMerge if the stage is cancelled
Review comment:
Can you create a JIRA for this TODO?
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2136,9 +2137,24 @@ private[spark] class DAGScheduler(
}
}
-  private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = {
-    stage.shuffleDep.markShuffleMergeFinalized
-    processShuffleMapStageCompletion(stage)
+  private[scheduler] def handleRegisterMergeStatuses(
+      stage: ShuffleMapStage,
+      mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
+    // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
+    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+      mapOutputTracker.registerMergeResults(stage.shuffleDep.shuffleId, mergeStatuses)
+    }
+  }
+
+  private[scheduler] def handleShuffleMergeFinalized(
+      stage: ShuffleMapStage): Unit = {
+    // Only update MapOutputTracker metadata if the stage is still active, i.e. not cancelled.
+    if (runningStages.contains(stage)) {
+      stage.shuffleDep.markShuffleMergeFinalized()
+      processShuffleMapStageCompletion(stage)
+    } else {
+      mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)
Review comment:
If the stage is not part of runningStages, does that really mean it was
cancelled?
It seems runningStages contains only active stages; a stage that completed
successfully will also not be part of runningStages. So why do we unregister
all the merge results here?
Also, please add a comment explaining why we need to unregister all the merge
results here.
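To make the concern concrete, the guard being questioned can be modeled with a small, self-contained sketch. Everything here (`ToyMergeTracker`, `FinalizeSketch`, the `stageRunning` flag) is an illustrative stand-in, not Spark's actual `MapOutputTracker` or `DAGScheduler` API:

```scala
import scala.collection.mutable

// Illustrative stand-in for the merge-result tracker; not Spark's API.
class ToyMergeTracker {
  private val mergeResults = mutable.Map[Int, mutable.Buffer[String]]()

  def registerMergeResults(shuffleId: Int, statuses: Seq[String]): Unit = {
    mergeResults.getOrElseUpdate(shuffleId, mutable.Buffer()) ++= statuses
  }

  def unregisterAllMergeResults(shuffleId: Int): Unit = {
    mergeResults.remove(shuffleId)
  }

  def resultsFor(shuffleId: Int): Seq[String] =
    mergeResults.getOrElse(shuffleId, mutable.Buffer.empty).toSeq
}

object FinalizeSketch {
  // Mirrors the shape of the guard under review: finalize only if the stage
  // is still running; otherwise drop its merge results. Returns true when
  // the finalize path was taken.
  def handleShuffleMergeFinalized(
      stageRunning: Boolean,
      tracker: ToyMergeTracker,
      shuffleId: Int): Boolean = {
    if (stageRunning) {
      // markShuffleMergeFinalized + processShuffleMapStageCompletion
      // would run here in the real scheduler.
      true
    } else {
      // Stage no longer running (e.g. cancelled): its merge results may be
      // stale, so remove them instead of finalizing.
      tracker.unregisterAllMergeResults(shuffleId)
      false
    }
  }
}
```

Note that the reviewer's point still holds in this model: `stageRunning == false` also covers a stage that already completed successfully, so the guard by itself cannot distinguish cancellation from completion.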
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2136,9 +2137,24 @@ private[spark] class DAGScheduler(
}
}
-  private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = {
-    stage.shuffleDep.markShuffleMergeFinalized
-    processShuffleMapStageCompletion(stage)
+  private[scheduler] def handleRegisterMergeStatuses(
+      stage: ShuffleMapStage,
+      mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
+    // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
+    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
Review comment:
@mridulm mentioned that one of the benefits of this approach is that we can
still accept and register merge statuses received after the stage gets
finalized.
But here we ignore those statuses once the shuffle merge is finalized. Why?
In the case of deterministic stage retries, accepting merge statuses after
finalization should still be okay, shouldn't it?
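To spell out the behavior being questioned: the acceptance condition in the new `handleRegisterMergeStatuses` reduces to a two-flag predicate. This is a hedged sketch with illustrative flag names, not Spark code:

```scala
object RegisterGuardSketch {
  // Mirrors `runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized`:
  // a merge status is registered only while the stage is running AND the
  // shuffle merge has not yet been finalized.
  def shouldRegister(stageRunning: Boolean, mergeFinalized: Boolean): Boolean =
    stageRunning && !mergeFinalized

  def main(args: Array[String]): Unit = {
    // A status arriving before finalization is accepted...
    println(shouldRegister(stageRunning = true, mergeFinalized = false))  // true
    // ...but a late status arriving after finalization is dropped, which is
    // exactly the case this comment questions for deterministic stage retries.
    println(shouldRegister(stageRunning = true, mergeFinalized = true))   // false
  }
}
```

Under this reading, relaxing the second conjunct (for deterministic stages only) would let late merge statuses still be registered, which is the alternative the comment is raising.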
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]