Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6750#discussion_r34858783
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
    @@ -1128,38 +1128,47 @@ class DAGScheduler(
             val failedStage = stageIdToStage(task.stageId)
             val mapStage = shuffleToMapStage(shuffleId)
     
    -        // It is likely that we receive multiple FetchFailed for a single stage (because we have
    -        // multiple tasks running concurrently on different executors). In that case, it is possible
    -        // the fetch failure has already been handled by the scheduler.
    -        if (runningStages.contains(failedStage)) {
    -          logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
    -            s"due to a fetch failure from $mapStage (${mapStage.name})")
    -          markStageAsFinished(failedStage, Some(failureMessage))
    -        }
    +        if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
    +          logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
    +            s" ${task.stageAttemptId}, which has already failed")
    +        } else {
     
    -        if (disallowStageRetryForTest) {
    -          abortStage(failedStage, "Fetch failure will not retry stage due to testing config")
    -        } else if (failedStages.isEmpty) {
    -          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
    -          // in that case the event will already have been scheduled.
    -          // TODO: Cancel running tasks in the stage
    -          logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    -            s"$failedStage (${failedStage.name}) due to fetch failure")
    -          messageScheduler.schedule(new Runnable {
    -            override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    -          }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
    -        }
    -        failedStages += failedStage
    -        failedStages += mapStage
    -        // Mark the map whose fetch failed as broken in the map stage
    -        if (mapId != -1) {
    -          mapStage.removeOutputLoc(mapId, bmAddress)
    -          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    -        }
    +          // It is likely that we receive multiple FetchFailed for a single stage (because we have
    +          // multiple tasks running concurrently on different executors). In that case, it is
    +          // possible the fetch failure has already been handled by the scheduler.
    +          if (runningStages.contains(failedStage)) {
    +            logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
    +              s"due to a fetch failure from $mapStage (${mapStage.name})")
    +            markStageAsFinished(failedStage, Some(failureMessage))
    +          } else {
    +            logInfo(s"Ignoring fetch failure from $task as it's from $failedStage, " +
    --- End diff --
    
    Doing a logDebug seems fine if you think it will be useful! (I never turn 
on debug level logging just because Spark's debug logging is sooo verbose, but 
I can imagine that others probably do turn it on!)

