Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1877#discussion_r17510561 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1046,41 +1046,37 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => val failedStage = stageIdToStage(task.stageId) - val mapStage = shuffleToMapStage(shuffleId) // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is possible // the fetch failure has already been handled by the scheduler. - if (runningStages.contains(failedStage)) { + if (runningStages.contains(failedStage) && stage.pendingTasks.contains(task)) { + val mapStage = shuffleToMapStage(shuffleId) logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + s"due to a fetch failure from $mapStage (${mapStage.name})") - markStageAsFinished(failedStage, Some("Fetch failure")) - runningStages -= failedStage - } - - if (failedStages.isEmpty && eventProcessActor != null) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. eventProcessActor may be - // null during unit tests. // TODO: Cancel running tasks in the stage - import env.actorSystem.dispatcher - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure") - env.actorSystem.scheduler.scheduleOnce( - RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages) - } - failedStages += failedStage - failedStages += mapStage + markStageAsFinished(failedStage, Some("Fetch failure")) + if (eventProcessActor != null) { + // eventProcessActor may be null during unit tests. 
+ import env.actorSystem.dispatcher + logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + + s"$failedStage (${failedStage.name}) due to fetch failure") + env.actorSystem.scheduler.scheduleOnce( + RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages) + } + failedStages += failedStage + failedStages += mapStage - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { - mapStage.removeOutputLoc(mapId, bmAddress) - mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } + // Mark the map whose fetch failed as broken in the map stage + if (mapId != -1) { + mapStage.removeOutputLoc(mapId, bmAddress) + mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) + } - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { - handleExecutorLost(bmAddress.executorId, Some(task.epoch)) + // TODO: mark the executor as failed only if there were lots of fetch failures on it + if (bmAddress != null) { --- End diff -- @rxin Yes, the processing logic here was modified unnecessarily; that was my negligence. 是的,这里处理逻辑被不必要地修改了,是我疏忽了。
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org