Github user markhamstra commented on a diff in the pull request:
https://github.com/apache/spark/pull/15335#discussion_r82864060
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1255,27 +1255,46 @@ class DAGScheduler(
               s"longer running")
           }
    -        if (disallowStageRetryForTest) {
    -          abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
    -            None)
    -        } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
    -          abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
    -            s"has failed the maximum allowable number of " +
    -            s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
    -            s"Most recent failure reason: ${failureMessage}", None)
    -        } else {
    -          if (failedStages.isEmpty) {
    -            // Don't schedule an event to resubmit failed stages if failed isn't empty, because
    -            // in that case the event will already have been scheduled.
    -            // TODO: Cancel running tasks in the stage
    -            logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    -              s"$failedStage (${failedStage.name}) due to fetch failure")
    -            messageScheduler.schedule(new Runnable {
    -              override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    -            }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
    +        val shouldAbortStage =
    +          failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
    +          disallowStageRetryForTest
    +
    +        if (shouldAbortStage) {
    +          val abortMessage = if (disallowStageRetryForTest) {
    +            "Fetch failure will not retry stage due to testing config"
    +          } else {
    +            s"""$failedStage (${failedStage.name})
    +               |has failed the maximum allowable number of
    +               |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
    +               |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
               }
    +          abortStage(failedStage, abortMessage, None)
    +        } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
    +          // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
    +          val noResubmitEnqueued = !failedStages.contains(failedStage)
--- End diff --
Right here is the only place we put anything into `failedStages`, so
`failedStage` and `mapStage` always go in as pairs. The only places where we
remove things from `failedStages` are `resubmitFailedStages` and
`DAGScheduler#cleanupStateForJobAndIndependentStages#removeStage`. We clear
`failedStages` in `resubmitFailedStages`, so the only place where `failedStage`
and `mapStage` could get unpaired in `failedStages` is in
`cleanupStateForJobAndIndependentStages#removeStage`. That would happen if the
number of Jobs that use `failedStage` and `mapStage` is unequal. If I'm
thinking correctly, that can only happen when `mapStage` is used by more Jobs
than `failedStage` is. In that case, cleaning up the last Job that uses
`failedStage` would remove `failedStage` from `failedStages` while `mapStage`
would remain.
To fall into your proposed `|| !failedStages.contains(mapStage)` branch,
another `failedStage` needing `mapStage`, this time coming from one of the
remaining Jobs using `mapStage`, would need to fail. If that is the case, then
we still want to log the failure of the new `failedStage`, so I don't think we
want `|| !failedStages.contains(mapStage)`. Without it we'll re-add `mapStage`
to `failedStages`, but since `failedStages` is a `HashSet` the duplicate add is
a no-op, so that's no big deal.
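A minimal sketch of that scenario (using a hypothetical stand-in `Stage` case class, with `failedStages` modeled as the `mutable.HashSet` it is in `DAGScheduler`), showing both the unpairing after job cleanup and why the later duplicate add of `mapStage` is harmless:

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.apache.spark.scheduler.Stage, for illustration only.
case class Stage(id: Int, name: String)

object FailedStagesSketch {
  def main(args: Array[String]): Unit = {
    val failedStages = mutable.HashSet.empty[Stage]

    val mapStage = Stage(1, "mapStage")
    val failedStage = Stage(2, "failedStage")

    // A fetch failure puts failedStage and mapStage into failedStages as a pair.
    failedStages += failedStage
    failedStages += mapStage

    // Cleaning up the last Job that uses failedStage removes it,
    // leaving mapStage behind unpaired.
    failedStages -= failedStage
    assert(failedStages == Set(mapStage))

    // A new fetch failure involving mapStage re-adds it; because
    // failedStages is a HashSet, the duplicate add changes nothing.
    failedStages += mapStage
    assert(failedStages.size == 1)

    println(failedStages.size) // prints 1
  }
}
```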