Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15335#discussion_r82933318
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
    @@ -1255,27 +1255,46 @@ class DAGScheduler(
                   s"longer running")
               }
     
    -          if (disallowStageRetryForTest) {
    -            abortStage(failedStage, "Fetch failure will not retry stage 
due to testing config",
    -              None)
    -          } else if 
(failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
    -            abortStage(failedStage, s"$failedStage (${failedStage.name}) " 
+
    -              s"has failed the maximum allowable number of " +
    -              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
    -              s"Most recent failure reason: ${failureMessage}", None)
    -          } else {
    -            if (failedStages.isEmpty) {
    -              // Don't schedule an event to resubmit failed stages if 
failed isn't empty, because
    -              // in that case the event will already have been scheduled.
    -              // TODO: Cancel running tasks in the stage
    -              logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    -                s"$failedStage (${failedStage.name}) due to fetch failure")
    -              messageScheduler.schedule(new Runnable {
    -                override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
    -              }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
    +          val shouldAbortStage =
    +            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
    +            disallowStageRetryForTest
    +
    +          if (shouldAbortStage) {
    +            val abortMessage = if (disallowStageRetryForTest) {
    +              "Fetch failure will not retry stage due to testing config"
    +            } else {
    +              s"""$failedStage (${failedStage.name})
    +                 |has failed the maximum allowable number of
    +                 |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
    +                 |Most recent failure reason: 
$failureMessage""".stripMargin.replaceAll("\n", " ")
                 }
    +            abortStage(failedStage, abortMessage, None)
    +          } else { // update failedStages and make sure a 
ResubmitFailedStages event is enqueued
    +            // TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064
    +            val noResubmitEnqueued = !failedStages.contains(failedStage)
    --- End diff --
    
    I think I was worried about the opposite problem -- perhaps we add 
`mapStage` to `failedStages`, but fail to fire a `Resubmit` event.  Maybe too 
many negatives to think through this clearly -- my intention was *more* logging 
& resubmission, not less.  I suppose I was thinking of it as:
    
    ```scala
    val addedToFailedStages = failedStages.add(failedStage) | 
failedStages.add(mapStage)
    if (addedToFailedStage) {
      logStuff()
      resubmit()
    }
    ```
    
    the point being, to avoid another case of the bug which started this all -- 
you add to `failedStages`, but fail to ever `Resubmit`.
    
    I was thinking of something more like this (though as you'll see, this case 
is fine).  Say you have two jobs submitted concurrently, which share the first 
few stages.  A -> B -> C and A -> B -> D.   There is an executor failure while 
they are both running their independent parts, C & D, concurrently.  The 
failure is detected in C first, so it marks B & C as failed.  Later on, the 
failure is detected in D, it marks B & D as failed.  If the first resubmit was 
already processed, its fine, B is already running, and we mark D as waiting on 
D.  Similarly, its fine if the resubmit wasn't processed yet when the failure 
is detected in D-- then when the resubmit is processed, we resubmit all 3 
stages.
    
    I think it also works out even if stage A needs to get resubmitted as well 
-- its handled in the same call that does the resubmit for B, when it checks 
for missing parents.  (In fact, thinking through these cases makes me think we 
don't even need to resubmit the `mapStage` at all -- the `failedStage` will 
submit itself on its resubmit, since it will notice its parents aren't ready.  
Which is why there isn't a case where this check would really mater.)
    
    Anyway, the point is not that I could show you of a case were we *do* need 
to make sure there is a resubmit.  The point is that I'm *not* sure that we do 
*not* need it, which is why I thought it was better to err on the side of 
over-logging / resubmitting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to