cloud-fan commented on code in PR #56559:
URL: https://github.com/apache/spark/pull/56559#discussion_r3431887706


##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -105,6 +105,28 @@ private class ShuffleStatus(
    */
   private[spark] val checksumMismatchIndices: Set[Int] = Set()
 
+  /**
+   * Set of stale mapIds for this shuffle. When task retry or speculation causes multiple
+   * attempts for the same map output to push, the merger may include data from a stale attempt.
+   * We record the stale mapIds here so the reduce side can check chunkBitmaps and fallback
+   * if stale data is present in a merged block.
+   */
+  private[this] val staleMapIds = new java.util.HashSet[Int]()

Review Comment:
   `staleMapIds` is only ever added to — `addMapOutput`/`updateMapOutput` and 
the output-invalidation paths never clear it; only `unregisterShuffle` drops 
it. So a partition stays marked, and its merged blocks keep falling back to 
unmerged, for the whole life of the shuffle, even after a clean recompute 
re-registers a valid output. Clear the relevant id when its mapIndex is 
(re-)registered.
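
One way the suggested clearing could look, as a minimal sketch rather than the PR's code. `StaleTrackingSketch`, `markStalePushed`, `isStale`, and the `String` stand-in for `MapStatus` are all hypothetical names chosen for illustration; the real `ShuffleStatus` uses its own read/write lock helpers instead of `synchronized`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ShuffleStatus; not the real Spark class.
final class StaleTrackingSketch(numPartitions: Int) {
  // Registered output per mapIndex; a String stands in for MapStatus here.
  private val outputs = Array.fill[Option[String]](numPartitions)(None)
  // mapIndexes whose merged data may include a stale attempt's push.
  private val staleMapIndexes = mutable.HashSet.empty[Int]

  def markStalePushed(mapIndex: Int): Unit = synchronized {
    staleMapIndexes += mapIndex
  }

  // (Re-)registering an output drops the stale mark for that mapIndex,
  // so a clean recompute stops forcing fallback for the rest of the shuffle.
  def addMapOutput(mapIndex: Int, status: String): Unit = synchronized {
    outputs(mapIndex) = Some(status)
    staleMapIndexes -= mapIndex
  }

  def isStale(mapIndex: Int): Boolean = synchronized {
    staleMapIndexes.contains(mapIndex)
  }
}
```

The same removal would presumably be needed in the update and invalidation paths as well; this sketch only covers registration.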



##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -105,6 +105,28 @@ private class ShuffleStatus(
    */
   private[spark] val checksumMismatchIndices: Set[Int] = Set()
 
+  /**
+   * Set of stale mapIds for this shuffle. When task retry or speculation causes multiple
+   * attempts for the same map output to push, the merger may include data from a stale attempt.
+   * We record the stale mapIds here so the reduce side can check chunkBitmaps and fallback
+   * if stale data is present in a merged block.
+   */
+  private[this] val staleMapIds = new java.util.HashSet[Int]()
+
+  /**
+   * Mark a map output as having stale (redundant) push attempts. Called from TaskSetManager when it
+   * detects that multiple task attempts for the same map output pushed data to the merger.
+   * @param staleMapId the mapId of the stale (redundant) attempt
+   */
+  def markStalePushedMap(staleMapId: Int): Unit = withWriteLock {

Review Comment:
   This set holds the **partitionId**, which equals the **mapIndex** the merger 
records in its chunk bitmaps — not `MapStatus.mapId`. `ShuffleMapTask` pushes 
with `mapIndex = partitionId`, `RemoteBlockPushResolver` does 
`chunkTracker.add(mapIndex)`, and the driver marks `tasks(index).partitionId`, 
so the reduce-side `contains()` check works only because `partitionId == 
mapIndex`. Naming all of this `*MapId*` (and logging `mapId=${mapStatus.mapId}` 
in `TaskSetManager`) hides that invariant: someone "fixing" the marked value to 
the real `mapId` would break the bitmap match and serve stale data as clean. 
Suggest renaming to `staleMapIndexes` / `markStalePushedPartition` and logging 
the partitionId you actually mark.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -930,6 +936,42 @@ private[spark] class TaskSetManager(
       taskInfoWithAccumulables)
   }
 
+  /**
+   * For ShuffleMapTasks, detect stale push: if a partition already has
+   * a registered MapStatus with a different mapId, it means another attempt for the same
+   * partition also pushed data to the merger. Mark this partition so that reducers will
+   * skip the merged block and fallback to unmerged blocks.
+   *
+   * This is called from handleSuccessfulTask for late-arriving or killed attempt results,
+   * where the task result won't be forwarded to DAGScheduler (so DAGScheduler's own
+   * stale detection won't cover these cases).
+   */
+  private def detectStalePushIfShuffleTask(
+      tid: Long, index: Int, result: DirectTaskResult[_]): Unit = {
+    if (!isShuffleMapTasks || shuffleId.isEmpty) {
+      return
+    }
+    val status = result.value()
+    status match {
+      case mapStatus: MapStatus =>
+        val sid = shuffleId.get
+        val partitionId = tasks(index).partitionId
+        val mapOutputTrackerMaster = sched.mapOutputTracker
+        val shuffleStatusOpt = mapOutputTrackerMaster.shuffleStatuses.get(sid)
+        shuffleStatusOpt.foreach { shuffleStatus =>
+          // This method is only called for late-arriving or killed attempts, meaning the
+          // partition already has a successful attempt registered. Any MapStatus arriving
+          // here is from a stale (redundant) attempt that also pushed data.
+          // Mark its mapId as stale so reducers can detect it in merged block chunks.
+          shuffleStatus.markStalePushedMap(partitionId)

Review Comment:
   This marks the partition stale unconditionally, without checking whether 
this attempt actually pushed. With the new deferred push, a killed attempt's 
completion listener skips the push, so it usually pushed nothing — yet it is 
still marked, forcing reducers to fall back. The `info.finished` call site is 
worse: it can fire on a re-delivered *winning* attempt, marking the 
legitimate partition. The net effect is unnecessary fallback on most 
speculation/retry, including deterministic stages. Can the marking be gated on 
evidence that the attempt pushed (and skip the winner re-delivery case)?



##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -1644,6 +1715,47 @@ private[spark] object MapOutputTracker extends Logging {
     }
   }
 
+  /**
+   * Serialize a set of stale (duplicate) mapIds into a compact byte array.
+   * Uses DataOutputStream for a simple, efficient binary format:
+   * [int: count][long: mapId1][long: mapId2]...
+   * This is intentionally lightweight compared to serializeOutputStatuses because
+   * staleMapIds is typically small (only non-empty during speculation retries).
+   */
+  def serializeStaleMapIds(staleMapIds: java.util.HashSet[Int]): Array[Byte] = {
+    val out = new ApacheByteArrayOutputStream()
+    val dataOut = new DataOutputStream(out)
+    Utils.tryWithSafeFinally {
+      dataOut.writeInt(staleMapIds.size())
+      val iter = staleMapIds.iterator()
+      while (iter.hasNext) {
+        dataOut.writeInt(iter.next())
+      }
+    } {
+      dataOut.close()
+    }
+    out.toByteArray
+  }
+
+  /**
+   * Deserialize a byte array produced by [[serializeStaleMapIds]] back into a HashSet of Long.

Review Comment:
   Returns `java.util.HashSet[Int]` (read via `readInt`), not Long:
   ```suggestion
      * Deserialize a byte array produced by [[serializeStaleMapIds]] back into a HashSet of Int.
   ```



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2921,6 +2921,89 @@ class TaskSetManagerSuite
         s"\nCaptured logs:\n${logs.mkString("\n")}")
   }
 
+    test("SPARK-57491: late-arriving speculative ShuffleMapTask marks stale partitionId") {

Review Comment:
   This test is indented inconsistently with the rest of the suite — `test(` 
should be at 2 spaces with the body at 4, matching the surrounding tests. As 
written (4-space `test(` and an under-indented body) it will trip the 
scalastyle indentation check.



##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -1644,6 +1715,47 @@ private[spark] object MapOutputTracker extends Logging {
     }
   }
 
+  /**
+   * Serialize a set of stale (duplicate) mapIds into a compact byte array.
+   * Uses DataOutputStream for a simple, efficient binary format:
+   * [int: count][long: mapId1][long: mapId2]...

Review Comment:
   The code below uses `writeInt`, not `writeLong`, so each id is a 4-byte int:
   ```suggestion
      * [int: count][int: mapId1][int: mapId2]...
   ```
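
To make the corrected format concrete, here is a minimal round-trip sketch, not the PR's implementation. `StaleIdCodecSketch` is a hypothetical name, and it uses `java.io.ByteArrayOutputStream` with plain `try`/`finally` in place of the `ApacheByteArrayOutputStream` and `Utils.tryWithSafeFinally` shown in the diff, so that it runs without Spark on the classpath.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical helper illustrating the [int: count][int: id1][int: id2]... layout.
object StaleIdCodecSketch {
  def serialize(ids: java.util.HashSet[Int]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val dataOut = new DataOutputStream(out)
    try {
      dataOut.writeInt(ids.size())          // 4-byte count
      val iter = ids.iterator()
      while (iter.hasNext) {
        dataOut.writeInt(iter.next())       // 4 bytes per id, not 8
      }
    } finally dataOut.close()
    out.toByteArray
  }

  def deserialize(bytes: Array[Byte]): java.util.HashSet[Int] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    try {
      val count = in.readInt()
      val ids = new java.util.HashSet[Int]()
      var i = 0
      while (i < count) {
        ids.add(in.readInt())
        i += 1
      }
      ids
    } finally in.close()
  }
}
```

With this layout a set of n ids occupies 4 + 4n bytes, which is what the `[int: ...]` wording in the suggestion describes.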



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -813,7 +813,11 @@ private[spark] class TaskSetManager(
     val info = taskInfos(tid)
     // SPARK-37300: when the task was already finished state, just ignore it,
     // so that there won't cause successful and tasksSuccessful wrong result.
-    if (info.finished) {
+    if(info.finished) {

Review Comment:
   ```suggestion
       if (info.finished) {
   ```



##########
core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala:
##########
@@ -288,6 +300,34 @@ private class PushBasedFetchHelper(
     }
   }
 
+  /**
+   * Check whether a push-merged block contains data from stale (duplicate) task attempts.
+   * When speculation is enabled, multiple attempts for the same map output may both push data
+   * to the merger. The merger may include data from both attempts in the same merged block,
+   * but the driver only tracks one as the canonical MapStatus. We detect this by checking
+   * if any stale mapId appears in the server-side chunkBitmaps.
+   *
+   * @param shuffleBlockId ShuffleMergedBlockId to be checked
+   * @param address BlockManagerId of push-based shuffle service
+   * @param chunkBitmaps Chunks bitmap from push-based shuffle service site
+   * @return true if the merged block is clean (no stale data), false if stale data detected

Review Comment:
   This returns false whenever a marked partition is merely *present* in the 
block, not only when duplicated bytes are present (the bitmap keys on the 
shared mapIndex, so both attempts look identical). Reword to match what is 
computed:
   ```suggestion
      * @return false if any stale-marked mapIndex is present in this block (forcing fallback), true otherwise
   ```
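
A minimal sketch of the presence check described above, under stated assumptions: `MergedBlockCheckSketch` and `hasNoStaleMarkedIndex` are hypothetical names, and `java.util.BitSet` stands in for the chunk bitmap type so the example is self-contained. It is meant only to show that the result depends on whether a marked mapIndex is set in a bitmap, not on whether duplicated bytes exist.

```scala
import java.util.BitSet

// Hypothetical helper; BitSet is a stand-in for the real chunk bitmap type.
object MergedBlockCheckSketch {
  // Returns false if any stale-marked mapIndex is set in any chunk bitmap
  // (the caller would then fall back to unmerged blocks), true otherwise.
  def hasNoStaleMarkedIndex(chunkBitmaps: Seq[BitSet], staleMapIndexes: Set[Int]): Boolean =
    !chunkBitmaps.exists(bitmap => staleMapIndexes.exists(i => bitmap.get(i)))
}
```

Because both attempts share one mapIndex, a bitmap cannot distinguish a block that merged only the winning attempt from one that merged a stale attempt, which is why the check is a presence test.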



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
