gaoyajun02 commented on code in PR #56559:
URL: https://github.com/apache/spark/pull/56559#discussion_r3432849409


##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -105,6 +105,28 @@ private class ShuffleStatus(
    */
   private[spark] val checksumMismatchIndices: Set[Int] = Set()
 
+  /**
+   * Set of stale mapIds for this shuffle. When task retry or speculation causes multiple
+   * attempts for the same map output to push, the merger may include data from a stale attempt.
+   * We record the stale mapIds here so the reduce side can check chunkBitmaps and fallback
+   * if stale data is present in a merged block.
+   */
+  private[this] val staleMapIds = new java.util.HashSet[Int]()

Review Comment:
   We've carefully evaluated whether `staleMapIndexes` should be cleared in 
`addMapOutput`, and believe the current design is correct as-is. Here's the 
reasoning:
   
   **Core principle: `staleMapIndexes` cleanup should be tied to merged block 
lifecycle, not map output lifecycle.**
   
   The stale set protects against reading **merged blocks that contain stale 
chunks**. A partition's stale mark can only be safely removed once all 
corresponding merged blocks have been refreshed or discarded, which requires 
every merger location to be updated (either through a new `shuffleMergeId` or 
through explicit merge result invalidation).
   
   **Why not clear it in `addMapOutput`:**
   
   1. `addMapOutput` only updates **map-side metadata** (`mapStatuses`) — it 
does not invalidate any existing merged blocks on mergers. The physical merged 
files on the external shuffle service may still contain stale chunks from 
duplicate pushes.
   
   2. Within the same `stageAttempt` / `shuffleMergeId`, a late-arriving 
speculative `addMapOutput` call is exactly the event whose MapStatus *is* the 
stale one — clearing the stale mark here would defeat layers 2/3.
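   
   As a rough illustration of the two points above, here is a simplified 
sketch, not the code in this PR. `ShuffleStatusSketch`, `markStale`, and 
`isStale` are hypothetical names, and how a map index gets marked stale is 
left out on purpose. It only shows that re-registering a map output leaves the 
stale mark in place:
   
   ```scala
   import scala.collection.mutable

   // Hypothetical, simplified stand-in for ShuffleStatus; not the real Spark class.
   final class ShuffleStatusSketch {
     // mapIndex -> attempt id of the registered map output (map-side metadata only)
     private val mapStatuses = mutable.Map.empty[Int, Long]
     // Add-only within one shuffle merge: map indexes whose pushed chunks may be stale
     private val staleMapIndexes = mutable.Set.empty[Int]

     // Assumed entry point for recording a stale map index.
     def markStale(mapIndex: Int): Unit = synchronized {
       staleMapIndexes += mapIndex
     }

     def addMapOutput(mapIndex: Int, attemptId: Long): Unit = synchronized {
       // Only map-side metadata changes here. Merged files already written on the
       // mergers are untouched, so the stale mark is deliberately NOT removed.
       mapStatuses(mapIndex) = attemptId
     }

     def isStale(mapIndex: Int): Boolean = synchronized {
       staleMapIndexes.contains(mapIndex)
     }
   }
   ```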
   
   **Stage retry (full reset) — confirmed handled:**
   
   When `unregisterAllMapAndMergeOutput` is called (stage retry / rollback / 
barrier stage abort), it triggers:
   
   - `removeMergeResultsByFilter(x => true)` — all merge results cleared
   - `removeShuffleMergerLocations()` — all merger locations cleared  
   - `newShuffleMergeState()` → `shuffleMergeId += 1` — new merge ID
   - `incrementEpoch()` — workers fetch fresh `ShuffleStatus` on next 
`getStatuses` call
   
   The worker-side `staleMapIndexes` for this shuffleId is then either cleared 
by `unregisterShuffle` or replaced entirely: workers re-fetch under the new 
epoch and receive a fresh serialized snapshot in which the driver-side 
`staleMapIndexes` set is empty (new `ShuffleStatus`).
   
   **Partial merge cleanup (single FetchFailed) — intentional keep:**
   
   For non-barrier stages, `unregisterMergeResult` only removes a specific 
`<reduceId, bmAddress>` merge entry without incrementing `shuffleMergeId`. 
Other reduces' merged blocks on the same mergers may still contain stale 
chunks, so keeping the stale mark and falling back is the safe choice.
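   
   The difference between the full reset and the partial cleanup can be 
modeled roughly as follows. This is a sketch under assumptions: 
`MergeStateSketch` is a hypothetical class, merger addresses are plain 
strings, and the full reset is modeled as clearing the set in place rather 
than creating a new `ShuffleStatus`:
   
   ```scala
   import scala.collection.mutable

   // Hypothetical model of driver-side merge state; not Spark's real fields.
   final class MergeStateSketch {
     var shuffleMergeId: Int = 0
     // (reduceId, mergerAddress) pairs that currently have a registered merge result
     val mergeResults = mutable.Set.empty[(Int, String)]
     val staleMapIndexes = mutable.Set.empty[Int]

     // Partial cleanup: one merge result is dropped. The merge id and the stale
     // marks stay, because other merged blocks may still contain stale chunks.
     def unregisterMergeResult(reduceId: Int, merger: String): Unit = {
       mergeResults -= ((reduceId, merger))
     }

     // Full reset: every merged block is discarded and a new merge id is issued,
     // so no stale chunk can be read any more and the marks can be dropped.
     def unregisterAllMapAndMergeOutput(): Unit = {
       mergeResults.clear()
       staleMapIndexes.clear()
       shuffleMergeId += 1
     }
   }
   ```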
   
   **Future optimization (not blocking):**
   
   If we want to avoid unnecessary fallbacks in the partial-cleanup case, we 
could track stale at **merger-location granularity** (e.g., 
`staleMapIndexesByMerger[mergerAddress][partitionId]`) and clear entries when 
that specific merger's merge results are all invalidated. But this adds 
complexity for marginal gain — the fallback path is correct and only costs 
performance, not correctness.
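   
   For reference, the location-level variant could look roughly like the 
sketch below. Everything here is hypothetical (`StaleByMergerSketch`, 
`clearMerger`, `mustFallback`, string merger addresses) and none of it is part 
of this PR. It is also not thread-safe as written:
   
   ```scala
   import scala.collection.mutable

   // Hypothetical follow-up design: stale marks tracked per merger location.
   final class StaleByMergerSketch {
     // mergerAddress -> partition ids whose merged blocks on that merger may be stale
     private val staleByMerger = mutable.Map.empty[String, mutable.Set[Int]]

     def markStale(merger: String, partitionId: Int): Unit = {
       staleByMerger.getOrElseUpdate(merger, mutable.Set.empty[Int]) += partitionId
     }

     // To be called once all merge results on `merger` have been invalidated.
     def clearMerger(merger: String): Unit = {
       staleByMerger -= merger
     }

     // A reducer needs the fallback path only if the merger it reads from is marked.
     def mustFallback(merger: String, partitionId: Int): Boolean =
       staleByMerger.get(merger).exists(_.contains(partitionId))
   }
   ```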
   
   In summary: the current "only-add, clear-on-shuffle-end" semantics are 
intentional and match the merged block lifecycle. We're happy to add 
location-level tracking as a follow-up optimization if desired.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

