[ 
https://issues.apache.org/jira/browse/SPARK-5259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735160#comment-15735160
 ] 

xukun commented on SPARK-5259:
------------------------------

[~squito] [~SuYan] Would it be possible to backport this to branch 1.5?

> Do not submit stage until its dependencies map outputs are registered
> ---------------------------------------------------------------------
>
>                 Key: SPARK-5259
>                 URL: https://issues.apache.org/jira/browse/SPARK-5259
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.1, 1.2.0
>            Reporter: SuYan
>            Assignee: SuYan
>            Priority: Critical
>             Fix For: 1.6.0
>
>
> We should track pending tasks by partition ID instead of Task objects.
> Before this, failure & retry could result in a case where a stage got
> submitted before the map output from its dependencies got registered. This
> was due to an error in the condition for registering map outputs.
> More complete explanation of the original problem:
> 1. While a shuffle map stage is being retried, there may be two task sets
> running. Call them taskSet0.0 and taskSet0.1; taskSet0.1 re-runs the tasks
> of taskSet0.0 that had not completed.
> If taskSet0.0 then finishes all the tasks that taskSet0.1 has not completed
> yet, every partition is covered, so the stage's isAvailable is true:
> {code}
>   // A shuffle map stage is available once every partition has a map output.
>   def isAvailable: Boolean = {
>     if (!isShuffleMap) {
>       true
>     } else {
>       numAvailableOutputs == numPartitions
>     }
>   } 
> {code}
> However, stage.pendingTasks is not empty, and pendingTasks must be empty
> before the stage's map statuses are registered with the mapOutputTracker.
> This happens because, when a task completes successfully, it is removed from
> pendingTasks by reference, since Task does not override hashCode() and
> equals():
> pendingTasks -= task
> whereas numAvailableOutputs is counted by partition ID.
> Here is a test case that demonstrates the problem:
> {code}
>   test("Make sure mapStage.pendingtasks is set() " +
>     "while MapStage.isAvailable is true while stage was retry ") {
>     val firstRDD = new MyRDD(sc, 6, Nil)
>     val firstShuffleDep = new ShuffleDependency(firstRDD, null)
>     val firstShuffleId = firstShuffleDep.shuffleId
>     val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
>     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
>     val shuffleId = shuffleDep.shuffleId
>     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
>     submit(reduceRdd, Array(0, 1))
>     complete(taskSets(0), Seq(
>       (Success, makeMapStatus("hostB", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostC", 3)),
>       (Success, makeMapStatus("hostB", 4)),
>       (Success, makeMapStatus("hostB", 5)),
>       (Success, makeMapStatus("hostC", 6))
>     ))
>     complete(taskSets(1), Seq(
>       (Success, makeMapStatus("hostA", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostA", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostA", 1))
>     ))
>     // Lose the executor on hostA, which held the outputs for partitions 0, 2 and 4.
>     runEvent(ExecutorLost("exec-hostA"))
>     runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(0),
>       FetchFailed(null, firstShuffleId, -1, 0, "Fetch metadata failed"),
>       null, null, null, null))
>     scheduler.resubmitFailedStages()
>     runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
>       makeMapStatus("hostB", 2), null, null, null))
>     val stage = scheduler.stageIdToStage(taskSets(1).stageId)
>     assert(stage.attemptId == 2)
>     assert(stage.isAvailable)
>     assert(stage.pendingTasks.size == 0)
>   }
> {code}
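A minimal, self-contained Scala sketch of why tracking pending work by Task object fails while tracking it by partition ID works, assuming a simplified version of the scenario described above: a 4-partition stage whose retry (taskSet0.1) re-runs partitions 2 and 3, while the original attempt (taskSet0.0) later finishes every partition. SketchTask, pendingPartitions and the registration check are hypothetical stand-ins, not Spark's actual DAGScheduler code.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a scheduler task. Like the Task objects described
// above, it does NOT override equals()/hashCode(), so hash sets compare
// instances by reference.
final class SketchTask(val partitionId: Int)

val numPartitions = 4

// taskSet0.0 launched tasks for every partition; the retry (taskSet0.1)
// creates brand-new task objects for partitions 2 and 3.
val taskSet00 = (0 until numPartitions).map(p => new SketchTask(p))
val taskSet01 = Seq(2, 3).map(p => new SketchTask(p))

// Old scheme: pending work is the set of task objects of the latest attempt.
val pendingTasks = mutable.HashSet[SketchTask]()
pendingTasks ++= taskSet01

// Partition-ID scheme: pending work is the set of partitions still missing.
val pendingPartitions = mutable.HashSet[Int]()
pendingPartitions ++= taskSet01.map(_.partitionId)

// Map outputs are always recorded per partition ID.
val availableOutputs = mutable.HashSet[Int]()

// taskSet0.0 finishes every partition, including the two taskSet0.1 is running.
for (t <- taskSet00) {
  availableOutputs += t.partitionId
  pendingTasks -= t                  // removes nothing: no taskSet0.0 object is in the set
  pendingPartitions -= t.partitionId // clears the partition regardless of attempt
}

val isAvailable = availableOutputs.size == numPartitions

// Simplified guard: map outputs are only registered once nothing is pending.
val registersWithOldScheme = isAvailable && pendingTasks.isEmpty
val registersWithPartitionScheme = isAvailable && pendingPartitions.isEmpty

println(s"isAvailable=$isAvailable pendingTasks=${pendingTasks.size} " +
  s"pendingPartitions=${pendingPartitions.size}")
// prints "isAvailable=true pendingTasks=2 pendingPartitions=0"
println(s"registers: old=$registersWithOldScheme partitionId=$registersWithPartitionScheme")
// prints "registers: old=false partitionId=true"
```

With reference-based tracking, the stage reports isAvailable while two stale task objects remain pending, so the registration guard never fires; keying the pending set by partition ID keeps it consistent with numAvailableOutputs, which is also counted per partition.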



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)