mridulm commented on code in PR #36293:
URL: https://github.com/apache/spark/pull/36293#discussion_r854758006
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2337,20 +2337,31 @@ private[spark] class DAGScheduler(
// adaptive attempt while the stage might have failed/killed and shuffle id is getting
// re-executing now.
if (stage.shuffleDep.shuffleMergeId == shuffleMergeId) {
- if (stage.pendingPartitions.isEmpty) {
+ // When it reaches here, there is a possibility that the stage will be resubmitted again
+ // because of various reasons. Some of these could be:
+ // a) Stage results are not available. All the tasks completed once so the
+ // pendingPartitions is empty but due to an executor failure some of the map outputs are not
+ // available any more, so the stage will be re-submitted.
+ // b) Stage failed due to a task failure.
+ // We should mark the stage as merged finalized irrespective of what state it is in.
+ // This will prevent the push being enabled for the re-attempt.
+ // Note: for indeterminate stages, this doesn't matter at all, since the merge finalization
+ // related state is reset during the stage submission.
+ stage.shuffleDep.markShuffleMergeFinalized()
+ if (stage.pendingPartitions.isEmpty)
if (runningStages.contains(stage)) {
- stage.shuffleDep.markShuffleMergeFinalized()
processShuffleMapStageCompletion(stage)
- } else {
- // Unregister all merge results if the stage is currently not
- // active (i.e. the stage is cancelled)
+ } else if (stage.isIndeterminate) {
+ // There are 2 possibilities here - stage is either cancelled or it will be resubmitted.
+ // If this is an indeterminate stage which is cancelled, we unregister all its merge
+ // results here just to free up some memory. If the indeterminate stage is resubmitted,
+ // merge results are cleared again when the newer attempt is submitted.
mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)
+ // For deterministic stages that are cancelled we would like to unregister all the merge
+ // results but we are unable to distinguish between a cancelled stage or whether it
+ // will be resubmitted here. In case the stage is not cancelled, the mergeResults of that
+ // stage will still be usable and valid.
Review Comment:
For determinate stages, which have completed merge finalization, we don't
need to unregister merge results, since the stage retry, or any other stage
computing the same shuffle id, can use them.
Can you update the description?
`For determinate stages, as the finalization has completed, the output can be consumed by child stages`
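To make the branching concrete, here is a minimal Scala sketch of how the merge-finalized handler behaves once the suggestion is applied. It is a simplified model under stated assumptions, not Spark code: `StageModel`, `MergeFinalizationModel`, `registeredMergeResults` (a stand-in for the merge results held by `mapOutputTracker`) and `completedStages` (a stand-in for `processShuffleMapStageCompletion`) are all hypothetical names introduced only for illustration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a ShuffleMapStage; NOT a Spark class.
final class StageModel(
    val shuffleId: Int,
    val shuffleMergeId: Int,
    val pendingPartitions: Set[Int],
    val isIndeterminate: Boolean) {
  var mergeFinalized: Boolean = false
}

// Hypothetical model of the handler's decision logic; NOT the Spark implementation.
final class MergeFinalizationModel {
  // Shuffle ids whose merge results are registered (stand-in for MapOutputTracker state).
  val registeredMergeResults: mutable.Set[Int] = mutable.Set.empty
  // Stand-in for DAGScheduler.runningStages (reference equality on StageModel).
  val runningStages: mutable.Set[StageModel] = mutable.Set.empty
  // Records stages whose completion would have been processed.
  val completedStages: mutable.Buffer[StageModel] = mutable.Buffer.empty

  def handleShuffleMergeFinalized(stage: StageModel, shuffleMergeId: Int): Unit = {
    // Ignore notifications that belong to an older merge attempt.
    if (stage.shuffleMergeId == shuffleMergeId) {
      // Mark finalized unconditionally so a re-attempt does not re-enable push.
      stage.mergeFinalized = true
      if (stage.pendingPartitions.isEmpty) {
        if (runningStages.contains(stage)) {
          completedStages += stage
        } else if (stage.isIndeterminate) {
          // Indeterminate output cannot be reused, so drop its merge results.
          registeredMergeResults -= stage.shuffleId
        }
        // Determinate and not running: keep the merge results, because a retry
        // or another stage computing the same shuffle id can consume them.
      }
    }
  }
}

object MergeFinalizationDemo {
  def main(args: Array[String]): Unit = {
    val model = new MergeFinalizationModel
    val determinate = new StageModel(1, 0, Set.empty, isIndeterminate = false)
    val indeterminate = new StageModel(2, 0, Set.empty, isIndeterminate = true)
    model.registeredMergeResults ++= Seq(1, 2)
    // Neither stage is running (cancelled, or waiting to be resubmitted).
    model.handleShuffleMergeFinalized(determinate, 0)
    model.handleShuffleMergeFinalized(indeterminate, 0)
    println(model.registeredMergeResults.toList.sorted) // prints List(1)
  }
}
```

In this model only the indeterminate stage loses its merge results; the determinate stage keeps them for its children or a retry, which is the behavior the suggested comment describes.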
--
This is an automated message from the Apache Git Service.