[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21019

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21019#discussion_r181889300

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1433,6 +1426,16 @@ class DAGScheduler(
           }
         }

    +  private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
    +    // Mark any map-stage jobs waiting on this stage as finished
    +    if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
    --- End diff --

    This doesn't seem necessary, as it's already handled at the call sites ... but IMO it seems safer to include it, in case this method gets invoked elsewhere in the future.
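To illustrate why the guard is a cheap safety net, here is a toy model of the check (all `Toy*` names are hypothetical stand-ins; the real method lives in `DAGScheduler` and also fetches `MapOutputStatistics` from the `mapOutputTracker` before finishing each job):

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical simplifications of ShuffleMapStage and a map-stage job.
class ToyMapStageJob { var finished: Boolean = false }

class ToyShuffleMapStage(val numPartitions: Int) {
  var availableOutputs: Int = 0
  val mapStageJobs = ListBuffer.empty[ToyMapStageJob]
  // A shuffle map stage is "available" once every partition has a map output.
  def isAvailable: Boolean = availableOutputs == numPartitions
}

def markMapStageJobsAsFinished(stage: ToyShuffleMapStage): Unit = {
  // The guard under discussion: only finish waiting map-stage jobs once all
  // map outputs are actually available, and only if any jobs are waiting.
  if (stage.isAvailable && stage.mapStageJobs.nonEmpty) {
    stage.mapStageJobs.foreach(_.finished = true)
  }
}

val stage = new ToyShuffleMapStage(2)
val job = new ToyMapStageJob
stage.mapStageJobs += job

stage.availableOutputs = 1
markMapStageJobsAsFinished(stage)  // guard rejects: one output still missing
println(job.finished)              // false

stage.availableOutputs = 2
markMapStageJobsAsFinished(stage)  // now the waiting job is marked finished
println(job.finished)              // true
```

If the call sites already check availability the guard is redundant today, but it keeps the method safe to call from any future code path, which is squito's point above.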
[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21019#discussion_r181891158

    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2146,6 +2146,57 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         assertDataStructuresEmpty()
       }

    +  test("Trigger mapstage's job listener in submitMissingTasks") {
    +    val rdd1 = new MyRDD(sc, 2, Nil)
    +    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
    +    val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
    +    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
    +
    +    val listener1 = new SimpleListener
    +    val listener2 = new SimpleListener
    +
    +    submitMapStage(dep1, listener1)
    +    submitMapStage(dep2, listener2)
    +
    +    // Complete the stage0.
    +    assert(taskSets(0).stageId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
    +      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
    +      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    +    assert(listener1.results.size === 1)
    +
    +    // When attempting stage1, trigger a fetch failure.
    +    assert(taskSets(1).stageId === 1)
    +    complete(taskSets(1), Seq(
    +      (Success, makeMapStatus("hostC", rdd2.partitions.length)),
    +      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
    +    scheduler.resubmitFailedStages()
    +    // Stage1 listener should not have a result yet
    +    assert(listener2.results.size === 0)
    +
    +    // Speculative task succeeded in stage1.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1),
    +      Success,
    +      makeMapStatus("hostD", rdd2.partitions.length)))
    +    // stage1 listener still should not have a result, though there's no missing partitions
    +    // in it. Because stage1 is not inside runningStages at this moment.
    +    assert(listener2.results.size === 0)
    +
    +    // Stage0 should now be running as task set 2; make its task succeed
    +    assert(taskSets(2).stageId === 0)
    +    complete(taskSets(2), Seq(
    +      (Success, makeMapStatus("hostC", rdd2.partitions.length))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
    +      HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
    --- End diff --

    can just use `Set` here
[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21019#discussion_r181892493

    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2146,6 +2146,57 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         assertDataStructuresEmpty()
       }

    +  test("Trigger mapstage's job listener in submitMissingTasks") {
    +    val rdd1 = new MyRDD(sc, 2, Nil)
    +    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
    +    val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
    +    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
    +
    +    val listener1 = new SimpleListener
    +    val listener2 = new SimpleListener
    +
    +    submitMapStage(dep1, listener1)
    +    submitMapStage(dep2, listener2)
    +
    +    // Complete the stage0.
    +    assert(taskSets(0).stageId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
    +      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
    +      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    +    assert(listener1.results.size === 1)
    +
    +    // When attempting stage1, trigger a fetch failure.
    +    assert(taskSets(1).stageId === 1)
    +    complete(taskSets(1), Seq(
    +      (Success, makeMapStatus("hostC", rdd2.partitions.length)),
    +      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
    +    scheduler.resubmitFailedStages()
    +    // Stage1 listener should not have a result yet
    +    assert(listener2.results.size === 0)
    +
    +    // Speculative task succeeded in stage1.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1),
    +      Success,
    +      makeMapStatus("hostD", rdd2.partitions.length)))
    +    // stage1 listener still should not have a result, though there's no missing partitions
    +    // in it. Because stage1 is not inside runningStages at this moment.
    +    assert(listener2.results.size === 0)
    +
    +    // Stage0 should now be running as task set 2; make its task succeed
    +    assert(taskSets(2).stageId === 0)
    +    complete(taskSets(2), Seq(
    +      (Success, makeMapStatus("hostC", rdd2.partitions.length))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
    +      HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
    +
    +    // After stage0 is finished, stage1 will be submitted and found there is no missing
    +    // partitions in it. Then listener got triggered.
    +    assert(listener2.results.size === 1)
    +  }
    --- End diff --

    Can you also add `assertDataStructuresEmpty()` please? I know it's not really related to your change, but it's nice to include this in all the tests.
[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21019#discussion_r181778477

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1433,6 +1426,16 @@ class DAGScheduler(
           }
         }

    +  private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
    +    // Mark any map-stage jobs waiting on this stage as finished
    +    if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
    --- End diff --

    Why do we need to double-check that `shuffleStage.isAvailable` here?
[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21019#discussion_r181779186

    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2146,6 +2146,57 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         assertDataStructuresEmpty()
       }

    +  test("Trigger mapstage's job listener in submitMissingTasks") {
    +    val rdd1 = new MyRDD(sc, 2, Nil)
    +    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
    +    val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
    +    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
    +
    +    val listener1 = new SimpleListener
    +    val listener2 = new SimpleListener
    +
    +    submitMapStage(dep1, listener1)
    +    submitMapStage(dep2, listener2)
    +
    +    // Complete the stage0.
    +    assert(taskSets(0).stageId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
    +      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
    +      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    +    assert(listener1.results.size === 1)
    +
    +    // When attempting stage1, trigger a fetch failure.
    +    assert(taskSets(1).stageId === 1)
    +    complete(taskSets(1), Seq(
    +      (Success, makeMapStatus("hostC", rdd2.partitions.length)),
    +      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
    +    scheduler.resubmitFailedStages()
    +    // Stage1 listener should not have a result yet
    +    assert(listener2.results.size === 0)
    +
    +    // Speculative task succeeded in stage1.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1),
    +      Success,
    +      makeMapStatus("hostD", rdd2.partitions.length)))
    +    // stage1 listener still should not have a result, though there's no missing partitions
    +    // in it. Because stage1 is not inside runningStages at this moment.
    --- End diff --

    nit: ```Because stage1 has been failed and is not inside `runningStages` at this moment.```
[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...
GitHub user jinxing64 opened a pull request:

    https://github.com/apache/spark/pull/21019

    [SPARK-23948] Trigger mapstage's job listener in submitMissingTasks

    ## What changes were proposed in this pull request?

    `SparkContext` submits a map stage to the `DAGScheduler` via `submitMapStage`, and `markMapStageJobAsFinished` is called only in (). But consider the scenario below:

    1. stage0 and stage1 are both `ShuffleMapStage`s, and stage1 depends on stage0;
    2. We submit stage1 by `submitMapStage`; there are 10 missing tasks in stage1;
    3. While stage1 is running, a `FetchFailed` happens, and stage0 and stage1 get resubmitted as stage0_1 and stage1_1;
    4. While stage0_1 is running, speculative tasks from the old stage1 come back as succeeded, but stage1 is not inside `runningStages`. So even though all partitions of stage1 (including those finished by speculative tasks) have succeeded, stage1's job listener is not called;
    5. stage0_1 finishes and stage1_1 starts running. `submitMissingTasks` finds no missing tasks, but in the current code the job listener is still not triggered.

    ## How was this patch tested?

    Not added yet.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinxing64/spark SPARK-23948

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21019.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21019

----
commit 685124a11b789af2a42b4978e25ed404b2a15176
Author: jinxing
Date: 2018-04-10T03:33:02Z

    [SPARK-23948] Trigger mapstage's job listener in submitMissingTasks
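The five-step scenario in the description can be replayed with a toy model (a minimal sketch with hypothetical names; the real logic lives in `DAGScheduler.handleTaskCompletion` and `DAGScheduler.submitMissingTasks` and tracks far more state):

```scala
import scala.collection.mutable

// Toy scheduler state: which stages are running, and which partitions each
// stage still needs map output for.
val runningStages = mutable.Set.empty[String]
val pendingPartitions = mutable.Map("stage1" -> mutable.Set(0, 1))
var listenerCalled = false

// Old behavior: a task completion records the map output, but only finishes
// the map-stage job when the stage is currently in runningStages.
def handleTaskCompletion(stage: String, partition: Int): Unit = {
  pendingPartitions(stage) -= partition
  if (runningStages.contains(stage) && pendingPartitions(stage).isEmpty) {
    listenerCalled = true
  }
}

// The fix this PR proposes: when a resubmitted stage turns out to have no
// missing tasks, trigger the map-stage job listener from submitMissingTasks too.
def submitMissingTasks(stage: String): Unit = {
  runningStages += stage
  if (pendingPartitions(stage).isEmpty) {
    listenerCalled = true
  }
}

// Steps 3-4: stage1 was removed from runningStages after the FetchFailed, yet
// speculative copies of its old tasks still come back successful.
handleTaskCompletion("stage1", 0)
handleTaskCompletion("stage1", 1)
println(listenerCalled)  // false: no missing partitions, but the listener never fired

// Step 5: stage1_1 is resubmitted with nothing left to run; the fix fires the listener.
submitMissingTasks("stage1")
println(listenerCalled)  // true
```

Without the `submitMissingTasks` hook, no further completion event ever arrives for stage1_1, so `listenerCalled` would stay false forever: that is the hang described above.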