Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1877#discussion_r17510561
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1046,41 +1046,37 @@ class DAGScheduler(
     
           case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
             val failedStage = stageIdToStage(task.stageId)
    -        val mapStage = shuffleToMapStage(shuffleId)
     
             // It is likely that we receive multiple FetchFailed for a single stage (because we have
             // multiple tasks running concurrently on different executors). In that case, it is possible
             // the fetch failure has already been handled by the scheduler.
    -        if (runningStages.contains(failedStage)) {
    +        if (runningStages.contains(failedStage) && stage.pendingTasks.contains(task)) {
    +          val mapStage = shuffleToMapStage(shuffleId)
               logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
                 s"due to a fetch failure from $mapStage (${mapStage.name})")
    -          markStageAsFinished(failedStage, Some("Fetch failure"))
    -          runningStages -= failedStage
    -        }
    -
    -        if (failedStages.isEmpty && eventProcessActor != null) {
    -          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
    -          // in that case the event will already have been scheduled. eventProcessActor may be
    -          // null during unit tests.
               // TODO: Cancel running tasks in the stage
    -          import env.actorSystem.dispatcher
    -          logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    -            s"$failedStage (${failedStage.name}) due to fetch failure")
    -          env.actorSystem.scheduler.scheduleOnce(
    -            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
    -        }
    -        failedStages += failedStage
    -        failedStages += mapStage
    +          markStageAsFinished(failedStage, Some("Fetch failure"))
    +          if (eventProcessActor != null) {
    +            // eventProcessActor may be null during unit tests.
    +            import env.actorSystem.dispatcher
    +            logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    +              s"$failedStage (${failedStage.name}) due to fetch failure")
    +            env.actorSystem.scheduler.scheduleOnce(
    +              RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
    +          }
    +          failedStages += failedStage
    +          failedStages += mapStage
     
    -        // Mark the map whose fetch failed as broken in the map stage
    -        if (mapId != -1) {
    -          mapStage.removeOutputLoc(mapId, bmAddress)
    -          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    -        }
    +          // Mark the map whose fetch failed as broken in the map stage
    +          if (mapId != -1) {
    +            mapStage.removeOutputLoc(mapId, bmAddress)
    +            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    +          }
     
    -        // TODO: mark the executor as failed only if there were lots of fetch failures on it
    -        if (bmAddress != null) {
    -          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
    +          // TODO: mark the executor as failed only if there were lots of fetch failures on it
    +          if (bmAddress != null) {
    --- End diff ---
    
    @rxin  Yes, the processing logic was modified unnecessarily here; that was an oversight on my part.
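
    The guard this diff touches exists because several tasks of one stage can each report a FetchFailed, and only the first report should mark the stage failed and schedule a resubmit. A minimal sketch of that dedup pattern, using hypothetical simplified types (`Stage`, `FetchFailureDedup`) rather than Spark's real classes:

    ```scala
    import scala.collection.mutable

    // Hypothetical stand-in for Spark's Stage; only an id is needed here.
    case class Stage(id: Int)

    object FetchFailureDedup {
      val runningStages = mutable.Set.empty[Stage]
      val failedStages  = mutable.Set.empty[Stage]

      // Returns true if this FetchFailed event was actually handled,
      // false if the stage had already been removed from runningStages
      // by an earlier report (i.e. this event is a duplicate).
      def handleFetchFailed(failedStage: Stage): Boolean = {
        if (runningStages.contains(failedStage)) {
          runningStages -= failedStage // analogue of markStageAsFinished
          failedStages  += failedStage // queue the stage for resubmission
          true
        } else {
          false // duplicate report: already handled, do nothing
        }
      }
    }
    ```

    Because the first report removes the stage from `runningStages`, every later report from a concurrently running task falls through the guard, which is the behavior the original comment in the diff describes.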

