GitHub user squito commented on the issue:

    https://github.com/apache/spark/pull/17297
  
    I'm a bit confused by the description:
    
    > 1. When a fetch failure happens, the task set manager ask the dag 
scheduler to abort all the non-running tasks. However, the running tasks in the 
task set are not killed.
    
    This is already true.  When there is a fetch failure, the [TaskSetManager is marked as zombie](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala?squery=TaskSetManager#L755), and the DAGScheduler resubmits stages, but nothing actively kills running tasks.
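
    To make the "zombie" behavior concrete, here is a tiny self-contained sketch of that logic (illustrative names only, nothing here is the real TaskSetManager): once the flag is set, no new tasks are handed out, but the tasks that are already running are left alone.

    ```scala
    import scala.collection.mutable

    // Toy model of a task set going "zombie" after a fetch failure.
    class SketchTaskSetManager(numTasks: Int) {
      var isZombie = false
      val running = mutable.Set[Int]()
      private var nextTask = 0

      // Offered a free slot: only hand out new tasks while not a zombie.
      def resourceOffer(): Option[Int] =
        if (isZombie || nextTask >= numTasks) None
        else { val t = nextTask; nextTask += 1; running += t; Some(t) }

      // A fetch failure just flips the flag; the tasks in `running` are untouched.
      def onFetchFailure(): Unit = { isZombie = true }
    }

    object ZombieSketch extends App {
      val tsm = new SketchTaskSetManager(10)
      (1 to 3).foreach(_ => tsm.resourceOffer())  // tasks 0, 1, 2 launched
      tsm.onFetchFailure()                        // fetch failure -> zombie
      assert(tsm.resourceOffer().isEmpty)         // no new tasks handed out...
      assert(tsm.running == Set(0, 1, 2))         // ...but the old ones are still "running"
    }
    ```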
    
    >  re-launches all tasks in the stage with the fetch failure that hadn't 
completed when the fetch failure occurred (the DAGScheduler re-lanches all of 
the tasks whose output data is not available -- which is equivalent to the set 
of tasks that hadn't yet completed).
    
    I don't think it's true that it relaunches all tasks that hadn't completed _when the fetch failure occurred_.  It relaunches all the tasks that haven't completed _by the time the stage gets resubmitted_.  More tasks can complete between the time of the first failure and the time the stage is resubmitted.
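
    If it helps, the point about *when* the set of tasks to re-run is computed can be sketched in a couple of lines (purely illustrative, not DAGScheduler code):

    ```scala
    // The tasks resubmitted are the ones still missing output *at resubmission time*,
    // so anything that finished between the fetch failure and the resubmission is skipped.
    object MissingPartitionsSketch extends App {
      def missing(numPartitions: Int, completed: Set[Int]): Seq[Int] =
        (0 until numPartitions).filterNot(completed)

      val doneAtFailure  = Set(0, 1)                   // finished when the fetch failure happened
      val doneAtResubmit = doneAtFailure ++ Set(3, 4)  // 3 and 4 finished in the meantime

      println(missing(10, doneAtResubmit))             // 2, 5, 6, 7, 8, 9 get resubmitted
    }
    ```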
    
    But there are several other potential issues you may be trying to address.
    
    Say there are stage 0 and stage 1, each with 10 tasks.  Stage 0 completes fine on the first attempt, then stage 1 starts.  Tasks 0 & 1 in stage 1 complete, but then there is a fetch failure in task 2.  Let's also say we have an abundance of cluster resources, so tasks 3 - 9 from stage 1, attempt 0 are still running.
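
    (For concreteness, a job with the shape described here -- a single shuffle, 10 tasks per stage -- could look like the snippet below; the actual numbers and operations are arbitrary.)

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    // A minimal two-stage job: stage 0 is the 10 map tasks feeding the shuffle,
    // stage 1 is the 10 reduce tasks that fetch that map output.
    object TwoStageJobSketch extends App {
      val sc = new SparkContext(new SparkConf().setAppName("two-stage-sketch").setMaster("local[4]"))
      val counts = sc.parallelize(1 to 1000, 10)  // stage 0: 10 partitions -> 10 map tasks
        .map(x => (x % 10, 1))
        .reduceByKey(_ + _, 10)                   // stage 1: 10 reduce tasks
        .collect()
      println(counts.toSeq)
      sc.stop()
    }
    ```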
    
    Stage 0 gets resubmitted as attempt 1, just to regenerate the map output for whatever executor had the data for the fetch failure -- perhaps it's just one task from stage 0 that needs to be resubmitted.  Now, lots of different scenarios are possible:
    
    (a) Tasks 3 - 9 from stage 1 attempt 0 all finish successfully while stage 0 attempt 1 is running.  So when stage 0 attempt 1 finishes, stage 1 attempt 1 is submitted with just task 2.  If it completes successfully, we're done (no wasted work).
    
    (b) Stage 0 attempt 1 finishes before tasks 3 - 9 from stage 1 attempt 0 have finished.  So stage 1 gets submitted again as stage 1 attempt 1, with tasks 2 - 9, and there are now two copies running for tasks 3 - 9.  Maybe all the tasks from attempt 0 actually finish shortly after attempt 1 starts.  In this case, the stage is complete as soon as there is one complete attempt for each task.  But even after the stage completes successfully, all the other tasks keep running anyway.  (Plenty of wasted work.)
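
    A toy model of the bookkeeping in (b), just to spell out the "one complete attempt per task" point (again, nothing here is real scheduler code):

    ```scala
    import scala.collection.mutable

    // Scenario (b): attempt 1 duplicates tasks 3-9 that attempt 0 is still running.
    object DuplicateAttemptsSketch extends App {
      val partitions = (0 until 10).toSet

      val successful      = mutable.Set(0, 1)        // done before the fetch failure
      val attempt0Running = mutable.Set(3 to 9: _*)  // zombie attempt, still running
      val attempt1Running = mutable.Set(2 to 9: _*)  // resubmission duplicates 3-9

      // The old attempt-0 copies happen to finish first, then task 2 from attempt 1:
      successful ++= attempt0Running; attempt0Running.clear()
      successful += 2; attempt1Running -= 2

      // The stage is complete, but the attempt-1 copies of 3-9 keep running anyway.
      assert(partitions.subsetOf(successful))
      println(s"duplicate tasks still running (wasted): ${attempt1Running.toSeq.sorted}")
    }
    ```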
    
    (c) Like (b), but shortly after stage 1 attempt 1 is submitted, we get another fetch failure in one of the old "zombie" tasks from stage 1 attempt 0.  But the [DAGScheduler realizes it already has a more recent attempt for this stage](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1268), so it ignores the fetch failure.  All the other tasks keep running as usual.  If there aren't any other issues, the stage completes when there is one completed attempt for each task.  (Same amount of wasted work as (b).)
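
    The check in (c) boils down to something like this (illustrative only; the real check is at the link above):

    ```scala
    // A fetch failure reported by a task from an older stage attempt is ignored
    // when a more recent attempt for that stage already exists.
    object StaleAttemptSketch extends App {
      def shouldActOnFetchFailure(failureAttemptId: Int, latestAttemptId: Int): Boolean =
        failureAttemptId == latestAttemptId

      // Stage 1 is already on attempt 1 when a zombie task from attempt 0 fails:
      println(shouldActOnFetchFailure(failureAttemptId = 0, latestAttemptId = 1))  // false -> ignored
    }
    ```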
    
    (d) While stage 0 attempt 1 is running, we get another fetch failure from stage 1 attempt 0, say in task 3, with a failure from a *different executor*.  Maybe it's from a completely different host (just by chance, or there may be cluster maintenance where multiple hosts are serviced at once); or maybe it's from another executor on the same host (at least, until we do something about your other PR on unregistering all shuffle files on a host).  To be honest, I don't understand how things work in this scenario.  We [mark stage 0 as failed](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1303), we [unregister some shuffle output](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1328), and [we resubmit stage 0](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1319).  But stage 0 attempt 1 is still running, so I would have expected us to end up with conflicting task sets.  Whatever the real behavior is here, it seems we're at risk of having even more duplicated work for yet another attempt for stage 1.
    
    etc.
    
    So I think in (b) and (c), you are trying to avoid resubmitting tasks 3 - 9 in stage 1 attempt 1.  The thing is, there is a strong reason to believe that the original versions of those tasks will fail.  Most likely, those tasks need map output from the same executor that caused the first fetch failure.  So Kay is suggesting that we take the opposite approach, and instead actively kill the tasks from stage 1 attempt 0.  OTOH, it's possible that (i) the issue may have been transient, or (ii) the tasks already finished fetching that data before the error occurred.  We really have no idea.
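
    For what it's worth, the "actively kill" option would roughly amount to extending the zombie transition in the first sketch above; the kill hook here is a stand-in, not a real Spark API:

    ```scala
    import scala.collection.mutable

    // Sketch only: on a fetch failure, optionally kill the still-running copies
    // instead of merely going zombie.  Whether that saves or wastes work depends
    // on whether those copies would have hit the same missing map output.
    class KillingTaskSetManager(killTask: Int => Unit) {
      var isZombie = false
      val running = mutable.Set[Int]()

      def onFetchFailure(killRunning: Boolean): Unit = {
        isZombie = true
        if (killRunning) {             // the approach Kay is suggesting
          running.foreach(killTask)    // throws away work that might have succeeded
          running.clear()
        }
        // else: leave them running -- they may fail the same way, or they may have
        // already fetched everything they need; we can't tell from here.
      }
    }
    ```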

