Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/6750#discussion_r34854409
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1128,38 +1128,47 @@ class DAGScheduler(
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleToMapStage(shuffleId)
- // It is likely that we receive multiple FetchFailed for a single stage (because we have
- // multiple tasks running concurrently on different executors). In that case, it is possible
- // the fetch failure has already been handled by the scheduler.
- if (runningStages.contains(failedStage)) {
- logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
- s"due to a fetch failure from $mapStage (${mapStage.name})")
- markStageAsFinished(failedStage, Some(failureMessage))
- }
+ if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
+ logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
+ s" ${task.stageAttemptId}, which has already failed")
+ } else {
- if (disallowStageRetryForTest) {
- abortStage(failedStage, "Fetch failure will not retry stage due to testing config")
- } else if (failedStages.isEmpty) {
- // Don't schedule an event to resubmit failed stages if failed isn't empty, because
- // in that case the event will already have been scheduled.
- // TODO: Cancel running tasks in the stage
- logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
- s"$failedStage (${failedStage.name}) due to fetch failure")
- messageScheduler.schedule(new Runnable {
- override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
- }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
- }
- failedStages += failedStage
- failedStages += mapStage
- // Mark the map whose fetch failed as broken in the map stage
- if (mapId != -1) {
- mapStage.removeOutputLoc(mapId, bmAddress)
- mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
- }
+ // It is likely that we receive multiple FetchFailed for a single stage (because we have
+ // multiple tasks running concurrently on different executors). In that case, it is
+ // possible the fetch failure has already been handled by the scheduler.
+ if (runningStages.contains(failedStage)) {
+ logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
+ s"due to a fetch failure from $mapStage (${mapStage.name})")
+ markStageAsFinished(failedStage, Some(failureMessage))
+ } else {
+ logInfo(s"Ignoring fetch failure from $task as it's from $failedStage, " +
--- End diff --
Yes, good point ... perhaps we should just get rid of this msg; I'm not
sure there is anything we could say here that is succinct, accurate, and
still useful. I think I added this msg during some early debugging. Do you
think it's worth logging "Received fetch failure from $task, but it's from
$failedStage which is no longer running"? Maybe a `logDebug`?
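
To make the three cases in the diff easier to compare, here is a rough, self-contained sketch of which message would be chosen in each case. It is not the DAGScheduler code: `FetchFailureLogSketch`, `LogChoice`, `choose`, and `debugMessage` are hypothetical names, and the plain `Int`/`Boolean`/`String` parameters stand in for `failedStage.latestInfo.attemptId`, `task.stageAttemptId`, `runningStages.contains(failedStage)`, `$task`, and `$failedStage`. It only models the choice of log message, not the resubmission logic that follows.

```scala
// Hypothetical model of the logging branches discussed above; not Spark code.
object FetchFailureLogSketch {
  sealed trait LogChoice
  // Task belongs to an older stage attempt: the "Ignoring fetch failure ..." logInfo branch.
  case object StaleAttempt extends LogChoice
  // Stage is still running: the "Marking ... as failed" logInfo branch.
  case object MarkFailed extends LogChoice
  // Stage is no longer running: the branch where a logDebug is being proposed.
  case object NoLongerRunning extends LogChoice

  // Parameters are stand-ins (assumptions) for the values read in the diff.
  def choose(latestAttemptId: Int, taskAttemptId: Int, stageIsRunning: Boolean): LogChoice =
    if (latestAttemptId != taskAttemptId) StaleAttempt
    else if (stageIsRunning) MarkFailed
    else NoLongerRunning

  // Wording of the proposed debug-level message, built from plain strings.
  def debugMessage(task: String, failedStage: String): String =
    s"Received fetch failure from $task, but it's from $failedStage which is no longer running"
}
```

If the message is kept at all, it would only apply to the `NoLongerRunning` case, which is why debug level seems like the right fit.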