GitHub user pgandhi999 opened a pull request:

    https://github.com/apache/spark/pull/22806

    [SPARK-25250] : On successful completion of a task attempt on a partition id, kill other running task attempts on that same partition
    
    We recently hit a race condition: a task from a previous stage attempt finished just before a new attempt for the same stage was created due to a fetch failure. The task created in the second attempt on the same partition id then kept retrying because of TaskCommitDeniedException, without realizing that the task from the earlier attempt had already succeeded.
    
    For example, consider a task with partition id 9000 and index 9000 running in stage 4.0. A fetch failure occurs, so we spawn a new stage attempt 4.1. Within this window, the original task completes successfully, marking partition id 9000 as complete for attempt 4.0. However, because stage attempt 4.1 has not yet been created, its task set info is not available to the TaskScheduler, so partition id 9000 is not marked completed for 4.1. Stage 4.1 then spawns a task with index 2000 on the same partition id 9000. This task fails with CommitDeniedException and, since it does not see that partition id as already successful, it keeps retrying until the job eventually succeeds. This does not cause job failures, because the DAGScheduler tracks partition completion separately from the task set managers.
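    To make the race concrete, here is a minimal, self-contained Scala sketch (a toy model; names such as `ToyScheduler`, `registerAttempt`, and `taskSucceeded` are illustrative and are not Spark's actual classes). It models per-attempt completion tracking and shows that an attempt created after the completion event never learns that the partition is already done:

```scala
import scala.collection.mutable

// Toy model of per-stage-attempt completion tracking (illustrative only).
class ToyScheduler {
  // (stageId, stageAttempt) -> partition ids marked complete for that attempt
  private val completedByAttempt =
    mutable.HashMap.empty[(Int, Int), mutable.HashSet[Int]]

  def registerAttempt(stageId: Int, attempt: Int): Unit =
    completedByAttempt.getOrElseUpdate((stageId, attempt), mutable.HashSet.empty[Int])

  def taskSucceeded(stageId: Int, attempt: Int, partition: Int): Unit =
    // Only the attempt that ran the task is updated -- this is the root of the race.
    completedByAttempt.get((stageId, attempt)).foreach(_ += partition)

  def isComplete(stageId: Int, attempt: Int, partition: Int): Boolean =
    completedByAttempt.get((stageId, attempt)).exists(_.contains(partition))
}

object RaceDemo extends App {
  val sched = new ToyScheduler
  sched.registerAttempt(stageId = 4, attempt = 0)
  // Partition 9000 finishes in attempt 4.0 *before* attempt 4.1 exists.
  sched.taskSucceeded(stageId = 4, attempt = 0, partition = 9000)
  // Attempt 4.1 is created afterwards, triggered by the fetch failure.
  sched.registerAttempt(stageId = 4, attempt = 1)
  println(sched.isComplete(4, 0, 9000)) // true
  println(sched.isComplete(4, 1, 9000)) // false: 4.1 never learns the partition is done,
                                        // so its re-run of it is killed with TaskCommitDenied
}
```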
    
    ## What changes were proposed in this pull request?
    
    The fix in this PR is as follows:
    Whenever any ResultTask completes successfully, we mark the corresponding partition id as completed in all attempts for that stage. As a result, no killed tasks caused by TaskCommitDeniedException show up in the UI. Because the method uses hash maps and arrays for lookups, its time complexity does not depend on the number of tasks, so it adds negligible overhead.
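
    A minimal sketch of the idea (illustrative names only; `TaskSetInfo`, `Scheduler`, and `handleSuccessfulResultTask` are assumptions, not Spark's exact internals): on a successful result task, look up every registered attempt of the stage and mark the partition complete in each of them.

```scala
import scala.collection.mutable

// Illustrative sketch only, not Spark's actual scheduler code.
class TaskSetInfo(val stageId: Int, val stageAttemptId: Int) {
  val completedPartitions = mutable.HashSet.empty[Int]
  def markPartitionCompleted(partition: Int): Unit = completedPartitions += partition
}

class Scheduler {
  // stageId -> (stageAttemptId -> task set), giving O(1) lookup per stage.
  val taskSetsByStageIdAndAttempt =
    mutable.HashMap.empty[Int, mutable.HashMap[Int, TaskSetInfo]]

  // On a successful result task, mark the partition complete in *every* attempt of
  // the stage, so a later attempt never re-runs an already-committed partition.
  def handleSuccessfulResultTask(stageId: Int, partition: Int): Unit =
    taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
      attempts.values.foreach(_.markPartitionCompleted(partition))
    }
}
```

    Because attempts are indexed by stage id in a hash map, the cost is one lookup plus one update per attempt, independent of the number of tasks in the stage.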


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pgandhi999/spark SPARK-25250

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22806.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22806
    
----
commit 5ad6efde552fc541330386f5124c6b4055f82256
Author: pgandhi <pgandhi@...>
Date:   2018-10-23T14:31:27Z

    [SPARK-25250] : On successful completion of a task attempt on a partition id, kill other running task attempts on that same partition
    
    The fix in this PR is as follows:
    Whenever any ResultTask completes successfully, we mark the corresponding partition id as completed in all attempts for that stage. As a result, no killed tasks caused by TaskCommitDeniedException show up in the UI. Because the method uses hash maps and arrays for lookups, its time complexity does not depend on the number of tasks, so it adds negligible overhead.

----


---
