gaoyajun02 commented on code in PR #56559:
URL: https://github.com/apache/spark/pull/56559#discussion_r3465162731
##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2921,6 +2921,140 @@ class TaskSetManagerSuite
s"\nCaptured logs:\n${logs.mkString("\n")}")
}
+ test("SPARK-57491: late-arriving speculative ShuffleMapTask marks stale
partitionId") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+
+ val taskSet = FakeTask.createShuffleMapTaskSet(2, 0, 0,
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host2", "exec2")))
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock
= clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] =
taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Register shuffle in MapOutputTrackerMaster so
detectStalePushIfShuffleTask can find it
+ val mapOutputTrackerMaster = sched.mapOutputTracker
+ val shuffleId = taskSet.shuffleId.get
+ mapOutputTrackerMaster.registerShuffle(shuffleId, 2, 2)
+
+ // Offer resources for 2 tasks to start
+ val task0 = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1.get
+ val task1 = manager.resourceOffer("exec2", "host2", PROCESS_LOCAL)._1.get
+ assert(task0.index === 0)
+ assert(task1.index === 1)
+
+ // Advance clock so tasks have been running long enough (markFinished
requires time > 0)
+ clock.advance(1)
+
+ // Complete task 1 (partition 1) successfully with a MapStatus
+ val mapStatus1 = MapStatus(BlockManagerId("exec2", "host2", 2000),
Array(2L, 2L), mapTaskId = 1)
+ val result1 = createMapStatusTaskResult(mapStatus1, accumUpdatesByTask(1))
+ manager.handleSuccessfulTask(task1.taskId, result1)
+ assert(sched.endedTasks(task1.index) === Success)
+
+ // Advance clock so task 0 has been running long enough for speculation.
+ // checkSpeculatableTasks requires tasks to have been running > 0ms when
threshold is 0.
+ clock.advance(1)
+ assert(manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set(0))
+
+ // Offer resource to start the speculative attempt for partition 0 on a
different host
+ val specTaskOption = manager.resourceOffer("exec3", "host3", ANY)._1
+ assert(specTaskOption.isDefined, "Expected speculative task to be
launched")
+ val specTask = specTaskOption.get
+ assert(specTask.index === 0)
+ assert(specTask.attemptNumber === 1)
+
+ // Replace backend with mock before completing original task 0, to handle
killTask call
+ sched.backend = mock(classOf[SchedulerBackend])
+ sched.dagScheduler.stop()
+ sched.dagScheduler = mock(classOf[DAGScheduler])
+
+ // Complete original task 0 (partition 0) - this will kill the speculative
attempt
+ val mapStatus0 = MapStatus(BlockManagerId("exec1", "host1", 1000),
Array(1L, 1L), mapTaskId = 0)
+ val result0 = createMapStatusTaskResult(mapStatus0, accumUpdatesByTask(0))
+ manager.handleSuccessfulTask(task0.taskId, result0)
+
+ // Verify no stale pushed map indexes yet (stale is only marked when late
result arrives)
+ assert(mapOutputTrackerMaster.getStaleMapIndexes(shuffleId).isEmpty)
+
+ // Now the speculative attempt's result arrives late. Since task 0 already
succeeded,
+ // handleSuccessfulTask will see successful(0)=true and
killedByOtherAttempt contains
+ // the speculative tid, triggering detectStalePushIfShuffleTask.
+ val specMapStatus = MapStatus(
+ BlockManagerId("exec3", "host3", 3000), Array(3L, 3L), mapTaskId = 999)
+ val specResult = createMapStatusTaskResult(specMapStatus,
accumUpdatesByTask(0))
+ manager.handleSuccessfulTask(specTask.taskId, specResult)
+
+ // Verify that partition 0 is now tracked as stale
+ val staleMapIndexes = mapOutputTrackerMaster.getStaleMapIndexes(shuffleId)
+ assert(staleMapIndexes.contains(0),
+ s"Expected staleMapIndexes to contain mapIndex 0, got $staleMapIndexes")
+ }
+
+ test("SPARK-33235: late-arriving result for finished task marks stale
partitionId") {
Review Comment:
Thanks for catching this — you're absolutely right. This test is a leftover
from before the `info.finished` early-return was finalized. The production code
in `handleSuccessfulTask` (lines 816-822) returns immediately on `info.finished`
**without** calling `detectStalePushIfShuffleTask`, and the sole call site for
`detectStalePushIfShuffleTask` is the `killedByOtherAttempt` branch at line 828
— exactly as designed, since a re-delivery of an already-committed winner
result must not mark anything stale.
I've removed this test entirely. The speculative duplicate path
(`killedByOtherAttempt`) is already covered by the `SPARK-57491: speculative
duplicate result marks stale partitionId` test in the same suite.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]