Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/15335#discussion_r82931294
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1255,27 +1255,46 @@ class DAGScheduler(
               s"longer running")
           }
-          if (disallowStageRetryForTest) {
-            abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
-              None)
-          } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
-            abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
-              s"has failed the maximum allowable number of " +
-              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
-              s"Most recent failure reason: ${failureMessage}", None)
-          } else {
-            if (failedStages.isEmpty) {
-              // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-              // in that case the event will already have been scheduled.
-              // TODO: Cancel running tasks in the stage
-              logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-                s"$failedStage (${failedStage.name}) due to fetch failure")
-              messageScheduler.schedule(new Runnable {
-                override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-              }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+          val shouldAbortStage =
+            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+            disallowStageRetryForTest
+
+          if (shouldAbortStage) {
+            val abortMessage = if (disallowStageRetryForTest) {
+              "Fetch failure will not retry stage due to testing config"
+            } else {
+              s"""$failedStage (${failedStage.name})
+                 |has failed the maximum allowable number of
+                 |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+                 |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
             }
+            abortStage(failedStage, abortMessage, None)
+          } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
+            // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
+            val noResubmitEnqueued = !failedStages.contains(failedStage)
             failedStages += failedStage
             failedStages += mapStage
+            if (noResubmitEnqueued) {
+              // We expect one executor failure to trigger many FetchFailures in rapid succession,
+              // but all of those task failures can typically be handled by a single resubmission of
+              // the failed stage. We avoid flooding the scheduler's event queue with resubmit
+              // messages by checking whether a resubmit is already in the event queue for the
+              // failed stage. If there is already a resubmit enqueued for a different failed
+              // stage, that event would also be sufficient to handle the current failed stage, but
+              // producing a resubmit for each failed stage makes debugging and logging a little
+              // simpler while not producing an overwhelming number of scheduler events.
+              logInfo(
+                s"Resubmitting $mapStage (${mapStage.name}) and " +
+                s"$failedStage (${failedStage.name}) due to fetch failure"
+              )
+              messageScheduler.schedule(
--- End diff ---
yeah probably a separate PR, sorry this was just an opportunity for me to
rant :)
And sorry if I worded it poorly, but I was not suggesting the one w/ "Periodically" as a better comment -- in fact I think it's a *bad* comment; I just wanted to mention it was another description that used to be there long ago.
This was my suggestion:
```
If we get one fetch-failure, we often get more fetch failures across
multiple executors. We will get better parallelism when we resubmit the
mapStage if we can resubmit when we know about as many of those failures as
possible. So this is a heuristic to add a small delay to see if we gather a few
more failures before we resubmit.
```
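For context, here is a minimal, self-contained sketch of the coalescing idea behind that suggested comment: delay the resubmit briefly and keep at most one pending resubmit per failed stage, so a burst of FetchFailures from a single lost executor collapses into one resubmission. The names below (ResubmitCoalescer, onFetchFailure, the 200 ms delay) are hypothetical stand-ins, not the real DAGScheduler API, which uses messageScheduler, failedStages, and DAGScheduler.RESUBMIT_TIMEOUT as in the diff above.
```
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

// Hypothetical, simplified model of the delayed-resubmit heuristic; not the
// actual DAGScheduler code.
class ResubmitCoalescer(resubmitTimeoutMs: Long) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private val failedStages = mutable.Set[Int]()

  // Called once per fetch failure; many of these typically arrive in a burst
  // after a single executor is lost.
  def onFetchFailure(stageId: Int): Unit = synchronized {
    // Only enqueue a delayed resubmit if this stage doesn't already have one
    // pending -- the same idea as the `noResubmitEnqueued` check in the diff.
    val alreadyEnqueued = failedStages.contains(stageId)
    failedStages += stageId
    if (!alreadyEnqueued) {
      scheduler.schedule(new Runnable {
        override def run(): Unit = resubmit()
      }, resubmitTimeoutMs, TimeUnit.MILLISECONDS)
    }
  }

  // By the time the delay expires we have usually seen most of the related
  // failures, so one resubmission covers all of them.
  private def resubmit(): Unit = synchronized {
    if (failedStages.nonEmpty) {
      println(s"Resubmitting stages: ${failedStages.mkString(", ")}")
      failedStages.clear()
    }
  }

  def shutdown(): Unit = scheduler.shutdown()
}

object ResubmitCoalescerDemo extends App {
  val coalescer = new ResubmitCoalescer(resubmitTimeoutMs = 200)
  // A rapid burst of failures across two stages collapses into one resubmit.
  Seq(1, 1, 2, 1, 2).foreach(coalescer.onFetchFailure)
  Thread.sleep(500)
  coalescer.shutdown()
}
```
Running the demo prints a single "Resubmitting stages: 1, 2" line after the delay, which is the batching effect the delay-based heuristic is meant to buy.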