Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/15335#discussion_r82010161
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---
@@ -1255,27 +1255,46 @@ class DAGScheduler(
s"longer running")
}
- if (disallowStageRetryForTest) {
- abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
- None)
- } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
- abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
- s"has failed the maximum allowable number of " +
- s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
- s"Most recent failure reason: ${failureMessage}", None)
- } else {
- if (failedStages.isEmpty) {
- // Don't schedule an event to resubmit failed stages if failed isn't empty, because
- // in that case the event will already have been scheduled.
- // TODO: Cancel running tasks in the stage
- logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
- s"$failedStage (${failedStage.name}) due to fetch failure")
- messageScheduler.schedule(new Runnable {
- override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
- }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ val shouldAbortStage =
+ failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+ disallowStageRetryForTest
+
+ if (shouldAbortStage) {
+ val abortMessage = if (disallowStageRetryForTest) {
+ "Fetch failure will not retry stage due to testing config"
+ } else {
+ s"""$failedStage (${failedStage.name})
+ |has failed the maximum allowable number of
+ |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
}
+ abortStage(failedStage, abortMessage, None)
+ } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
+ // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
+ val noResubmitEnqueued = !failedStages.contains(failedStage)
--- End diff --
The name is slightly misleading, since there might be a Resubmit enqueued for
another stage. Maybe `noResubmitEnqueuedForThisStage`? But that is perhaps just
longer without adding any clarity; the comment below explains it pretty well,
so I'm OK with the name, just thinking aloud.
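To make the naming point concrete, here is a toy model of the bookkeeping in the diff. It assumes (hypothetically) that a stage is just an id and that "enqueuing" a ResubmitFailedStages event only bumps a counter; `StageId`, `ResubmitBookkeeping` and `enqueuedEvents` are made-up names, and placing the `+=` calls after the check is an assumption, since the diff above is cut off.

```scala
import scala.collection.mutable

// Toy model, NOT Spark code: stages are plain ids, and instead of scheduling
// a real ResubmitFailedStages event we just count how many were enqueued.
final case class StageId(id: Int)

final class ResubmitBookkeeping {
  val failedStages = mutable.HashSet.empty[StageId]
  var enqueuedEvents = 0

  def onFetchFailure(failedStage: StageId, mapStage: StageId): Unit = {
    // True when *this* stage has no pending resubmit. Another stage may
    // still have an event queued, which is why the name reads a bit loosely.
    val noResubmitEnqueued = !failedStages.contains(failedStage)
    // Assumed ordering: record both stages after evaluating the guard.
    failedStages += failedStage
    failedStages += mapStage
    if (noResubmitEnqueued) enqueuedEvents += 1
  }
}
```

In this model, two fetch failures from stage 2 (map stage 1) enqueue one event, and a later failure from stage 4 (map stage 3) enqueues a second one even though an event for stage 2 is still pending. That is exactly the "Resubmit enqueued for another stage" case.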
Also, I was wondering whether this should be
`!failedStages.contains(failedStage) || !failedStages.contains(mapStage)`. Is
there any scenario where it would contain `failedStage` but not `mapStage`? I
couldn't come up with anything, but I also wonder whether there are enough weird
scenarios we could easily overlook that it might be better to keep the extra check just in case.
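The two candidate guards can be compared directly. In this sketch stage ids are plain `Int`s, `failed` stands in for `failedStages`, and `ResubmitGuards` is a hypothetical helper, not part of Spark. It only shows that, as long as `failedStage` and `mapStage` are always recorded together, the two conditions agree.

```scala
// Hypothetical helper comparing the guard in the diff with the broader guard
// suggested in the review. Stage ids are plain Ints; this is not Spark code.
object ResubmitGuards {
  // Guard as written in the diff (mapStage is deliberately unused).
  def current(failed: Set[Int], failedStage: Int, mapStage: Int): Boolean =
    !failed.contains(failedStage)

  // Broader guard: also enqueue if the map stage is not yet recorded.
  def broader(failed: Set[Int], failedStage: Int, mapStage: Int): Boolean =
    !failed.contains(failedStage) || !failed.contains(mapStage)

  // True exactly when the two guards give different answers.
  def disagree(failed: Set[Int], failedStage: Int, mapStage: Int): Boolean =
    current(failed, failedStage, mapStage) != broader(failed, failedStage, mapStage)
}
```

In the toy model the only inputs where they disagree are sets that contain `failedStage` without `mapStage` (e.g. `Set(2, 5)` for failed stage 2 and map stage 1). There the broader guard would enqueue one extra event. Whether the real scheduler can reach such a state is the open question in the comment.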
We could also try to send even fewer Resubmit events: if the stage is
already in `waitingStages`, we don't need to resubmit. But I think I'd prefer
not to go that far, since it's always safe to over-Resubmit, and I'm worried we may
overlook a case.
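The trade-off in the last paragraph can be sketched with the same kind of toy model. `StricterResubmit`, the `strict` flag and the counters are hypothetical; `waitingStages` is only a stand-in for the scheduler's set of that name. The model deliberately ignores how waiting stages later get submitted, which is exactly where an overlooked case could hide.

```scala
import scala.collection.mutable

// Toy model (hypothetical, NOT Spark code) of the stricter variant: when
// `strict` is set, also skip the event if the failed stage is already waiting.
final class StricterResubmit {
  val failedStages = mutable.HashSet.empty[Int]
  val waitingStages = mutable.HashSet.empty[Int]
  var enqueuedEvents = 0
  var resubmittedStages = 0

  def onFetchFailure(failedStage: Int, mapStage: Int, strict: Boolean): Unit = {
    val noResubmitEnqueued = !failedStages.contains(failedStage)
    val alreadyWaiting = strict && waitingStages.contains(failedStage)
    failedStages += failedStage
    failedStages += mapStage
    if (noResubmitEnqueued && !alreadyWaiting) enqueuedEvents += 1
  }

  // Models handling one ResubmitFailedStages event: resubmit everything that
  // was recorded, then clear the set. A redundant event finds the set empty
  // and does nothing, which is why over-resubmitting is harmless here.
  def processResubmit(): Unit = {
    resubmittedStages += failedStages.size
    failedStages.clear()
  }
}
```

With `strict = true`, a failure in an already-waiting stage enqueues nothing, while the default enqueues one event. A duplicate event that finds `failedStages` empty is a no-op, which is the sense in which over-resubmitting is safe in this model.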