GitHub user ivoson opened a pull request:

    https://github.com/apache/spark/pull/20635

    [SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

    ## What changes were proposed in this pull request?
    This PR backports [#20244](https://github.com/apache/spark/pull/20244) to branch-2.1.
    
    Suppose concurrent jobs use the same RDD, and that RDD is marked for checkpointing. One job can finish and start RDD.doCheckpoint while another job is being submitted, which calls submitStage and then submitMissingTasks. [submitMissingTasks](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L932) serializes taskBinaryBytes and calculates the task partitions, and both depend on the RDD's checkpoint status. If taskBinaryBytes is serialized before doCheckpoint finishes but the partitions are calculated after it finishes, the serialized RDD still carries its original lineage while the partitions are CheckpointRDDPartitions. When the task runs, rdd.compute is called. RDDs that cast the partition to their own partition type, such as [UnionRDD](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala), then throw a ClassCastException because the partition passed in is actually a CheckpointRDDPartition.
    This error occurs because rdd.doCheckpoint runs in the same thread that called sc.runJob, while task serialization runs in the DAGScheduler's event loop.
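    To make the interleaving concrete, here is a minimal, Spark-free Python sketch. `ToyUnionRdd`, `UnionPartition`, `CheckpointPartition`, `serialize_task_binary` and `run_task` are hypothetical stand-ins, not Spark APIs. The task binary is modelled as a deep copy of the RDD taken before checkpointing. The partitions are read after checkpointing, mirroring the ordering described above. Spark's ClassCastException is modelled as a Python TypeError.

```python
import copy
from dataclasses import dataclass


@dataclass
class UnionPartition:
    index: int
    parent_index: int  # which parent RDD this partition comes from


@dataclass
class CheckpointPartition:
    index: int


@dataclass
class ToyUnionRdd:
    """Hypothetical stand-in for a UnionRDD that may be checkpointed."""
    num_partitions: int
    checkpointed: bool = False

    def partitions(self):
        # Once checkpointed, partitions come from the checkpoint, not the lineage.
        if self.checkpointed:
            return [CheckpointPartition(i) for i in range(self.num_partitions)]
        return [UnionPartition(i, i % 2) for i in range(self.num_partitions)]

    def do_checkpoint(self):
        # Truncate the lineage: afterwards partitions() returns checkpoint partitions.
        self.checkpointed = True

    def compute(self, split):
        # Like UnionRDD.compute: assumes the split is its own partition type.
        if not isinstance(split, UnionPartition):
            raise TypeError(f"cannot cast {type(split).__name__} to UnionPartition")
        return f"rows from parent {split.parent_index}"


def serialize_task_binary(rdd):
    # Models serializing (rdd, func): a frozen copy of the RDD's current state.
    return copy.deepcopy(rdd)


def run_task(task_rdd, split):
    # Executor side: a checkpointed RDD reads its checkpoint, otherwise compute() runs.
    if task_rdd.checkpointed:
        return f"checkpoint data for partition {split.index}"
    return task_rdd.compute(split)


rdd = ToyUnionRdd(num_partitions=4)
task_binary = serialize_task_binary(rdd)  # event loop: serialized BEFORE doCheckpoint finishes
rdd.do_checkpoint()                       # job thread: doCheckpoint completes
partitions = rdd.partitions()             # event loop: partitions computed AFTER doCheckpoint

try:
    run_task(task_binary, partitions[0])
except TypeError as e:
    print("task failed:", e)  # task failed: cannot cast CheckpointPartition to UnionPartition
```

    Either consistent ordering is harmless in this sketch. If both snapshots are taken before do_checkpoint, or both after, run_task succeeds. Only the mixed ordering fails.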
    
    ## How was this patch tested?
    Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ivoson/spark branch-2.1-23053

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20635.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20635
    
----
commit bd88903aca841e6ad55144127e4c11e9844ef6ce
Author: huangtengfei <huangtengfei@...>
Date:   2018-02-13T15:59:21Z

    [SPARK-23053][CORE] taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status
    
    Suppose concurrent jobs use the same RDD, and that RDD is marked for checkpointing. One job can finish and start RDD.doCheckpoint while another job is being submitted, which calls submitStage and then submitMissingTasks. [submitMissingTasks](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961) serializes taskBinaryBytes and calculates the task partitions, and both depend on the RDD's checkpoint status. If taskBinaryBytes is serialized before doCheckpoint finishes but the partitions are calculated after it finishes, the serialized RDD still carries its original lineage while the partitions are CheckpointRDDPartitions. When the task runs, rdd.compute is called. RDDs that cast the partition to their own partition type, such as [UnionRDD](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala), then throw a ClassCastException because the partition passed in is actually a CheckpointRDDPartition.
    This error occurs because rdd.doCheckpoint runs in the same thread that called sc.runJob, while task serialization runs in the DAGScheduler's event loop.
    
    Existing unit tests, plus a new test case in DAGSchedulerSuite that reproduces the exception.
    
    Author: huangtengfei <huangtengfei@...>
    
    Closes #20244 from ivoson/branch-taskpart-mistype.
    
    Change-Id: I634009d51ae40336e9d0717d061213ff7e36e71f

----
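The commit title states the requirement: the task binary and the task partitions must observe the same RDD checkpoint status. One way to guarantee that is to make the checkpoint state change and the pair of reads mutually exclusive under one shared lock. The Python sketch below illustrates that locking pattern under stated assumptions. `CHECKPOINT_LOCK`, `ToyRdd`, `snapshot_for_tasks` and `is_consistent` are hypothetical names, and this is not the code in the patch.

```python
import copy
import threading

# Hypothetical lock shared by the checkpointing thread and the scheduler's event loop.
CHECKPOINT_LOCK = threading.Lock()


class ToyRdd:
    """Hypothetical RDD whose partitions depend on its checkpoint status."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self.checkpointed = False

    def partitions(self):
        kind = "checkpoint" if self.checkpointed else "union"
        return [(kind, i) for i in range(self.num_partitions)]

    def do_checkpoint(self):
        # Writing the checkpoint data would happen here, outside the lock (omitted).
        # Only the status flip is guarded, so readers never see a half-updated view.
        with CHECKPOINT_LOCK:
            self.checkpointed = True


def snapshot_for_tasks(rdd):
    # Serialize the task binary and compute partitions under the same lock,
    # so both reflect a single checkpoint status.
    with CHECKPOINT_LOCK:
        task_binary = copy.deepcopy(rdd)
        partitions = rdd.partitions()
    return task_binary, partitions


def is_consistent(task_binary, partitions):
    expected = "checkpoint" if task_binary.checkpointed else "union"
    return all(kind == expected for kind, _ in partitions)


rdd = ToyRdd(num_partitions=8)
results = []


def scheduler_loop():
    for _ in range(1000):
        results.append(is_consistent(*snapshot_for_tasks(rdd)))


scheduler = threading.Thread(target=scheduler_loop)
checkpointer = threading.Thread(target=rdd.do_checkpoint)
scheduler.start()
checkpointer.start()
scheduler.join()
checkpointer.join()
print(all(results))  # True: no snapshot mixes the two checkpoint states
```

In this sketch the lock covers only the status flip, not the slow checkpoint write. The scheduler side is therefore blocked only briefly.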

