Github user squito commented on the issue:
https://github.com/apache/spark/pull/17297
I'm a bit confused by the description:
> 1. When a fetch failure happens, the task set manager ask the dag scheduler to abort all the non-running tasks. However, the running tasks in the task set are not killed.
This is already true. When there is a fetch failure, the [TaskSetManager is marked as zombie](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala?squery=TaskSetManager#L755), and the DAGScheduler resubmits stages, but nothing actively kills running tasks.
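To make the "zombie" semantics concrete, here's a tiny standalone sketch (toy names, not the real TaskSetManager API): once the task set is zombie it stops launching new tasks, but nothing touches the tasks that are already running.

```scala
// Toy, standalone model of the behavior described above -- hypothetical names,
// not Spark's actual TaskSetManager.
class ToyTaskSetManager {
  private var isZombie = false
  private val running = scala.collection.mutable.Set.empty[Int]

  def tryLaunch(taskIndex: Int): Boolean =
    if (isZombie) {
      false                   // a zombie task set launches nothing new
    } else {
      running += taskIndex
      true
    }

  def onFetchFailure(taskIndex: Int): Unit = {
    isZombie = true           // stop scheduling new attempts...
    running -= taskIndex      // ...but only the failed task itself stops;
  }                           // the other running tasks are left alone

  def stillRunning: Set[Int] = running.toSet
}
```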
> re-launches all tasks in the stage with the fetch failure that hadn't completed when the fetch failure occurred (the DAGScheduler re-lanches all of the tasks whose output data is not available -- which is equivalent to the set of tasks that hadn't yet completed).
I don't think it's true that it relaunches all tasks that hadn't completed _when the fetch failure occurred_. It relaunches all the tasks that haven't completed by the time the stage gets resubmitted. More tasks can complete between the time of the first failure and the time the stage is resubmitted.
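A toy illustration of that point (hypothetical helper, not the DAGScheduler's actual code): what gets relaunched is whatever is still missing *at resubmit time*.

```scala
// The resubmitted attempt only contains the partitions still missing when the
// stage is resubmitted, not everything that was incomplete at the moment of
// the fetch failure.
object ResubmitExample {
  def missingAtResubmit(allPartitions: Set[Int], completed: Set[Int]): Set[Int] =
    allPartitions -- completed

  def main(args: Array[String]): Unit = {
    val all            = (0 to 9).toSet
    val doneAtFailure  = Set(0, 1)                   // finished before the fetch failure
    val doneAtResubmit = doneAtFailure ++ Set(5, 7)  // finished while waiting for resubmission
    // Prints 2, 3, 4, 6, 8, 9 -- tasks 5 and 7 are *not* relaunched.
    println(missingAtResubmit(all, doneAtResubmit).toSeq.sorted)
  }
}
```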
But there are several other potential issues you may be trying to address. Say there are stage 0 and stage 1, each with 10 tasks. Stage 0 completes fine on the first attempt, then stage 1 starts. Tasks 0 & 1 in stage 1 complete, but then there is a fetch failure in task 2. Let's also say we have an abundance of cluster resources, so tasks 3 - 9 from stage 1 attempt 0 are still running.
Stage 0 gets resubmitted as attempt 1, just to regenerate the map output for whatever executor had the data for the fetch failure -- perhaps it's just one task from stage 0 that needs to be resubmitted. Now, lots of different scenarios are possible:
(a) Tasks 3 - 9 from stage 1 attempt 0 all finish successfully while stage 0 attempt 1 is running. So when stage 0 attempt 1 finishes, stage 1 attempt 1 is submitted with just task 2. If it completes successfully, we're done (no wasted work).
(b) Stage 0 attempt 1 finishes before tasks 3 - 9 from stage 1 attempt 0 have finished. So stage 1 gets submitted again as stage 1 attempt 1, with tasks 2 - 9, and there are now two copies running for tasks 3 - 9. Maybe all the tasks from attempt 0 actually finish shortly after attempt 1 starts. In this case, the stage is complete as soon as there is one complete attempt for each task. But even after the stage completes successfully, all the other tasks keep running anyway. (plenty of wasted work)
(c) Like (b), but shortly after stage 1 attempt 1 is submitted, we get another fetch failure in one of the old "zombie" tasks from stage 1 attempt 0. But the [DAGScheduler realizes it already has a more recent attempt for this stage](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1268), so it ignores the fetch failure (a toy version of that check is sketched after these scenarios). All the other tasks keep running as usual. If there aren't any other issues, the stage completes when there is one completed attempt for each task. (same amount of wasted work as (b))
(d) While stage 0 attempt 1 is running, we get another fetch failure from stage 1 attempt 0, say in task 3, this time from a *different executor*. Maybe it's from a completely different host (just by chance, or there may be cluster maintenance where multiple hosts are serviced at once); or maybe it's from another executor on the same host (at least, until we do something about your other PR on unregistering all shuffle files on a host). To be honest, I don't understand how things work in this scenario. We [mark stage 0 as failed](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1303), we [unregister some shuffle output](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1328), and [we resubmit stage 0](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1319). But stage 0 attempt 1 is still running, so I would have expected us to end up with conflicting task sets. Whatever the real behavior is here, it seems we're at risk of having even more duplicated work for yet another attempt of stage 1.
etc.
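For scenario (c), by the way, the "is there a newer attempt?" check looks roughly like this toy, standalone version (hypothetical names, not the real DAGScheduler code):

```scala
// A fetch failure reported by an old stage attempt is ignored when a newer
// attempt of that stage is already running.
case class FetchFailure(stageId: Int, stageAttemptId: Int)

class ToyDagScheduler {
  private val latestAttempt = scala.collection.mutable.Map.empty[Int, Int]

  def submitStageAttempt(stageId: Int, attemptId: Int): Unit =
    latestAttempt(stageId) = attemptId

  def handleFetchFailure(f: FetchFailure): Unit =
    if (latestAttempt.get(f.stageId).exists(_ > f.stageAttemptId)) {
      // scenario (c): a more recent attempt exists, so this failure is ignored
      println(s"Ignoring fetch failure from stage ${f.stageId} attempt ${f.stageAttemptId}")
    } else {
      println(s"Marking stage ${f.stageId} as failed and resubmitting its parent")
    }
}
```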
So I think in (b) and (c), you are trying to avoid resubmitting tasks 3 - 9 in stage 1 attempt 1. The thing is, there is a strong reason to believe that the original versions of those tasks will fail. Most likely, those tasks need map output from the same executor that caused the first fetch failure. So Kay is suggesting that we take the opposite approach, and instead actively kill the tasks from stage 1 attempt 0. OTOH, it's possible that (i) the issue may have been transient or (ii) the tasks already finished fetching that data before the error occurred. We really have no idea.
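If we did go the "actively kill" route, the shape of it would be something like this purely hypothetical sketch (ToyBackend / killTask are made-up names here, not an existing Spark API):

```scala
// Hypothetical sketch of "actively kill the zombie attempt's tasks".
trait ToyBackend {
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit
}

class ToyZombieKiller(backend: ToyBackend) {
  // Called when the task set goes zombie after a fetch failure;
  // runningTasks maps task id -> executor id.
  def killRunningTasks(runningTasks: Map[Long, String]): Unit =
    for ((taskId, executorId) <- runningTasks) {
      // These tasks will *probably* need map output from the same lost executor,
      // but the failure may have been transient, or they may already have fetched
      // that data -- which is exactly the trade-off discussed above.
      backend.killTask(taskId, executorId, interruptThread = true)
    }
}
```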