gaoyajun02 commented on code in PR #56559:
URL: https://github.com/apache/spark/pull/56559#discussion_r3432715683


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -930,6 +936,42 @@ private[spark] class TaskSetManager(
       taskInfoWithAccumulables)
   }
 
+  /**
+   * For ShuffleMapTasks, detect stale push: if a partition already has
+   * a registered MapStatus with a different mapId, it means another attempt for the same
+   * partition also pushed data to the merger. Mark this partition so that reducers will
+   * skip the merged block and fallback to unmerged blocks.
+   *
+   * This is called from handleSuccessfulTask for late-arriving or killed attempt results,
+   * where the task result won't be forwarded to DAGScheduler (so DAGScheduler's own
+   * stale detection won't cover these cases).
+   */
+  private def detectStalePushIfShuffleTask(
+      tid: Long, index: Int, result: DirectTaskResult[_]): Unit = {
+    if (!isShuffleMapTasks || shuffleId.isEmpty) {
+      return
+    }
+    val status = result.value()
+    status match {
+      case mapStatus: MapStatus =>
+        val sid = shuffleId.get
+        val partitionId = tasks(index).partitionId
+        val mapOutputTrackerMaster = sched.mapOutputTracker
+        val shuffleStatusOpt = mapOutputTrackerMaster.shuffleStatuses.get(sid)
+        shuffleStatusOpt.foreach { shuffleStatus =>
+          // This method is only called for late-arriving or killed attempts, meaning the
+          // partition already has a successful attempt registered. Any MapStatus arriving
+          // here is from a stale (redundant) attempt that also pushed data.
+          // Mark its mapId as stale so reducers can detect it in merged block chunks.
+          shuffleStatus.markStalePushedMap(partitionId)

Review Comment:
   Thank you for the review. You raise an excellent point about the 
unconditional marking. After tracing through the code, here's my analysis:
   
   1. The info.finished path: winner re-delivery, not a stale push. 
   handleSuccessfulTask is serialized under TaskSchedulerImpl.synchronized 
(line 896), so two results for the same partition are never processed 
concurrently — they are strictly ordered. The info.finished guard exists to 
handle message retransmission of the same winning attempt, not a concurrent 
late arrival from a different attempt. Marking it as stale is incorrect: this 
is the canonical result arriving twice.
   2. The killedByOtherAttempt path: the push has already occurred.
   For a task result to reach handleSuccessfulTask, the executor must have sent
   it with state FINISHED (Executor.scala:1029). A task only reports FINISHED
   after runTask() completes successfully and the completion listener fires. At
   that point, ShuffleWriteProcessor's listener checks
   !context.isInterrupted() && !context.isFailed(). The check passes because
   neither flag is set: the task finished normally. So initiateBlockPush() is
   always called before the result is sent back. The kill signal arrives later
   (or concurrently on a different thread) but cannot retroactively undo the
   already-submitted push job.
   We have actually observed this in production (with this PR's changes
   applied). Here the driver receives two tasks with the same mapIndex and
   rejects one:
   
   <img width="3760" height="466" alt="Image" src="https://github.com/user-attachments/assets/d6278584-aac9-4f47-989e-ec350405aa8f" />
   
   Our debug logs confirm that the rejected task had already started pushing
   before its result arrived at the driver. This validates that the
   killedByOtherAttempt path always represents a real stale push.
   
   <img width="3228" height="708" alt="Image" src="https://github.com/user-attachments/assets/b730d485-f3e4-4e86-b68d-5b3b07e2d043" />
   
   
   Therefore, only the killedByOtherAttempt path legitimately needs 
markStalePushedPartition. The info.finished path should skip it entirely.
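
   To make the proposed split concrete, here is a minimal sketch of the
   decision, not the actual change in this PR. It models the two early-return
   checks in handleSuccessfulTask as a pure function. The names
   `StalePushSketch`, `Action`, and `classify` are hypothetical and exist only
   for illustration. The boolean and set parameters are simplified stand-ins
   for the scheduler state discussed above.

```scala
// Illustrative model only: none of these names are Spark APIs.
object StalePushSketch {
  sealed trait Action
  // Same winning attempt delivered again: ignore, do not mark anything stale.
  case object SkipDuplicate extends Action
  // A different attempt lost the race but had already pushed: mark stale, then drop.
  case object MarkStaleAndDrop extends Action
  // First successful result for this partition: handle as usual.
  case object ProcessNormally extends Action

  def classify(
      tid: Long,
      infoFinished: Boolean,           // assumed stand-in for the info.finished guard
      partitionSuccessful: Boolean,    // assumed stand-in for "another attempt already succeeded"
      killedByOtherAttempt: Set[Long]  // task ids killed because another attempt won
  ): Action = {
    if (infoFinished) SkipDuplicate
    else if (partitionSuccessful && killedByOtherAttempt.contains(tid)) MarkStaleAndDrop
    else ProcessNormally
  }
}
```

   Under this model, only the second branch would call the stale-marking
   method, and the info.finished branch returns without touching the shuffle
   status.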



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

