gaoyajun02 commented on code in PR #56559:
URL: https://github.com/apache/spark/pull/56559#discussion_r3465162731


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2921,6 +2921,140 @@ class TaskSetManagerSuite
         s"\nCaptured logs:\n${logs.mkString("\n")}")
   }
 
+  test("SPARK-57491: late-arriving speculative ShuffleMapTask marks stale 
partitionId") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), 
("exec3", "host3"))
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+
+    val taskSet = FakeTask.createShuffleMapTaskSet(2, 0, 0,
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host2", "exec2")))
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Register shuffle in MapOutputTrackerMaster so 
detectStalePushIfShuffleTask can find it
+    val mapOutputTrackerMaster = sched.mapOutputTracker
+    val shuffleId = taskSet.shuffleId.get
+    mapOutputTrackerMaster.registerShuffle(shuffleId, 2, 2)
+
+    // Offer resources for 2 tasks to start
+    val task0 = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1.get
+    val task1 = manager.resourceOffer("exec2", "host2", PROCESS_LOCAL)._1.get
+    assert(task0.index === 0)
+    assert(task1.index === 1)
+
+    // Advance clock so tasks have been running long enough (markFinished 
requires time > 0)
+    clock.advance(1)
+
+    // Complete task 1 (partition 1) successfully with a MapStatus
+    val mapStatus1 = MapStatus(BlockManagerId("exec2", "host2", 2000), 
Array(2L, 2L), mapTaskId = 1)
+    val result1 = createMapStatusTaskResult(mapStatus1, accumUpdatesByTask(1))
+    manager.handleSuccessfulTask(task1.taskId, result1)
+    assert(sched.endedTasks(task1.index) === Success)
+
+    // Advance clock so task 0 has been running long enough for speculation.
+    // checkSpeculatableTasks requires tasks to have been running > 0ms when 
threshold is 0.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(0))
+
+    // Offer resource to start the speculative attempt for partition 0 on a 
different host
+    val specTaskOption = manager.resourceOffer("exec3", "host3", ANY)._1
+    assert(specTaskOption.isDefined, "Expected speculative task to be 
launched")
+    val specTask = specTaskOption.get
+    assert(specTask.index === 0)
+    assert(specTask.attemptNumber === 1)
+
+    // Replace backend with mock before completing original task 0, to handle 
killTask call
+    sched.backend = mock(classOf[SchedulerBackend])
+    sched.dagScheduler.stop()
+    sched.dagScheduler = mock(classOf[DAGScheduler])
+
+    // Complete original task 0 (partition 0) - this will kill the speculative 
attempt
+    val mapStatus0 = MapStatus(BlockManagerId("exec1", "host1", 1000), 
Array(1L, 1L), mapTaskId = 0)
+    val result0 = createMapStatusTaskResult(mapStatus0, accumUpdatesByTask(0))
+    manager.handleSuccessfulTask(task0.taskId, result0)
+
+    // Verify no stale pushed map indexes yet (stale is only marked when late 
result arrives)
+    assert(mapOutputTrackerMaster.getStaleMapIndexes(shuffleId).isEmpty)
+
+    // Now the speculative attempt's result arrives late. Since task 0 already 
succeeded,
+    // handleSuccessfulTask will see successful(0)=true and 
killedByOtherAttempt contains
+    // the speculative tid, triggering detectStalePushIfShuffleTask.
+    val specMapStatus = MapStatus(
+      BlockManagerId("exec3", "host3", 3000), Array(3L, 3L), mapTaskId = 999)
+    val specResult = createMapStatusTaskResult(specMapStatus, 
accumUpdatesByTask(0))
+    manager.handleSuccessfulTask(specTask.taskId, specResult)
+
+    // Verify that partition 0 is now tracked as stale
+    val staleMapIndexes = mapOutputTrackerMaster.getStaleMapIndexes(shuffleId)
+    assert(staleMapIndexes.contains(0),
+      s"Expected staleMapIndexes to contain mapIndex 0, got $staleMapIndexes")
+  }
+
+  test("SPARK-33235: late-arriving result for finished task marks stale 
partitionId") {

Review Comment:
   Thanks for catching this — you're absolutely right. This test is a leftover 
from before the `info.finished` early return was finalized. In the production 
code, `handleSuccessfulTask` (lines 816–822) returns immediately when 
`info.finished` is set, **without** calling `detectStalePushIfShuffleTask`. The 
only call site of `detectStalePushIfShuffleTask` is the `killedByOtherAttempt` 
branch at line 828. This is by design: re-delivering the result of an attempt 
that has already been committed as the winner must not mark anything as stale.
   
   I've removed this test entirely. The speculative-duplicate path 
(`killedByOtherAttempt`) is already covered by the `SPARK-57491: speculative 
duplicate result marks stale partitionId` test in the same suite.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to