GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/22354

    [SPARK-23243][CORE][2.3] Fix RDD.repartition() data correctness issue

    backport https://github.com/apache/spark/pull/22112 to 2.3
    
    -------
    
    An alternative fix for https://github.com/apache/spark/pull/21698
    
    When Spark reruns tasks for an RDD, the output can behave in one of 3 ways:
    1. determinate. Always returns the same result in the same order when rerun.
    2. unordered. Returns the same data set, possibly in a different order, when rerun.
    3. indeterminate. May return a different result when rerun.
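
As a rough illustration of the three behaviors listed above, here is a toy model in plain Python. It is not Spark code: the function names and the `run` parameter (standing in for "which attempt this is") are hypothetical and exist only for this sketch.

```python
import random

data = [3, 1, 2]

def determinate(run):
    # Same values in the same order on every attempt.
    return sorted(data)

def unordered(run):
    # Same values on every attempt, but the order depends on the attempt.
    out = list(data)
    random.Random(run).shuffle(out)
    return out

def indeterminate(run):
    # The values themselves depend on the attempt.
    rng = random.Random(run)
    return [x + rng.random() for x in data]

print(determinate(0) == determinate(1))              # same list both times
print(sorted(unordered(0)) == sorted(unordered(1)))  # same data set both times
print(indeterminate(0) == indeterminate(1))          # values differ between attempts
```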
    
    Normally Spark doesn't need to care about this. Spark runs stages one by
    one, and when a task fails, Spark just reruns it. Although the rerun task
    may return a different result, users will not be surprised.
    
    However, Spark may rerun a finished stage when it sees fetch failures.
    When this happens, Spark needs to rerun all the tasks of all the succeeding
    stages if the RDD output is indeterminate, because the input of the
    succeeding stages has changed.
    
    If the RDD output is determinate, we only need to rerun the failed tasks of 
the succeeding stages, because the input doesn't change.
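
The rerun rule described in the two paragraphs above can be sketched in a few lines. This is a toy model in plain Python, not Spark's scheduler code; `tasks_to_rerun` and its parameters are hypothetical names used only for illustration.

```python
def tasks_to_rerun(parent_output_indeterminate, all_tasks, failed_tasks):
    """Pick which tasks of a succeeding stage must run again after its
    parent stage was rerun. Hypothetical helper, not a Spark API."""
    if parent_output_indeterminate:
        # The parent's rerun may have produced different data, so every
        # task that consumed the old output is potentially stale.
        return set(all_tasks)
    # The input is unchanged, so finished tasks are still valid.
    return set(failed_tasks)


all_tasks = {0, 1, 2, 3}
failed = {2}
print(sorted(tasks_to_rerun(True, all_tasks, failed)))   # [0, 1, 2, 3]
print(sorted(tasks_to_rerun(False, all_tasks, failed)))  # [2]
```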
    
    If the RDD output is unordered, it's the same as determinate, because a
    shuffle partitioner is always deterministic (the round-robin partitioner is
    not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so
    the reducers will still get the same input data set.
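
To see why an unordered input is harmless with a deterministic partitioner, consider this minimal sketch. It assumes a partitioner that looks only at the record itself (here `r % num_partitions` stands in for a key hash); `hash_partition` is a hypothetical helper, not Spark's implementation.

```python
from collections import Counter

def hash_partition(records, num_partitions):
    """Assign each record to a partition based only on its own value."""
    partitions = [[] for _ in range(num_partitions)]
    for r in records:
        partitions[r % num_partitions].append(r)  # stand-in for a key hash
    return partitions

first_run = [5, 1, 4, 2, 3, 0]
rerun = [0, 3, 2, 4, 1, 5]  # same data set, different order

a = hash_partition(first_run, 3)
b = hash_partition(rerun, 3)

# Each reducer sees the same multiset of records in both runs.
same_sets = all(Counter(x) == Counter(y) for x, y in zip(a, b))
print(same_sets)  # True
```

Within a partition the record order may differ between runs, but each reducer still receives the same data set.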
    
    This PR fixes the failure handling for `repartition` to avoid correctness
    issues.
    
    `repartition` applies a stateful map function to generate a round-robin
    id, which is order sensitive and makes the RDD's output indeterminate. When
    a stage that contains `repartition` reruns, we must also rerun all the tasks
    of all the succeeding stages.
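
The order sensitivity can be shown with a toy round-robin assignment. This is a simplified sketch in plain Python, assuming a counter that starts at 0; `round_robin_partition` is a hypothetical helper and does not reproduce how Spark seeds or implements its round-robin id.

```python
def round_robin_partition(records, num_partitions):
    """Assign records by arrival position, as a stateful counter would."""
    partitions = [[] for _ in range(num_partitions)]
    for position, r in enumerate(records):
        partitions[position % num_partitions].append(r)
    return partitions

first_run = ["a", "b", "c", "d"]
rerun = ["b", "a", "d", "c"]  # same data set, different order

out1 = round_robin_partition(first_run, 2)  # [['a', 'c'], ['b', 'd']]
out2 = round_robin_partition(rerun, 2)      # [['b', 'd'], ['a', 'c']]

# Suppose reducer 0 already finished against the first run, and only
# reducer 1 is rerun against the second run's output.
mixed = out1[0] + out2[1]
print(sorted(mixed))  # ['a', 'a', 'c', 'c']
```

In this example `b` and `d` are lost and `a` and `c` are duplicated, which is the kind of correctness issue that rerunning all succeeding tasks avoids.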
    
    **future improvements:**
    1. Currently we can't roll back and rerun a shuffle map stage, so we just
    fail. We should fix this later. https://issues.apache.org/jira/browse/SPARK-25341
    2. Currently we can't roll back and rerun a result stage, so we just fail.
    We should fix this later. https://issues.apache.org/jira/browse/SPARK-25342
    3. We should provide a public API to allow users to tag the random level
    of the RDD's computing function.
    
    a new test case


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark repartition

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22354.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22354
    
----
commit 1e01b008d5c0fa449f2b35edf36693d828bfae15
Author: Wenchen Fan <wenchen@...>
Date:   2018-09-05T22:36:34Z

    [SPARK-23243][CORE] Fix RDD.repartition() data correctness issue
    
    Closes #22112 from cloud-fan/repartition.
    
    Lead-authored-by: Wenchen Fan <[email protected]>
    Co-authored-by: Xingbo Jiang <[email protected]>
    Signed-off-by: Xiao Li <[email protected]>

----

