Github user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/21019#discussion_r181779186
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2146,6 +2146,57 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }

+  test("Trigger mapstage's job listener in submitMissingTasks") {
+    val rdd1 = new MyRDD(sc, 2, Nil)
+    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
+    val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
+    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
+
+    val listener1 = new SimpleListener
+    val listener2 = new SimpleListener
+
+    submitMapStage(dep1, listener1)
+    submitMapStage(dep2, listener2)
+
+    // Complete the stage0.
+    assert(taskSets(0).stageId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
+      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+    assert(listener1.results.size === 1)
+
+    // When attempting stage1, trigger a fetch failure.
+    assert(taskSets(1).stageId === 1)
+    complete(taskSets(1), Seq(
+      (Success, makeMapStatus("hostC", rdd2.partitions.length)),
+      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
+    scheduler.resubmitFailedStages()
+    // Stage1 listener should not have a result yet
+    assert(listener2.results.size === 0)
+
+    // Speculative task succeeded in stage1.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      Success,
+      makeMapStatus("hostD", rdd2.partitions.length)))
+    // stage1 listener still should not have a result, though there's no missing partitions
+    // in it. Because stage1 is not inside runningStages at this moment.
--- End diff --
nit: ```Because stage1 has been marked as failed and is not inside `runningStages` at this moment.```
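
For context, here is a minimal, self-contained sketch of the behavior this test pins down. This is toy code, not Spark's actual `DAGScheduler` (all names in it are invented for illustration): once a stage has failed and left `runningStages`, a late speculative task completion cannot fire its map-stage job listener, so `submitMissingTasks` has to notify the listener itself when it finds the resubmitted stage has no missing partitions.

```scala
// Toy model (NOT Spark's real DAGScheduler) of the listener-notification
// gap: a resubmitted stage whose outputs are all already present never
// launches any tasks, so its map-stage job listener only fires if
// submitMissingTasks notifies it explicitly.
object MapStageListenerSketch {
  trait JobListener {
    def taskSucceeded(result: Any): Unit
  }

  class SimpleListener extends JobListener {
    val results = scala.collection.mutable.Buffer.empty[Any]
    override def taskSucceeded(result: Any): Unit = results += result
  }

  final case class Stage(
      id: Int,
      missingPartitions: Set[Int],
      listeners: Seq[JobListener])

  // Sketch of the fixed behavior: when there is nothing left to compute,
  // fire the listeners instead of only marking the stage finished.
  def submitMissingTasks(stage: Stage): Unit = {
    if (stage.missingPartitions.nonEmpty) {
      // ... launch tasks; listeners fire when the last one completes ...
    } else {
      stage.listeners.foreach(_.taskSucceeded(s"stage ${stage.id} done"))
    }
  }

  def main(args: Array[String]): Unit = {
    val listener2 = new SimpleListener
    // By resubmission time, a speculative task has produced every output,
    // mirroring the hostD completion event in the test above.
    val stage1 = Stage(id = 1, missingPartitions = Set.empty,
      listeners = Seq(listener2))
    submitMissingTasks(stage1)
    assert(listener2.results.size == 1) // fires even though no task ran here
  }
}
```

As in the test above, the listener only sees a result once the resubmitted stage goes back through `submitMissingTasks`, not at the moment the speculative task itself completes.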
---