Stove-hust commented on PR #40393: URL: https://github.com/apache/spark/pull/40393#issuecomment-1474918516
> Instead of only testing specifically for the flag - which is subject to change as the implementation evolves, we should also test for behavior here. > > This is the reproducible test I was using (with some changes) to test approaches for this bug - and it mimics the case I saw in our production reasonably well. (In DAGSchedulerSuite): > > ``` > for (pushBasedShuffleEnabled <- Seq(true, false)) { > test("SPARK-40082: recomputation of shuffle map stage with no pending partitions should not " + > s"hang. pushBasedShuffleEnabled = $pushBasedShuffleEnabled") { > > if (pushBasedShuffleEnabled) { > initPushBasedShuffleConfs(conf) > DAGSchedulerSuite.clearMergerLocs() > DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) > } > > var taskIdCount = 0 > > var completedStage: List[Int] = Nil > val listener = new SparkListener() { > override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { > completedStage = completedStage :+ event.stageInfo.stageId > } > } > sc.addSparkListener(listener) > > val fetchFailParentPartition = 0 > > val shuffleMapRdd0 = new MyRDD(sc, 2, Nil) > val shuffleDep0 = new ShuffleDependency(shuffleMapRdd0, new HashPartitioner(2)) > > val shuffleMapRdd1 = new MyRDD(sc, 2, List(shuffleDep0), tracker = mapOutputTracker) > val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2)) > > val reduceRdd = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker) > > // submit the initial mapper stage, generate shuffle output for first reducer stage. 
> submitMapStage(shuffleDep0) > > // Map stage completes successfully, > completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostB")) > taskIdCount += 2 > assert(completedStage === List(0)) > > // Now submit the first reducer stage > submitMapStage(shuffleDep1) > > def createTaskInfo(speculative: Boolean): TaskInfo = { > val taskInfo = new TaskInfo( > taskId = taskIdCount, > index = 0, > attemptNumber = 0, > partitionId = 0, > launchTime = 0L, > executorId = "", > host = "hostC", > TaskLocality.ANY, > speculative = speculative) > taskIdCount += 1 > taskInfo > } > > val normalTask = createTaskInfo(speculative = false); > val speculativeTask = createTaskInfo(speculative = true) > > // fail task 1.0 due to FetchFailed, and make 1.1 succeed. > runEvent(makeCompletionEvent(taskSets(1).tasks(0), > FetchFailed(makeBlockManagerId("hostA"), shuffleDep0.shuffleId, normalTask.taskId, > fetchFailParentPartition, normalTask.index, "ignored"), > result = null, > Seq.empty, > Array.empty, > normalTask)) > > // Make the speculative task succeed after initial task has failed > runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success, > result = MapStatus(BlockManagerId("hostD-exec1", "hostD", 34512), > Array.fill[Long](2)(2), mapTaskId = speculativeTask.taskId), > taskInfo = speculativeTask)) > > // The second task, for partition 1 succeeds as well. 
> runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success, > result = MapStatus(BlockManagerId("hostE-exec2", "hostE", 23456), > Array.fill[Long](2)(2), mapTaskId = taskIdCount), > )) > taskIdCount += 1 > > sc.listenerBus.waitUntilEmpty() > assert(completedStage === List(0, 2)) > > // the stages will now get resubmitted due to the failure > Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) > > // parent map stage resubmitted > assert(scheduler.runningStages.size === 1) > val mapStage = scheduler.runningStages.head > > // Stage 1 is same as Stage 0 - but created for the ShuffleMapTask 2, as it is a > // different job > assert(mapStage.id === 1) > assert(mapStage.latestInfo.failureReason.isEmpty) > // only the partition reported in fetch failure is resubmitted > assert(mapStage.latestInfo.numTasks === 1) > > val stage0Retry = taskSets.filter(_.stageId == 1) > assert(stage0Retry.size === 1) > // make the original task succeed > runEvent(makeCompletionEvent(stage0Retry.head.tasks(fetchFailParentPartition), Success, > result = MapStatus(BlockManagerId("hostF-exec1", "hostF", 12345), > Array.fill[Long](2)(2), mapTaskId = taskIdCount))) > Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) > > // The retries should succeed > sc.listenerBus.waitUntilEmpty() > assert(completedStage === List(0, 2, 1, 2)) > > // Now submit the entire dag again > // This will add 3 new stages. > submit(reduceRdd, Array(0, 1)) > Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) > > // Only the last stage needs to execute, and those tasks - so completed stages should not > // change. 
> sc.listenerBus.waitUntilEmpty() > > assert(completedStage === List(0, 2, 1, 2)) > > // All other stages should be done, and only the final stage should be waiting > assert(scheduler.runningStages.size === 1) > assert(scheduler.runningStages.head.id === 5) > assert(taskSets.count(_.stageId == 5) === 1) > > complete(taskSets.filter(_.stageId == 5).head, Seq((Success, 1), (Success, 2))) > > sc.listenerBus.waitUntilEmpty() > assert(completedStage === List(0, 2, 1, 2, 5)) > } > } > ``` > > Would be good to adapt/clean it up for your PR, in addition to the existing test - so that the observed bug does not recur. > > (Good news is, this PR works against it :-) ) Thank you for your advice on the unit test (UT) I wrote; it was very valuable to me. I will delete my UT. Thanks again, very much! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org