Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15335#discussion_r82021050
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1255,27 +1255,46 @@ class DAGScheduler(
                   s"longer running")
               }
     
    -          if (disallowStageRetryForTest) {
    -            abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
    -              None)
    -          } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
    -            abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
    -              s"has failed the maximum allowable number of " +
    -              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
    -              s"Most recent failure reason: ${failureMessage}", None)
    -          } else {
    -            if (failedStages.isEmpty) {
    -              // Don't schedule an event to resubmit failed stages if failed isn't empty, because
    -              // in that case the event will already have been scheduled.
    -              // TODO: Cancel running tasks in the stage
    -              logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    -                s"$failedStage (${failedStage.name}) due to fetch failure")
    -              messageScheduler.schedule(new Runnable {
    -                override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    -              }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
    +          val shouldAbortStage =
    +            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
    +            disallowStageRetryForTest
    +
    +          if (shouldAbortStage) {
    +            val abortMessage = if (disallowStageRetryForTest) {
    +              "Fetch failure will not retry stage due to testing config"
    +            } else {
    +              s"""$failedStage (${failedStage.name})
    +                 |has failed the maximum allowable number of
    +                 |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
    +                 |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
                 }
    +            abortStage(failedStage, abortMessage, None)
    +          } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
    +            // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
    +            val noResubmitEnqueued = !failedStages.contains(failedStage)
                 failedStages += failedStage
                 failedStages += mapStage
    +            if (noResubmitEnqueued) {
    +              // We expect one executor failure to trigger many FetchFailures in rapid succession,
    +              // but all of those task failures can typically be handled by a single resubmission of
    +              // the failed stage.  We avoid flooding the scheduler's event queue with resubmit
    +              // messages by checking whether a resubmit is already in the event queue for the
    +              // failed stage.  If there is already a resubmit enqueued for a different failed
    +              // stage, that event would also be sufficient to handle the current failed stage, but
    +              // producing a resubmit for each failed stage makes debugging and logging a little
    +              // simpler while not producing an overwhelming number of scheduler events.
    +              logInfo(
    +                s"Resubmitting $mapStage (${mapStage.name}) and " +
    +                s"$failedStage (${failedStage.name}) due to fetch failure"
    +              )
    +              messageScheduler.schedule(
    --- End diff --
    
    I find myself frequently wondering about the purpose of this.  It's commented very tersely on RESUBMIT_TIMEOUT, but I think it might be nice to add a longer comment here.  I guess something like
    
    "If we get one fetch-failure, we often get more fetch failures across 
multiple executors.  We will get better parallelism when we resubmit the 
mapStage if we can resubmit when we know about as many of those failures as 
possible.  So this is a heuristic to add a *small* delay to see if we gather a 
few more failures before we resubmit."
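    
    To make that concrete, here is a rough sketch of the pattern (hypothetical names and a placeholder timeout, not the actual DAGScheduler code): the first fetch failure schedules one deferred resubmit, and failures that arrive within the window just record their stages and piggy-back on the already-scheduled event:
    
    ```
    import java.util.concurrent.{Executors, TimeUnit}
    import scala.collection.mutable

    // Illustrative sketch only -- names and timeout value are made up.
    object ResubmitDebounceSketch {
      private val ResubmitTimeoutMs = 200L  // placeholder standing in for RESUBMIT_TIMEOUT
      private val messageScheduler = Executors.newScheduledThreadPool(1)
      private val failedStages = mutable.Set.empty[Int]  // stage ids, for illustration

      def onFetchFailure(failedStageId: Int, mapStageId: Int): Unit = synchronized {
        // Only schedule a deferred resubmit if one isn't already pending for this stage.
        val noResubmitEnqueued = !failedStages.contains(failedStageId)
        failedStages += failedStageId
        failedStages += mapStageId
        if (noResubmitEnqueued) {
          messageScheduler.schedule(new Runnable {
            override def run(): Unit = resubmitFailedStages()
          }, ResubmitTimeoutMs, TimeUnit.MILLISECONDS)
        }
      }

      private def resubmitFailedStages(): Unit = synchronized {
        // Every failure that landed inside the window is handled by this one pass.
        println(s"Resubmitting stages: ${failedStages.toSeq.sorted.mkString(", ")}")
        failedStages.clear()
      }
    }
    ```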
    
    We do *not* need the delay to figure out exactly which shuffle-map outputs are gone on the executor -- we always [mark the executor as lost on a fetch failure](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1288), which means we mark all of its map output as gone.  (This is really confusing -- it *looks* like we only [remove the one shuffle-map output that was involved in the fetch failure](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1282), but the entire removal is buried inside another method a few lines further down.)
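    
    In other words, the control flow is roughly the following (purely illustrative sketch with made-up names, not Spark's real DAGScheduler/MapOutputTracker API): the fetch-failure handler looks like it touches a single output, but the executor-lost path it calls wipes everything that executor hosted:
    
    ```
    import scala.collection.mutable

    // Hypothetical sketch of the behavior described above -- not Spark's real API.
    class MapOutputSketch {
      // shuffleId -> (map partition id -> executor that holds that output)
      private val outputs = mutable.Map.empty[Int, mutable.Map[Int, String]]

      def registerOutput(shuffleId: Int, mapId: Int, execId: String): Unit =
        outputs.getOrElseUpdate(shuffleId, mutable.Map.empty)(mapId) = execId

      def handleFetchFailure(shuffleId: Int, mapId: Int, execId: String): Unit = {
        // Looks like only (shuffleId, mapId) is involved...
        // ...but the executor-lost handling below removes every output on execId.
        markExecutorLost(execId)
      }

      private def markExecutorLost(execId: String): Unit =
        outputs.values.foreach(_.retain { case (_, host) => host != execId })
    }
    ```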
    
    I did some browsing through history, and there [used to be this comment](https://github.com/apache/spark/commit/63051dd2bcc4bf09d413ff7cf89a37967edc33ba#diff-de60e51a99fd49126cda541f8c1fb5aaR262):
    
    ```
    // Periodically resubmit failed stages if some map output fetches have failed and we have
    // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
    // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
    // the same time, so we want to make sure we've identified all the reduce tasks that depend
    // on the failed node.
    ```
    
    At least in the current version, this also sounds like a bad reason to have the delay.  `failedStage` won't be resubmitted until `mapStage` completes anyway, and then it will look to see which tasks it is missing.  Adding a tiny delay on top of the natural delay for `mapStage` seems pretty pointless.
    
    I don't even think that the reason I gave in my suggested comment is a good 
one -- do you really expect failures in multiple executors?  But it is at least 
logically consistent.

