Victsm commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r647734178



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1678,38 +1717,16 @@ private[spark] class DAGScheduler(
             }
 
             if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-              markStageAsFinished(shuffleStage)
-              logInfo("looking for newly runnable stages")
-              logInfo("running: " + runningStages)
-              logInfo("waiting: " + waitingStages)
-              logInfo("failed: " + failedStages)
-
-              // This call to increment the epoch may not be strictly necessary, but it is retained
-              // for now in order to minimize the changes in behavior from an earlier version of the
-              // code. This existing behavior of always incrementing the epoch following any
-              // successful shuffle map stage completion may have benefits by causing unneeded
-              // cached map outputs to be cleaned up earlier on executors. In the future we can
-              // consider removing this call, but this will require some extra investigation.
-              // See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
-              mapOutputTracker.incrementEpoch()
-
-              clearCacheLocs()
-
-              if (!shuffleStage.isAvailable) {
-                // Some tasks had failed; let's resubmit this shuffleStage.
-                // TODO: Lower-level scheduler should also deal with this
-                logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
-                  ") because some of its tasks had failed: " +
-                  shuffleStage.findMissingPartitions().mkString(", "))
-                submitStage(shuffleStage)
+              if (!shuffleStage.isMergeFinalized &&

Review comment:
       The original condition is:
   `shuffleStage.shuffleDep.shuffleMergeEnabled && !shuffleStage.shuffleDep.shuffleMergeFinalized && shuffleStage.isAvailable`.
   Here, the two checks are equivalent to the first two checks in the original condition, but why are we removing the third one?
   It is possible that, by the time all tasks of a shuffle map stage have finished, some task results have already been lost.
   In that case, we should invoke `processShuffleMapStageCompletion` to properly handle the stage retry instead of scheduling merge finalization.
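   To make the concern concrete, below is a minimal, self-contained sketch of the check order I have in mind. The `MergeFinalizeSketch` object, its case classes, and `onAllTasksFinished` are hypothetical stand-ins for the real `DAGScheduler` state, not Spark's actual types; the two string results simply name the method that would be invoked on each path.

```scala
// Hypothetical model of the suggested decision order; these case classes
// only mirror the fields referenced in this discussion.
object MergeFinalizeSketch {
  final case class ShuffleDep(shuffleMergeEnabled: Boolean, shuffleMergeFinalized: Boolean)
  final case class ShuffleStage(shuffleDep: ShuffleDep, isAvailable: Boolean)

  // Decide what to do once all tasks of a shuffle map stage have finished.
  def onAllTasksFinished(stage: ShuffleStage): String =
    if (stage.shuffleDep.shuffleMergeEnabled &&
        !stage.shuffleDep.shuffleMergeFinalized &&
        stage.isAvailable) {
      // All map outputs are still registered, so it is safe to kick off
      // shuffle merge finalization.
      "scheduleShuffleMergeFinalize"
    } else {
      // Merge is disabled/already finalized, or some map outputs were lost
      // after their tasks completed: take the normal completion path, which
      // handles stage retry via resubmission.
      "processShuffleMapStageCompletion"
    }

  def main(args: Array[String]): Unit = {
    // Outputs lost after all tasks finished: the stage is not available and
    // should go down the retry path rather than merge finalization.
    val lostOutputs = ShuffleStage(
      ShuffleDep(shuffleMergeEnabled = true, shuffleMergeFinalized = false),
      isAvailable = false)
    println(onAllTasksFinished(lostOutputs)) // prints: processShuffleMapStageCompletion
  }
}
```

   The point is simply that `isAvailable` still needs to be consulted here, because map outputs can be lost in the window between task completion and stage completion.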



