GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16620
[SPARK-19263] DAGScheduler should handle stage's pendingPartitions properly in handleTaskCompletion.

## What changes were proposed in this pull request?

In the current `DAGScheduler.handleTaskCompletion` code, when `event.reason` is `Success`, it first does `stage.pendingPartitions -= task.partitionId`, which can be a bug when a `FetchFailed` has occurred. Consider the following sequence:

1. There are two executors, A and B; executor A is assigned ShuffleMapTask1 and ShuffleMapTask2;
2. ShuffleMapTask1 tries to fetch blocks locally but fails;
3. The driver receives the `FetchFailed` caused by ShuffleMapTask1 on executor A, marks executor A as lost, and updates `failedEpoch`;
4. The driver resubmits the stage, now containing ShuffleMapTask1x and ShuffleMapTask2x;
5. ShuffleMapTask2 finishes successfully on executor A and sends `Success` back to the driver;
6. The driver receives the `Success` and does `stage.pendingPartitions -= task.partitionId`, but then finds the task's epoch is not big enough (`<= failedEpoch(execId)`), treats the result as bogus, and does not add the `MapStatus` to the stage;
7. ShuffleMapTask1x finishes successfully on executor B;
8. The driver receives `Success` from ShuffleMapTask1x on executor B and does `stage.pendingPartitions -= task.partitionId`; now there are no pending partitions, but the driver then finds that not all partitions are available because of step 6;
9. The driver resubmits the stage, but at this moment ShuffleMapTask2x is still running; in `TaskSchedulerImpl.submitTasks` it finds a `conflictingTaskSet` and throws an `IllegalStateException`;
10. The job fails.

To reproduce the bug:

1. Modify `ShuffleBlockFetcherIterator` to check whether the task's index in `TaskSetManager` and the stage attempt are both equal to 0; if so, throw a `FetchFailedException`;
2.
Rebuild Spark, then submit the following job:

```scala
val rdd = sc.parallelize(
  List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
rdd.reduceByKey { (v1, v2) =>
  Thread.sleep(10000)
  v1 + v2
}.map { keyAndValue =>
  (keyAndValue._1 % 2, keyAndValue._2)
}.reduceByKey { (v1, v2) =>
  Thread.sleep(10000)
  v1 + v2
}.collect
```

## How was this patch tested?

This patch was tested manually; with the patch applied, the bug can no longer be reproduced in the situation above. It is hard to write a unit test, because we would have to mock the behavior of both `DAGScheduler` and `TaskScheduler`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinxing64/spark SPARK-19263

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16620.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16620

----

commit 9e4aab2addf2c8ed5e208938532f2fcbaf3547c0
Author: jinxing <jinx...@meituan.com>
Date: 2017-01-17T16:49:02Z

    [SPARK-19263] DAGScheduler should handle stage's pendingPartitions properly in handleTaskCompletion.
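The race described above can be illustrated with a minimal, self-contained model. This is a hypothetical sketch, not Spark's actual code: `Stage`, `Task`, and `handleSuccess` are simplified stand-ins that only reproduce the problematic ordering, where the partition is removed from `pendingPartitions` before the epoch check that may discard the result.

```scala
import scala.collection.mutable

// Simplified stand-ins for DAGScheduler state (illustrative, not Spark's API).
case class Task(partitionId: Int, epoch: Long)

class Stage(val numPartitions: Int) {
  val pendingPartitions: mutable.HashSet[Int] =
    mutable.HashSet(0 until numPartitions: _*)
  // partition id -> executor that produced the map output
  val outputs: mutable.HashMap[Int, String] = mutable.HashMap.empty
  def isAvailable: Boolean = outputs.size == numPartitions
}

object Demo {
  // Mimics the buggy order in handleTaskCompletion: (1) always removes the
  // partition from pendingPartitions, (2) only registers the output if the
  // task's epoch is newer than the executor's failure epoch.
  def handleSuccess(stage: Stage, task: Task, execId: String,
                    failedEpoch: Map[String, Long]): Unit = {
    stage.pendingPartitions -= task.partitionId            // (1) always runs
    val bogus = failedEpoch.get(execId).exists(task.epoch <= _)
    if (!bogus) stage.outputs(task.partitionId) = execId   // (2) skipped if stale
  }

  def main(args: Array[String]): Unit = {
    val stage = new Stage(numPartitions = 2)
    val failedEpoch = Map("executorA" -> 1L)  // A was marked lost at epoch 1

    // Steps 5-6: ShuffleMapTask2's late Success from the lost executor A.
    handleSuccess(stage, Task(partitionId = 1, epoch = 0L), "executorA", failedEpoch)
    // Steps 7-8: ShuffleMapTask1x succeeds on the healthy executor B.
    handleSuccess(stage, Task(partitionId = 0, epoch = 2L), "executorB", failedEpoch)

    // No pending partitions, yet partition 1's output is missing: the stage
    // looks finished but is not available, so it gets resubmitted while
    // ShuffleMapTask2x is still running -> conflicting TaskSet.
    println(stage.pendingPartitions.isEmpty) // true
    println(stage.isAvailable)               // false
  }
}
```

The sketch shows why decrementing `pendingPartitions` unconditionally is unsafe: once the epoch check discards a result, that partition is neither pending nor available, so the stage's bookkeeping disagrees with itself.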