mridulm commented on code in PR #36293:
URL: https://github.com/apache/spark/pull/36293#discussion_r854758006


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2337,20 +2337,31 @@ private[spark] class DAGScheduler(
     // adaptive attempt while the stage might have failed/killed and shuffle id is getting
     // re-executing now.
     if (stage.shuffleDep.shuffleMergeId == shuffleMergeId) {
-      if (stage.pendingPartitions.isEmpty) {
+      // When it reaches here, there is a possibility that the stage will be resubmitted again
+      // because of various reasons. Some of these could be:
+      // a) Stage results are not available. All the tasks completed once so the
+      // pendingPartitions is empty but due to an executor failure some of the map outputs are not
+      // available any more, so the stage will be re-submitted.
+      // b) Stage failed due to a task failure.
+      // We should mark the stage as merged finalized irrespective of what state it is in.
+      // This will prevent the push being enabled for the re-attempt.
+      // Note: for indeterminate stages, this doesn't matter at all, since the merge finalization
+      // related state is reset during the stage submission.
+      stage.shuffleDep.markShuffleMergeFinalized()
+      if (stage.pendingPartitions.isEmpty)
         if (runningStages.contains(stage)) {
-          stage.shuffleDep.markShuffleMergeFinalized()
           processShuffleMapStageCompletion(stage)
-        } else {
-          // Unregister all merge results if the stage is currently not
-          // active (i.e. the stage is cancelled)
+        } else if (stage.isIndeterminate) {
+          // There are 2 possibilities here - stage is either cancelled or it will be resubmitted.
+          // If this is an indeterminate stage which is cancelled, we unregister all its merge
+          // results here just to free up some memory. If the indeterminate stage is resubmitted,
+          // merge results are cleared again when the newer attempt is submitted.
           mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)
+          // For deterministic stages that are cancelled we would like to unregister all the merge
+          // results but we are unable to distinguish between a cancelled stage or whether it
+          // will be resubmitted here. In case the stage is not cancelled, the mergeResults of that
+          // stage will still be usable and valid.

Review Comment:
   For determinate stages that have completed merge finalization, we don't need to unregister the merge results, since the stage retry, or any other stage computing the same shuffle id, can use them.
   Can you update the description?
   
   `For determinate stages, as the finalization has completed, the output can be consumed by child stages`
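To make the control flow in this hunk concrete, here is a minimal, self-contained Scala sketch of the same decision logic, including the determinate-stage behavior the review asks to document. It is not Spark's `DAGScheduler`: `ToyShuffleDep`, `ToyStage`, `ToyScheduler`, `handleShuffleMergeFinalized`, `onStageSubmitted` and `resetForNewMergeAttempt` are hypothetical stand-ins. The rule that a new attempt enables push only when the merge is not yet finalized, with indeterminate stages resetting that state on submission, is an assumption that simplifies what the hunk's comments describe.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a shuffle dependency's merge state (not Spark's class).
final class ToyShuffleDep(val shuffleId: Int, var shuffleMergeId: Int) {
  private var finalized = false
  def markShuffleMergeFinalized(): Unit = { finalized = true }
  def isShuffleMergeFinalized: Boolean = finalized
  // Assumption: submitting an indeterminate stage starts a fresh merge attempt
  // (new merge id, finalization state cleared).
  def resetForNewMergeAttempt(): Unit = {
    shuffleMergeId += 1
    finalized = false
  }
}

// Hypothetical stand-in for a shuffle map stage.
final class ToyStage(val shuffleDep: ToyShuffleDep, val isIndeterminate: Boolean) {
  val pendingPartitions: mutable.Set[Int] = mutable.Set.empty[Int]
}

final class ToyScheduler {
  val runningStages: mutable.Set[ToyStage] = mutable.Set.empty[ToyStage]
  // Stands in for processShuffleMapStageCompletion (records the shuffle id).
  val completedShuffles: mutable.Buffer[Int] = mutable.Buffer.empty[Int]
  // Stands in for mapOutputTracker.unregisterAllMergeResult (records the shuffle id).
  val unregisteredMergeResults: mutable.Buffer[Int] = mutable.Buffer.empty[Int]

  def handleShuffleMergeFinalized(stage: ToyStage, shuffleMergeId: Int): Unit = {
    // Ignore finalization events that belong to an older merge attempt.
    if (stage.shuffleDep.shuffleMergeId == shuffleMergeId) {
      // Finalize regardless of stage state, so a re-attempt will not re-enable push.
      stage.shuffleDep.markShuffleMergeFinalized()
      if (stage.pendingPartitions.isEmpty) {
        if (runningStages.contains(stage)) {
          completedShuffles += stage.shuffleDep.shuffleId
        } else if (stage.isIndeterminate) {
          // Cancelled or about to be resubmitted: an indeterminate stage's merged
          // output cannot be reused, so drop it now to free memory.
          unregisteredMergeResults += stage.shuffleDep.shuffleId
        }
        // A determinate stage that is not running keeps its merge results: the
        // finalization completed, so child stages or a retry can consume them.
      }
    }
  }

  // Assumption: returns whether push-based shuffle is enabled for the new attempt.
  def onStageSubmitted(stage: ToyStage): Boolean = {
    if (stage.isIndeterminate) stage.shuffleDep.resetForNewMergeAttempt()
    !stage.shuffleDep.isShuffleMergeFinalized
  }
}
```

Under these assumptions, a determinate stage that is no longer running keeps its finalized merge output and its retry does not re-enable push, while an indeterminate stage's merge results are dropped and its next attempt starts a fresh merge.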


