Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21019#discussion_r181779186
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2146,6 +2146,57 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         assertDataStructuresEmpty()
       }
     
    +  test("Trigger mapstage's job listener in submitMissingTasks") {
    +    val rdd1 = new MyRDD(sc, 2, Nil)
    +    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
    +    val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
    +    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
    +
    +    val listener1 = new SimpleListener
    +    val listener2 = new SimpleListener
    +
    +    submitMapStage(dep1, listener1)
    +    submitMapStage(dep2, listener2)
    +
    +    // Complete stage 0.
    +    assert(taskSets(0).stageId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
    +      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
    +        HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    +    assert(listener1.results.size === 1)
    +
    +    // When attempting stage1, trigger a fetch failure.
    +    assert(taskSets(1).stageId === 1)
    +    complete(taskSets(1), Seq(
    +      (Success, makeMapStatus("hostC", rdd2.partitions.length)),
    +      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
    +    scheduler.resubmitFailedStages()
    +    // Stage1 listener should not have a result yet
    +    assert(listener2.results.size === 0)
    +
    +    // Speculative task succeeded in stage1.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1),
    +      Success,
    +      makeMapStatus("hostD", rdd2.partitions.length)))
    +    // stage1 listener still should not have a result, though there's no missing partitions
    +    // in it. Because stage1 is not inside runningStages at this moment.
    --- End diff --
    
    nit: ```Because stage1 has failed and is not inside `runningStages` at this moment.```
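
    To spell out the rule this test pins down, here is a small, self-contained sketch. It is not Spark's `DAGScheduler` or the suite's real `SimpleListener`; names such as `ToyStage`, `onMapTaskSuccess` and `finishMapStageJob` are illustrative stand-ins, while `runningStages`, `submitMissingTasks` and the listener's `results` map mirror identifiers visible in the diff. The point: map output from a stage that has already failed, and therefore left `runningStages`, must not complete the map-stage job on its own; the listener should instead fire when the stage is resubmitted through `submitMissingTasks` and turns out to have no missing partitions.

    ```scala
    // Toy model of the scheduling rule discussed above -- not Spark code.
    import scala.collection.mutable

    object MapStageListenerSketch {

      // Rough shape of the suite's SimpleListener: one entry per taskSucceeded call.
      class SimpleListener {
        val results = mutable.HashMap.empty[Int, Any]
        var failure: Exception = null
        def taskSucceeded(index: Int, result: Any): Unit = { results(index) = result }
        def jobFailed(e: Exception): Unit = { failure = e }
      }

      case class ToyStage(id: Int, numPartitions: Int) {
        val availableOutputs = mutable.Set.empty[Int]
        def isAvailable: Boolean = availableOutputs.size == numPartitions
      }

      val runningStages = mutable.Set.empty[ToyStage]
      val mapStageListeners = mutable.Map.empty[Int, SimpleListener] // stageId -> listener

      // A (possibly speculative) map task finished. Its output is recorded, but a
      // stage that already failed -- and so is no longer in runningStages -- must
      // not have its map-stage job completed here.
      def onMapTaskSuccess(stage: ToyStage, partition: Int): Unit = {
        stage.availableOutputs += partition
        if (runningStages.contains(stage) && stage.isAvailable) {
          finishMapStageJob(stage)
        }
      }

      // Resubmission path. If the stage turns out to have no missing partitions,
      // the listener must be triggered here rather than never firing at all.
      def submitMissingTasks(stage: ToyStage): Unit = {
        runningStages += stage
        if (stage.isAvailable) finishMapStageJob(stage)
        // else: launch tasks for the missing partitions ...
      }

      def finishMapStageJob(stage: ToyStage): Unit = {
        runningStages -= stage
        // A map-stage job reports a single result once the whole stage is available,
        // which is why the test expects results.size === 1 after completion.
        mapStageListeners.get(stage.id).foreach(_.taskSucceeded(0, ()))
      }

      def main(args: Array[String]): Unit = {
        val stage1 = ToyStage(id = 1, numPartitions = 2)
        val listener2 = new SimpleListener
        mapStageListeners(stage1.id) = listener2

        // stage1 hit a fetch failure earlier and was pulled out of runningStages.
        runningStages -= stage1

        // Output keeps arriving afterwards (e.g. from a speculative copy of the
        // failed task): every partition becomes available, but the listener stays
        // empty, matching the test's results.size === 0 assertion.
        onMapTaskSuccess(stage1, 0)
        onMapTaskSuccess(stage1, 1)
        assert(listener2.results.isEmpty)

        // On resubmission there is nothing left to run, so the listener fires now.
        submitMissingTasks(stage1)
        assert(listener2.results.size == 1)
      }
    }
    ```

    Running `main` walks through the same sequence as the new test: output arrives while stage1 is outside `runningStages` (the listener stays empty), then resubmission finds no missing partitions and triggers the listener exactly once.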


---
