GitHub user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16620#discussion_r97417399

--- Diff: core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---
@@ -648,4 +648,69 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
     }
     assertDataStructuresEmpty(noFailure = false)
   }
+
+  testScheduler("[SPARK-19263] DAGScheduler shouldn't resubmit active taskSet.") {
+    val a = new MockRDD(sc, 2, Nil)
+    val b = shuffle(2, a)
+    val shuffleId = b.shuffleDeps.head.shuffleId
+
+    def runBackend(): Unit = {
+      val (taskDescription, task) = backend.beginTask()
+      task.stageId match {
+        // ShuffleMapTask
+        case 0 =>
+          val stageAttempt = task.stageAttemptId
+          val partitionId = task.partitionId
+          (stageAttempt, partitionId) match {
+            case (0, 0) =>
+              val fetchFailed = FetchFailed(
+                DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")
+              backend.taskFailed(taskDescription, fetchFailed)
+            case (0, 1) =>
+              // Wait until stage resubmission caused by FetchFailed is finished.
+              waitUntilConditionBecomeTrue(taskScheduler.runningTaskSets.size==2, 5000,
+                "Wait until stage is resubmitted caused by fetch failed")
+
+              // Task(stageAttempt=0, partition=1) will be bogus, because both two
+              // tasks(stageAttempt=0, partition=0, 1) run on hostA.
+              // Pending partitions are (0, 1) after stage resubmission,
+              // then change to be 0 after this bogus task.
+              backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2))
+            case (1, 1) =>
+              // Wait long enough until Success of task(stageAttempt=1 and partition=0)
+              // is handled by DAGScheduler.
+              Thread.sleep(5000)
--- End diff --

Hmm, this is a nuisance. I don't see any good way to get rid of this sleep... But now that I think about it, why can't you do this in `DAGSchedulerSuite`? It seems like this can be entirely contained in the `DAGScheduler` and doesn't require tricky interactions with other parts of the scheduler.
(I'm sorry I pointed you in the wrong direction earlier. I thought perhaps you had tried to copy the examples in `DAGSchedulerSuite` but there was some reason you couldn't.)
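A minimal sketch of why a scheduler-only test can drop the `Thread.sleep`, assuming (as the `DAGSchedulerSuite` tests appear to do) that scheduler events are handled synchronously on the test thread instead of on a background event loop. Everything below is hypothetical: `ToyScheduler`, `SynchronousEventLoop`, and the event classes are toy stand-ins, not Spark classes, and the fetch-failure handling is heavily simplified. The point is only that when `post()` returns after the handler has run, the test controls ordering directly and can assert on state immediately.

```scala
import scala.collection.mutable

// Hypothetical events standing in for the scheduler's task-completion events.
sealed trait Event
final case class TaskSucceeded(stageAttempt: Int, partition: Int) extends Event
final case class TaskFetchFailed(stageAttempt: Int, partition: Int) extends Event

// Hypothetical toy scheduler: tracks which partitions of one stage are still
// pending and which stage attempt is current.
final class ToyScheduler(numPartitions: Int) {
  val pending: mutable.Set[Int] = mutable.Set[Int]() ++= (0 until numPartitions)
  var attempt: Int = 0

  def onReceive(event: Event): Unit = event match {
    case TaskSucceeded(_, p) =>
      pending -= p
    case TaskFetchFailed(a, p) if a == attempt =>
      // "Resubmit" the stage: bump the attempt and mark the partition pending again.
      attempt += 1
      pending += p
    case TaskFetchFailed(_, _) =>
      () // Fetch failure from an older attempt: ignored in this toy model.
  }
}

// Synchronous "event loop": post() runs the handler on the caller's thread,
// so by the time post() returns, the event has been fully processed.
final class SynchronousEventLoop(scheduler: ToyScheduler) {
  def post(event: Event): Unit = scheduler.onReceive(event)
}

object SynchronousLoopDemo {
  // Replays an ordering similar to the test in the diff, without any sleep.
  def run(): (Set[Int], Int) = {
    val scheduler = new ToyScheduler(numPartitions = 2)
    val loop = new SynchronousEventLoop(scheduler)

    loop.post(TaskFetchFailed(stageAttempt = 0, partition = 0))
    loop.post(TaskSucceeded(stageAttempt = 0, partition = 1))
    // State can be checked right away; nothing is still running in the background.
    assert(scheduler.pending.toSet == Set(0))

    loop.post(TaskSucceeded(stageAttempt = 1, partition = 0))
    (scheduler.pending.toSet, scheduler.attempt)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With a real asynchronous backend, the same sequence needs waits or polling between steps, which is where the sleep in the diff comes from.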