Ngone51 commented on a change in pull request #30716:
URL: https://github.com/apache/spark/pull/30716#discussion_r549996831



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -251,6 +251,26 @@ private[spark] class DAGScheduler(
 
   private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf)
 
+  private def shouldUnregisterMapOutput(mapStage: Stage, fetchFailed: FetchFailed,
+    stageId: Int, stageAttemptId: Int): Boolean = {
+    if (!runningStages.contains(mapStage)) {
+      true
+    } else if (taskScheduler == null) {
+      true
+    } else {
+      val latestInfo = mapStage.latestInfo
+      if (latestInfo == null) {
+        true
+      } else {
+        taskScheduler.taskSetManagerForAttempt(
+          latestInfo.stageId, latestInfo.attemptNumber()) match {
+          case Some(tsm: TaskSetManager) => !tsm.hasPartitionId(fetchFailed.mapId.toInt)
+          case _ => true

Review comment:
       There's a corner case here: the rerun stage may only need to rerun a single task, and that task could finish before this FetchFailure arrives. In that case the rerun stage attempt would already have been removed from `taskSetManagerForAttempt`, so I don't think we should return `true` here.
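
   A rough sketch of the scenario I mean (hypothetical, not the actual fix; whether `pendingPartitions` is still reliable at this point is an assumption):

   ```scala
   taskScheduler.taskSetManagerForAttempt(
       latestInfo.stageId, latestInfo.attemptNumber()) match {
     case Some(tsm: TaskSetManager) => !tsm.hasPartitionId(fetchFailed.mapIndex)
     case None =>
       // The attempt may be gone simply because it completed, so the
       // registered map output could be the fresh one from the rerun, and
       // blindly returning true would unregister a valid output. Assumed
       // alternative: only treat the output as stale while the partition
       // is still pending in the rerun attempt.
       mapStage.pendingPartitions.contains(fetchFailed.mapIndex)
   }
   ```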

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -251,6 +251,26 @@ private[spark] class DAGScheduler(
 
   private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf)
 
+  private def shouldUnregisterMapOutput(mapStage: Stage, fetchFailed: FetchFailed,
+    stageId: Int, stageAttemptId: Int): Boolean = {
+    if (!runningStages.contains(mapStage)) {
+      true
+    } else if (taskScheduler == null) {
+      true
+    } else {
+      val latestInfo = mapStage.latestInfo
+      if (latestInfo == null) {
+        true
+      } else {
+        taskScheduler.taskSetManagerForAttempt(
+          latestInfo.stageId, latestInfo.attemptNumber()) match {
+          case Some(tsm: TaskSetManager) => !tsm.hasPartitionId(fetchFailed.mapId.toInt)

Review comment:
       I think you actually want `fetchFailed.mapIndex` rather than `fetchFailed.mapId` here? `mapId` is the unique id of the map task attempt (a `Long`), while `hasPartitionId` expects the map partition index.
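
   i.e., something like:

   ```scala
   // mapIndex is the map partition index, which is what hasPartitionId expects
   case Some(tsm: TaskSetManager) => !tsm.hasPartitionId(fetchFailed.mapIndex)
   ```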

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1731,8 +1751,14 @@ private[spark] class DAGScheduler(
             // resubmitted stage attempt.
             mapOutputTracker.unregisterAllMapOutput(shuffleId)
           } else if (mapIndex != -1) {
-            // Mark the map whose fetch failed as broken in the map stage
-            mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, 
bmAddress)

Review comment:
       Probably, comparing the `mapId` of the registered `MapStatus` should be enough for this case. But it only works when we use the new shuffle fetch protocol (enabled by default), since only then is `mapId` a unique map task attempt id. For example,
   
   
   ```scala
   val mapStatus = mapOutputTracker.getMapOutput(shuffleId, mapIndex)
   if (mapStatus.mapId == fetchFailed.mapId) {
     mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress)
   }
   ```
   
   For the old protocol, I think we can at least compare `bmAddress`, which catches most cases, though it still can't handle the case where the rerun output lands on the same `bmAddress`.
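
   A rough sketch of that fallback (hypothetical; `getMapOutput` is the same assumed accessor as above):

   ```scala
   // Old-protocol fallback: only unregister when the registered output's
   // location still points at the failed block manager.
   val mapStatus = mapOutputTracker.getMapOutput(shuffleId, mapIndex)
   if (mapStatus != null && mapStatus.location == bmAddress) {
     mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress)
   }
   ```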



