Github user kayousterhout commented on a diff in the pull request:
https://github.com/apache/spark/pull/6750#discussion_r34732650
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1128,38 +1128,47 @@ class DAGScheduler(
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleToMapStage(shuffleId)
- // It is likely that we receive multiple FetchFailed for a single stage (because we have
- // multiple tasks running concurrently on different executors). In that case, it is possible
- // the fetch failure has already been handled by the scheduler.
- if (runningStages.contains(failedStage)) {
- logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
- s"due to a fetch failure from $mapStage (${mapStage.name})")
- markStageAsFinished(failedStage, Some(failureMessage))
- }
+ if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
+ logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
+ s" ${task.stageAttemptId}, which has already failed")
+ } else {
- if (disallowStageRetryForTest) {
- abortStage(failedStage, "Fetch failure will not retry stage due to testing config")
- } else if (failedStages.isEmpty) {
- // Don't schedule an event to resubmit failed stages if failed isn't empty, because
- // in that case the event will already have been scheduled.
- // TODO: Cancel running tasks in the stage
- logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
- s"$failedStage (${failedStage.name}) due to fetch failure")
- messageScheduler.schedule(new Runnable {
- override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
- }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
- }
- failedStages += failedStage
- failedStages += mapStage
- // Mark the map whose fetch failed as broken in the map stage
- if (mapId != -1) {
- mapStage.removeOutputLoc(mapId, bmAddress)
- mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
- }
+ // It is likely that we receive multiple FetchFailed for a single stage (because we have
+ // multiple tasks running concurrently on different executors). In that case, it is
+ // possible the fetch failure has already been handled by the scheduler.
+ if (runningStages.contains(failedStage)) {
+ logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
+ s"due to a fetch failure from $mapStage (${mapStage.name})")
+ markStageAsFinished(failedStage, Some(failureMessage))
+ } else {
+ logInfo(s"Ignoring fetch failure from $task as it's from $failedStage, " +
--- End diff --
It seems that even when the failed stage is no longer running, we don't
fully ignore the failure, because we still remove the map ID from the set of
output locations (on line 1164). As a result, I'm not sure this log message
makes sense.
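
To make the concern concrete, here is a rough, self-contained sketch of the control flow as I read it. It is not Spark code: FetchFailureSketch, Outcome, handleFetchFailure and the integer stage and map IDs are all hypothetical stand-ins, and it assumes the map output removal runs regardless of which log branch was taken.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the scheduler state; none of these
// names are Spark's real classes or methods.
object FetchFailureSketch {
  final case class Outcome(logLine: String, mapOutputRemoved: Boolean)

  def handleFetchFailure(
      stageId: Int,
      mapId: Int,
      runningStages: mutable.Set[Int],
      mapOutputs: mutable.Set[Int]): Outcome = {
    val logLine =
      if (runningStages.contains(stageId)) {
        // First failure seen for this stage: mark it as finished.
        runningStages -= stageId
        s"Marking stage $stageId as failed"
      } else {
        // Stage was already handled, so the message claims we ignore the failure.
        s"Ignoring fetch failure for stage $stageId"
      }
    // Assumption: this runs on both branches, so the failure still has a
    // side effect even when the log line says it was ignored.
    val removed = mapId != -1 && mapOutputs.remove(mapId)
    Outcome(logLine, removed)
  }
}
```

Calling handleFetchFailure for a stage that is not in runningStages returns an "Ignoring ..." log line together with mapOutputRemoved = true, which is the mismatch between the message and the behavior that I am asking about.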