GitHub user pgandhi999 opened a pull request: https://github.com/apache/spark/pull/22806
[SPARK-25250] : On successful completion of a task attempt on a partition id, kill other running task attempts on that same partition

We recently hit a race condition in which a task from a previous stage attempt finished just before a new attempt for the same stage was created because of a fetch failure. The task created in the new attempt for the same partition id then retried multiple times with TaskCommitDenied exceptions, without realizing that the task in the earlier attempt had already succeeded.

For example, consider a task with partition id 9000 and index 9000 running in stage 4.0. A fetch failure occurs, so a new stage attempt 4.1 is spawned. Within this window, the task above completes successfully, marking partition id 9000 as complete for 4.0. However, because stage 4.1 has not yet been created, its task set info is not available to the TaskScheduler, so partition id 9000 is not marked as completed for 4.1. Stage 4.1 then spawns a task with index 2000 on the same partition id 9000. This task fails with CommitDeniedException and, since it does not see the partition id as having been marked successful, it keeps retrying until the job finally succeeds. This does not cause any job failures, because the DAG scheduler tracks partitions separately from the task set managers.

## What changes were proposed in this pull request?

The fix in this PR is as follows: whenever a Result Task completes successfully, we mark the corresponding partition id as completed in all attempts for that stage. As a result, killed tasks due to TaskCommitDenied exceptions no longer show up in the UI. Because the method uses hash maps and arrays for lookup and processing, its time complexity does not depend on the number of tasks, so it is also efficient.
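To make the race and the proposed bookkeeping easier to follow, here is a small toy model in Python. It is not Spark code and not the contents of this patch: `TaskSetAttempt`, `ToyScheduler`, and all method names are hypothetical. Remembering completed partitions per stage so that a later attempt can pick them up on creation is an assumption about how the window described above could be closed.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class TaskSetAttempt:
    """Hypothetical stand-in for one stage attempt's task set (e.g. 4.0 or 4.1)."""
    stage_id: int
    attempt: int
    # partition id -> task index inside this attempt (hash map, O(1) lookup)
    partition_to_index: Dict[int, int]
    # task indexes already known to be successful in this attempt
    successful: Set[int] = field(default_factory=set)

    def mark_partition_completed(self, partition_id: int) -> None:
        # One dictionary lookup, independent of how many tasks the attempt has.
        index = self.partition_to_index.get(partition_id)
        if index is not None:
            self.successful.add(index)

    def pending_partitions(self) -> List[int]:
        # Partitions this attempt would still schedule (or retry).
        return [p for p, i in self.partition_to_index.items()
                if i not in self.successful]


class ToyScheduler:
    """Hypothetical scheduler that tracks every attempt of every stage."""

    def __init__(self) -> None:
        self.attempts_by_stage: Dict[int, List[TaskSetAttempt]] = {}
        self.completed_by_stage: Dict[int, Set[int]] = {}

    def submit_attempt(self, attempt: TaskSetAttempt) -> None:
        # Assumption: replay partitions that finished before this attempt
        # existed, so a late-created attempt does not rerun them.
        for partition_id in self.completed_by_stage.get(attempt.stage_id, set()):
            attempt.mark_partition_completed(partition_id)
        self.attempts_by_stage.setdefault(attempt.stage_id, []).append(attempt)

    def on_task_success(self, stage_id: int, partition_id: int) -> None:
        # Record the success and mark it in all attempts of the same stage.
        self.completed_by_stage.setdefault(stage_id, set()).add(partition_id)
        for attempt in self.attempts_by_stage.get(stage_id, []):
            attempt.mark_partition_completed(partition_id)
```

Replaying the scenario from the description: submit attempt 4.0 with `{9000: 9000}`, call `on_task_success(4, 9000)`, then submit attempt 4.1 with `{9000: 2000}`. In this model, attempt 4.1 starts with task index 2000 already marked successful, so it has no reason to launch the task that would otherwise be denied the commit.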
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pgandhi999/spark SPARK-25250

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22806.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22806

----
commit 5ad6efde552fc541330386f5124c6b4055f82256
Author: pgandhi <pgandhi@...>
Date:   2018-10-23T14:31:27Z

    [SPARK-25250] : On successful completion of a task attempt on a partition id, kill other running task attempts on that same partition

    The fix that this PR addresses is as follows:
    Whenever any Result Task gets successfully completed, we simply mark the corresponding partition id as completed in all attempts for that particular stage. As a result, we do not see any Killed tasks due to TaskCommitDenied Exceptions showing up in the UI. Also, since, the method defined uses hash maps and arrays for efficient searching and processing, so as a result, it's time complexity is not related to the number of tasks, hence, it is also efficient.

----