GitHub user jinxing64 opened a pull request:

    https://github.com/apache/spark/pull/16620

    [SPARK-19263] DAGScheduler should handle stage's pendingPartitions properly in handleTaskCompletion.

    ## What changes were proposed in this pull request?
    In the current `DAGScheduler.handleTaskCompletion` code, when `event.reason` is `Success`, the driver first does `stage.pendingPartitions -= task.partitionId`. This can be a bug when a `FetchFailed` happens. Consider the following scenario (a simplified sketch of the bookkeeping follows the list):
    1. There are two executors, A and B; executorA is assigned ShuffleMapTask1 and ShuffleMapTask2;
    2. ShuffleMapTask1 tries to fetch blocks locally but fails;
    3. The driver receives the `FetchFailed` caused by ShuffleMapTask1 on executorA, marks executorA as lost, and updates `failedEpoch`;
    4. The driver resubmits the stage, now containing ShuffleMapTask1x and ShuffleMapTask2x;
    5. ShuffleMapTask2 (from the old attempt) finishes successfully on executorA and sends `Success` back to the driver;
    6. The driver receives `Success` and does `stage.pendingPartitions -= task.partitionId`, but then finds the task's epoch is not big enough (`<= failedEpoch(execId)`), treats the result as bogus, and does not register the `MapStatus` with the stage;
    7. ShuffleMapTask1x finishes successfully on executorB;
    8. The driver receives `Success` from ShuffleMapTask1x on executorB and does `stage.pendingPartitions -= task.partitionId`; there are now no pending partitions, but not all partitions are available because of step 6;
    9. The driver resubmits the stage, but at this moment ShuffleMapTask2x is still running; `TaskSchedulerImpl.submitTasks` then finds a `conflictingTaskSet` and throws an `IllegalStateException`;
    10. The job fails.
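    To make the bookkeeping above concrete, here is a minimal, self-contained sketch of the failure mode. It is illustrative only: `StageModel`, `handleSuccess`, `availableOutputs`, and `looksFinished` are invented names modeling the state with plain sets, not the real `DAGScheduler` internals.
    ```scala
    import scala.collection.mutable

    // Minimal model of the bookkeeping described above (illustrative only, not
    // the real DAGScheduler): a stage tracks which partitions still have running
    // tasks (pendingPartitions) and which partitions have a registered map output.
    class StageModel(numPartitions: Int) {
      val pendingPartitions = mutable.HashSet((0 until numPartitions): _*)
      val availableOutputs  = mutable.HashSet.empty[Int]          // partitions with a MapStatus
      val failedEpoch       = mutable.HashMap.empty[String, Long] // execId -> epoch of last failure

      // Mirrors the problematic Success branch: the partition is removed from
      // pendingPartitions unconditionally, even when the result is later ignored.
      def handleSuccess(partition: Int, execId: String, taskEpoch: Long): Unit = {
        pendingPartitions -= partition                            // done first, unconditionally
        val bogus = failedEpoch.get(execId).exists(taskEpoch <= _)
        if (!bogus) availableOutputs += partition                 // step 6: MapStatus dropped when bogus
      }

      def looksFinished: Boolean = pendingPartitions.isEmpty
      def allOutputsAvailable: Boolean = availableOutputs.size == numPartitions
    }

    object PendingPartitionsDemo extends App {
      val stage = new StageModel(numPartitions = 2)
      stage.failedEpoch("executorA") = 1L                         // step 3: executorA marked as lost

      stage.handleSuccess(partition = 1, execId = "executorA", taskEpoch = 0L) // steps 5-6: bogus result
      stage.handleSuccess(partition = 0, execId = "executorB", taskEpoch = 2L) // steps 7-8

      // Steps 8-9: the stage looks finished, but one partition has no output, so
      // the driver would resubmit the stage while ShuffleMapTask2x is still running.
      println(s"looksFinished=${stage.looksFinished}, allOutputsAvailable=${stage.allOutputsAvailable}")
      // prints: looksFinished=true, allOutputsAvailable=false
    }
    ```
    Running the sketch prints `looksFinished=true, allOutputsAvailable=false`, which is exactly the inconsistent state that triggers the resubmission in step 9.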
    To reproduce the bug:
    1. Make a small modification to `ShuffleBlockFetcherIterator`: check whether the task's index in the `TaskSetManager` and the stage attempt are both equal to 0, and if so, throw a `FetchFailedException` (an illustrative sketch of this check follows the job below);
    2. Rebuild Spark, then submit the following job:
    ```scala
    val rdd = sc.parallelize(
      List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
    rdd.reduceByKey { (v1, v2) =>
      Thread.sleep(10000)
      v1 + v2
    }.map { keyAndValue =>
      (keyAndValue._1 % 2, keyAndValue._2)
    }.reduceByKey { (v1, v2) =>
      Thread.sleep(10000)
      v1 + v2
    }.collect
    ```
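    For reference, the check described in step 1 amounts to something like the sketch below. It is only an illustrative stand-in: `FetchFailureInjection`, `maybeInjectFetchFailure`, and `InjectedFetchFailure` are hypothetical names, not the actual `ShuffleBlockFetcherIterator` internals or the real `FetchFailedException`.
    ```scala
    // Hypothetical stand-in for the fault injection in step 1 (not actual Spark
    // code): fail only the first task index of the first stage attempt, so exactly
    // one ShuffleMapTask of the original attempt hits a "fetch failure".
    object FetchFailureInjection {
      final class InjectedFetchFailure(msg: String) extends Exception(msg)

      def maybeInjectFetchFailure(taskIndex: Int, stageAttempt: Int): Unit = {
        if (taskIndex == 0 && stageAttempt == 0) {
          throw new InjectedFetchFailure(
            s"injected fetch failure: taskIndex=$taskIndex, stageAttempt=$stageAttempt")
        }
      }
    }
    ```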
    ## How was this patch tested?
    This patch was tested manually; with the patch applied, the bug can no longer be reproduced in the situation above.
    It is hard to write a unit test, because we would have to mock the behavior of both `DAGScheduler` and `TaskScheduler`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinxing64/spark SPARK-19263

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16620.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16620
    
----
commit 9e4aab2addf2c8ed5e208938532f2fcbaf3547c0
Author: jinxing <jinx...@meituan.com>
Date:   2017-01-17T16:49:02Z

    [SPARK-19263] DAGScheduler should handle stage's pendingPartitions properly in handleTaskCompletion.

----

