Github user kayousterhout commented on the issue:
https://github.com/apache/spark/pull/16620
tl;dr I don't think Mark's change is quite correct, which is why the tests were failing. Instead, I think we need to replace the failedEpoch if/else statement and the pendingPartitions update in DAGScheduler.handleTaskCompletion with:
```scala
if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
  // This task was for the currently running attempt of the stage. Since the task
  // completed successfully from the perspective of the TaskSetManager, mark it as
  // no longer pending (the TaskSetManager may consider the task complete even
  // when the output needs to be ignored because the task's epoch is too small below).
  shuffleStage.pendingPartitions -= task.partitionId
}

if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
  logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
  // The epoch of the task is acceptable (i.e., the task was launched after the most
  // recent failure we're aware of for the executor), so mark the task's output as
  // available.
  shuffleStage.addOutputLoc(smt.partitionId, status)
  // Remove the task's partition from pending partitions. This may have already been
  // done above, but will not have been done yet in cases where the task attempt was
  // from an earlier attempt of the stage (i.e., not the attempt that's currently
  // running). This allows the DAGScheduler to mark the stage as complete when one
  // copy of each task has finished successfully, even if the currently active stage
  // still has tasks running.
  shuffleStage.pendingPartitions -= task.partitionId
}
```
I submitted #16892 to attempt to clarify the test case where Mark's change originally failed (this PR shouldn't block on that -- that's just to clarify things for ourselves in the future), and also wrote a very long write-up of what's going on below.
-----
There are three relevant pieces of state to consider here:
(1) The tasks that the TaskSetManager (TSM) considers currently pending. The TSM encodes these pending tasks in its "successful" array. When a task set is launched, all of its tasks are considered pending, and all of the entries in the successful array are false. Tasks are no longer considered pending (and are marked as true in the "successful" array) if either (a) a copy of the task finishes successfully or (b) a copy of the task fails with a fetch failure (in which case the TSM assumes that the task will never complete successfully, because the previous stage needs to be re-run). Additionally, a task that previously completed successfully can be re-marked as pending if the stage is a shuffle map stage and the executor where the task ran died (because the map output needs to be re-generated, and the TSM will re-schedule the task).

The TSM notifies the DAGScheduler that the stage has completed if either (a) the stage fails (e.g., there's a fetch failure) or (b) all of the entries in "successful" are true (i.e., there are no more pending tasks).
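
To make that bookkeeping concrete, here is a minimal sketch of the TSM-side state described above, under the assumption of a toy class (`SimpleTSM` is made up for illustration; it is not the real TaskSetManager):

```scala
// Hypothetical, simplified model of the TSM's pending-task bookkeeping.
class SimpleTSM(numTasks: Int) {
  // One entry per task; true means "no longer pending".
  private val successful = Array.fill(numTasks)(false)
  private var tasksNotPending = 0

  private def markNotPending(partition: Int): Unit = {
    if (!successful(partition)) {
      successful(partition) = true
      tasksNotPending += 1
    }
  }

  // (a) A copy of the task finished successfully.
  def handleSuccess(partition: Int): Unit = markNotPending(partition)

  // (b) A fetch failure: the TSM assumes the task will never complete
  // successfully (the previous stage must be re-run), so it is no longer pending.
  def handleFetchFailure(partition: Int): Unit = markNotPending(partition)

  // A previously successful shuffle map task becomes pending again if the
  // executor that ran it is lost (its map output must be regenerated).
  def executorLost(partition: Int): Unit = {
    if (successful(partition)) {
      successful(partition) = false
      tasksNotPending -= 1
    }
  }

  // The TSM tells the DAGScheduler the stage is done once every entry in
  // `successful` is true.
  def noPendingTasks: Boolean = tasksNotPending == numTasks
}
```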
(2) ShuffleMapStage.pendingPartitions. This variable is used by the DAGScheduler to track the pending tasks for a stage, and is mostly consistent with the TSM's pending tasks (described above). When a stage begins, the DAGScheduler marks all of the partitions that need to be computed as pending, and then removes them from pendingPartitions as the TSM notifies the DAGScheduler that tasks have completed successfully. When a TSM determines that a task needs to be re-run (because it's a shuffle map task that ran on a now-dead executor), the TSM sends a Resubmitted task completion event to the DAGScheduler, which causes the DAGScheduler to re-add the task to pendingPartitions (in doing so, the DAGScheduler keeps pendingPartitions consistent with the TSM's pending tasks).
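
A minimal sketch of this DAGScheduler-side bookkeeping, assuming a toy object (the names are hypothetical; the real handleTaskCompletion does far more):

```scala
import scala.collection.mutable

// Simplified model of how pendingPartitions reacts to Success and Resubmitted.
object PendingPartitionsSketch {
  sealed trait Reason
  case object Success extends Reason
  case object Resubmitted extends Reason

  // Partitions still needing a successful completion for the stage.
  val pendingPartitions: mutable.HashSet[Int] = mutable.HashSet.empty

  def submitStage(partitionsToCompute: Seq[Int]): Unit = {
    pendingPartitions.clear()
    pendingPartitions ++= partitionsToCompute
  }

  def handleTaskCompletion(partitionId: Int, reason: Reason): Unit = reason match {
    case Success =>
      // The TSM reported a successful copy of this task.
      pendingPartitions -= partitionId
    case Resubmitted =>
      // The TSM will re-run this task (its map output was lost), so the
      // partition becomes pending again.
      pendingPartitions += partitionId
  }
}
```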
I believe there are two scenarios (currently) where ShuffleMapStage.pendingPartitions and the TSM's pending tasks become inconsistent:
- Scenario A (performance optimization, as discussed here already): This happens if a ShuffleMapStage gets re-run (e.g., because the first time it ran, it encountered a fetch failure, so the previous stage needed to be re-run to generate the missing output). Call the original attempt #0 and the currently running attempt #1. If there's a task from attempt #0 that's still running, and it is running on an executor that *was not* marked as failed (this is the condition captured by the failedEpoch if-statement), and it completes successfully, this event will be handled by the TSM for attempt #0. When the DAGScheduler hears that the task completed successfully, it will remove it from pendingPartitions (even though there's still a running copy of this task in the TSM for attempt #1, which is the currently active attempt). This allows the DAGScheduler to mark the stage as finished earlier than when the TSM thinks that the stage is finished.
- Scenario B (bug, as discussed): This happens in the same case as Scenario A, except that it's when a task from attempt #0 completes successfully, but it's on an executor that *was* marked as failed (again, this is the failedEpoch if-statement). In this case, the DAGScheduler considers the output "bogus" (because the executor has since been lost, so the output is probably gone), but the DAGScheduler still removes the task from pendingPartitions. This can cause the DAGScheduler to determine that the stage is complete (the `shuffleStage.pendingPartitions.isEmpty` if-statement), even though there's still another running copy of that task (in the TSM for attempt #1) that could complete successfully. The DAGScheduler will notice an output is missing (`if (!shuffleStage.isAvailable)`) and re-submit the stage, leading to an exception being thrown, because there's still an active TaskSetManager. This is the root cause of the bug here, and is fixed by the proposed code above (see the sketch after this list).
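
To make the failure mode concrete, here is a minimal, hypothetical walkthrough of Scenario B against a toy model of the DAGScheduler state (the names and structure are simplified for illustration; this is not the real handleTaskCompletion):

```scala
import scala.collection.mutable

// Toy model of the Scenario B sequence described above.
object ScenarioBSketch extends App {
  val pendingPartitions = mutable.HashSet(0)       // attempt #1 still running partition 0
  val failedEpoch = mutable.HashMap("execA" -> 2L) // execA was marked as failed at epoch 2

  // A late success arrives from attempt #0's task, which ran on execA and was
  // launched at epoch 1 (i.e., before execA was marked as failed).
  val execId = "execA"
  val taskEpoch = 1L

  // Pre-fix behavior: the partition is removed unconditionally...
  pendingPartitions -= 0
  // ...but the output is ignored because the task's epoch is too old.
  val outputRegistered =
    !(failedEpoch.contains(execId) && taskEpoch <= failedEpoch(execId))

  // pendingPartitions is now empty even though no usable output exists and
  // attempt #1 still has a running copy, so the stage is resubmitted while an
  // active TaskSetManager exists, triggering the exception described above.
  println(s"pendingPartitions empty: ${pendingPartitions.isEmpty}, " +
    s"output registered: $outputRegistered")
}
```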
(3) ShuffleMapStage.outputLocs. This tracks the output locations for all of the tasks in a stage. If a stage gets re-run, only the tasks that need to be re-run will be in the two variables above, but all of the tasks in the stage (including ones that have finished) will always be in outputLocs. One way this variable differs from the others is that outputLocs can be consulted after a stage completes, when no tasks are actively running. For example, if a task in the next stage fails with a fetch failure, the output location for the data that triggered the failure will be removed from ShuffleMapStage.outputLocs. outputLocs may also track multiple locations for a particular task (e.g., if two copies of the task completed successfully).
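
A minimal, hypothetical model of what outputLocs tracks (simplified; the real implementation stores MapStatus objects rather than plain strings):

```scala
import scala.collection.mutable

// Simplified model of ShuffleMapStage.outputLocs: one entry per partition,
// each holding zero or more output locations (e.g., executor ids).
class OutputLocsSketch(numPartitions: Int) {
  private val outputLocs =
    Array.fill(numPartitions)(mutable.ListBuffer.empty[String])

  // A successful task registers where its map output lives; a partition can
  // accumulate multiple locations if several copies succeed.
  def addOutputLoc(partition: Int, location: String): Unit =
    outputLocs(partition) += location

  // A fetch failure (or executor loss) removes the affected locations, which
  // can happen long after the stage itself finished running tasks.
  def removeOutputsOnExecutor(execId: String): Unit =
    outputLocs.foreach(locs => locs --= locs.filter(_ == execId))

  // The stage's output is available only if every partition has at least one
  // surviving location (roughly what `shuffleStage.isAvailable` checks).
  def isAvailable: Boolean = outputLocs.forall(_.nonEmpty)
}
```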
As far as I understand, this will be inconsistent with pendingPartitions in
two cases:
- (A) Consider a task X that finishes successfully on an executor E. Now suppose executor E gets marked as lost (e.g., because of another task that failed to fetch data from E), causing the DAGScheduler to update the epoch on E, and the TaskSetManager to mark X as Resubmitted. Sometime after executor E is marked as lost, the TaskSetManager processes the task-successful message for X. The TaskSetManager still considers X to be done and marks it as successful, and the DAGScheduler removes the task from ShuffleMapStage.pendingPartitions. However, because the epoch on the machine is too old (in other words, because the DAGScheduler knows that executor E failed sometime after task X started), the DAGScheduler won't register the output location for the task (a sketch of this epoch check follows after this list). This particular functionality is necessary for correctness, and will trigger the `if (!shuffleStage.isAvailable)` statement. Task X needs to be re-run, and the TSM doesn't "know" that X needs to be re-run (it thinks X completed successfully). If the DAGScheduler didn't remove X from pendingPartitions, things would end up hanging, as @jinxing64 pointed out. This is the test case I improved in #16892.
- (B) In buggy Scenario B above (which should get fixed by this PR).
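
For reference, here is a minimal sketch of the epoch bookkeeping that case (A) relies on, under simplifying assumptions (the object and method names are made up for illustration; in the real scheduler this state lives in the DAGScheduler and MapOutputTracker):

```scala
import scala.collection.mutable

// Simplified model of the epoch check: a task carries the epoch that was
// current when it launched, and an executor failure bumps the epoch
// recorded for that executor in failedEpoch.
object EpochSketch {
  private var currentEpoch = 0L
  private val failedEpoch = mutable.HashMap.empty[String, Long]

  def executorLost(execId: String): Unit = {
    currentEpoch += 1
    failedEpoch(execId) = currentEpoch
  }

  // Output from a task launched at or before the executor's recorded failure
  // epoch is considered possibly bogus and is not registered.
  def shouldRegisterOutput(execId: String, taskEpoch: Long): Boolean =
    !(failedEpoch.contains(execId) && taskEpoch <= failedEpoch(execId))
}
```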
There are some more fixes we could do to clean some of this up and make it easier to reason about, but in the immediate future, I think the fix at the top is the best way to fix the current bug.