otterc commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r640363202
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2136,9 +2137,24 @@ private[spark] class DAGScheduler(
}
}
- private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = {
- stage.shuffleDep.markShuffleMergeFinalized
- processShuffleMapStageCompletion(stage)
+ private[scheduler] def handleRegisterMergeStatuses(
+ stage: ShuffleMapStage,
+ mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
+ // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
+ if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+ mapOutputTracker.registerMergeResults(stage.shuffleDep.shuffleId, mergeStatuses)
+ }
+ }
+
+ private[scheduler] def handleShuffleMergeFinalized(
+ stage: ShuffleMapStage): Unit = {
+ // Only update MapOutputTracker metadata if the stage is still active. i.e not cancelled.
+ if (runningStages.contains(stage)) {
+ stage.shuffleDep.markShuffleMergeFinalized()
+ processShuffleMapStageCompletion(stage)
+ } else {
+ mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)
Review comment:
@mridulm Regarding the case you pointed out:

> stage resubmission after a cancellation after initiating merge finalization

The problem, that we were registering the merge results without any check and that this could interfere with a resubmission, is solved here by not registering merge results when the stage is no longer running. It also makes sense to finalize the stage only when it is running.

However, shouldn't merge results be unregistered when there is a fetch failure or when a stage is cancelled? Neither of those cases is part of this PR. So, in the context of this PR, I want to understand why we are unregistering the merge results in the `else` branch here, and what that helps with:
```
} else {
  mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)
}
```
Also, if there is a good reason to do this, we should document it here in a comment.
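To make the discussion concrete, here is a minimal toy model of the two guarded handlers in this diff. `ToyStage`, `ToyMergeStatus`, `ToyMapOutputTracker`, and `ToyScheduler` are hypothetical stand-ins, not Spark classes; only the guard conditions and the `else` branch mirror the diff, and `completedStages` is an invented placeholder for `processShuffleMapStageCompletion`.

```scala
import scala.collection.mutable

// Hypothetical stand-ins, NOT Spark's real classes: a toy model of the guards in
// handleRegisterMergeStatuses / handleShuffleMergeFinalized from this diff.
final case class ToyMergeStatus(reducerId: Int, sizeBytes: Long)

final class ToyStage(val shuffleId: Int) {
  // Plays the role of stage.shuffleDep.shuffleMergeFinalized.
  var mergeFinalized: Boolean = false
}

final class ToyMapOutputTracker {
  // shuffleId -> (reducerId -> merge status)
  val mergeResults = mutable.Map.empty[Int, mutable.Map[Int, ToyMergeStatus]]

  def registerMergeResults(shuffleId: Int, statuses: Seq[(Int, ToyMergeStatus)]): Unit = {
    val perShuffle = mergeResults.getOrElseUpdate(shuffleId, mutable.Map.empty)
    perShuffle ++= statuses
  }

  // Drops every merge result recorded for the shuffle.
  def unregisterAllMergeResult(shuffleId: Int): Unit = {
    mergeResults.remove(shuffleId)
    ()
  }
}

final class ToyScheduler(tracker: ToyMapOutputTracker) {
  val runningStages = mutable.Set.empty[ToyStage]
  // Hypothetical placeholder for processShuffleMapStageCompletion.
  val completedStages = mutable.Buffer.empty[ToyStage]

  def handleRegisterMergeStatuses(stage: ToyStage, statuses: Seq[(Int, ToyMergeStatus)]): Unit = {
    // Ignore statuses for a stage that is no longer running or already finalized.
    if (runningStages.contains(stage) && !stage.mergeFinalized) {
      tracker.registerMergeResults(stage.shuffleId, statuses)
    }
  }

  def handleShuffleMergeFinalized(stage: ToyStage): Unit = {
    if (runningStages.contains(stage)) {
      stage.mergeFinalized = true
      completedStages += stage
    } else {
      // The branch in question: discard merge results registered before the stage stopped.
      tracker.unregisterAllMergeResult(stage.shuffleId)
    }
  }
}
```

In this toy model, removing a stage from `runningStages` (standing in for cancellation) causes later `handleRegisterMergeStatuses` calls to be ignored, and a subsequent `handleShuffleMergeFinalized` clears whatever had been registered earlier for that shuffle.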