mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1474782673

   Instead of testing specifically for the flag (which is subject to change as the implementation evolves), we should test for the behavior here.
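
   For illustration, a behavioral check asserts on what the scheduler actually did (the names below are from the test that follows), rather than on how it is configured - the commented-out line shows the brittle flag-style check:

   ```scala
   // Behavior: assert on observable scheduler state.
   assert(scheduler.runningStages.size === 1)
   // Flag: brittle, since the config and its effect may change as the implementation evolves.
   // assert(conf.get("spark.shuffle.push.enabled") == "true")
   ```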
   
   This is the reproducible test I was using (with some changes) to test approaches for this bug:
   
   ```scala
     for (pushBasedShuffleEnabled <- Seq(true, false)) {
       test("SPARK-40082: recomputation of shuffle map stage with no pending 
partitions should not " +
           s"hang. pushBasedShuffleEnabled = $pushBasedShuffleEnabled") {
   
         if (pushBasedShuffleEnabled) {
           initPushBasedShuffleConfs(conf)
           DAGSchedulerSuite.clearMergerLocs()
            DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
         }
   
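          // Track the task ids handed out to completion events, and the order in which stages complete.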
         var taskIdCount = 0
   
         var completedStage: List[Int] = Nil
         val listener = new SparkListener() {
            override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
             completedStage = completedStage :+ event.stageInfo.stageId
           }
         }
         sc.addSparkListener(listener)
   
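          // Map partition whose output will be reported as lost by the FetchFailed event below.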
         val fetchFailParentPartition = 0
   
         val shuffleMapRdd0 = new MyRDD(sc, 2, Nil)
          val shuffleDep0 = new ShuffleDependency(shuffleMapRdd0, new HashPartitioner(2))
   
          val shuffleMapRdd1 = new MyRDD(sc, 2, List(shuffleDep0), tracker = mapOutputTracker)
          val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))

          val reduceRdd = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
   
          // Submit the initial mapper stage and generate shuffle output for the next stage.
         submitMapStage(shuffleDep0)
   
          // Map stage completes successfully.
         completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostB"))
         taskIdCount += 2
         assert(completedStage === List(0))
   
          // Now submit the next shuffle map stage, which reduces shuffle 0 and writes shuffle 1.
         submitMapStage(shuffleDep1)
   
         def createTaskInfo(speculative: Boolean): TaskInfo = {
           val taskInfo = new TaskInfo(
             taskId = taskIdCount,
             index = 0,
             attemptNumber = 0,
             partitionId = 0,
             launchTime = 0L,
             executorId = "",
             host = "hostC",
              taskLocality = TaskLocality.ANY,
             speculative = speculative)
           taskIdCount += 1
           taskInfo
         }
   
          val normalTask = createTaskInfo(speculative = false)
         val speculativeTask = createTaskInfo(speculative = true)
   
          // Fail the first attempt of partition 0 due to FetchFailed; the speculative attempt will succeed.
          runEvent(makeCompletionEvent(taskSets(1).tasks(0),
            FetchFailed(makeBlockManagerId("hostA"), shuffleDep0.shuffleId, normalTask.taskId,
              fetchFailParentPartition, normalTask.index, "ignored"),
            result = null,
            taskInfo = normalTask))
   
         // Make the speculative task succeed after initial task has failed
         runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success,
           result = MapStatus(BlockManagerId("hostD-exec1", "hostD", 34512),
             Array.fill[Long](2)(2), mapTaskId = speculativeTask.taskId),
           taskInfo = speculativeTask))
   
          // The second task, for partition 1, succeeds as well.
          runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success,
            result = MapStatus(BlockManagerId("hostE-exec2", "hostE", 23456),
              Array.fill[Long](2)(2), mapTaskId = taskIdCount)))
         taskIdCount += 1
   
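          // Drain pending listener events before asserting on the stage completion order.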
         sc.listenerBus.waitUntilEmpty()
         assert(completedStage === List(0, 2))
   
         // the stages will now get resubmitted due to the failure
         Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
   
         // parent map stage resubmitted
         assert(scheduler.runningStages.size === 1)
         val mapStage = scheduler.runningStages.head
   
          // Stage 1 is the same as stage 0, but created for ShuffleMapTask 2, as it is a
          // different job.
         assert(mapStage.id === 1)
         assert(mapStage.latestInfo.failureReason.isEmpty)
          // Only the partition reported in the fetch failure is resubmitted.
         assert(mapStage.latestInfo.numTasks === 1)
   
         val stage0Retry = taskSets.filter(_.stageId == 1)
         assert(stage0Retry.size === 1)
          // Make the original task succeed.
          runEvent(makeCompletionEvent(stage0Retry.head.tasks(fetchFailParentPartition), Success,
            result = MapStatus(BlockManagerId("hostF-exec1", "hostF", 12345),
              Array.fill[Long](2)(2), mapTaskId = taskIdCount)))
         Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
   
         // The retries should succeed
         sc.listenerBus.waitUntilEmpty()
         assert(completedStage === List(0, 2, 1, 2))
   
          // Now submit the entire DAG again.
         // This will add 3 new stages.
         submit(reduceRdd, Array(0, 1))
         Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
   
          // Only the last stage needs to execute its tasks, so completed stages should not
          // change.
         sc.listenerBus.waitUntilEmpty()
   
         assert(completedStage === List(0, 2, 1, 2))
   
          // All other stages should be done, and only the final stage should be waiting.
         assert(scheduler.runningStages.size === 1)
         assert(scheduler.runningStages.head.id === 5)
         assert(taskSets.count(_.stageId == 5) === 1)
   
          complete(taskSets.filter(_.stageId == 5).head, Seq((Success, 1), (Success, 2)))
   
         sc.listenerBus.waitUntilEmpty()
         assert(completedStage === List(0, 2, 1, 2, 5))
       }
     }
   ```
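
   Note that the helpers used above (initPushBasedShuffleConfs, completeShuffleMapStageSuccessfully, makeCompletionEvent, runEvent, submitMapStage) are the existing DAGSchedulerSuite utilities, so the snippet should drop into that suite largely as-is.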
   
   It would be good to adapt/clean this up for your PR, so that the observed bug does not recur.
   
   (The good news is that this PR passes it :-) )
   
   

