gaoyajun02 opened a new pull request, #56559:
URL: https://github.com/apache/spark/pull/56559

   ### What changes were proposed in this pull request?
   
Fix push-based shuffle serving stale or incorrect data when more than one task attempt for the same map partition pushes data to the external merger. The fix adds three layers of defense.

Layer 1 (ShuffleWriteProcessor): Defer the shuffle block push to a TaskCompletionListener so that killed or failed tasks never push. Before initiating the push, the listener checks context.isInterrupted() and context.isFailed(), and logs which task was skipped and why.
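
A minimal Python model of the layer 1 idea, not the PR's Scala code. FakeTaskContext and write_and_defer_push are hypothetical stand-ins for Spark's TaskContext and ShuffleWriteProcessor, and "pushing" is reduced to appending the attempt ID to a list. What it shows is the ordering: the push decision runs inside a completion listener, after the task's outcome is known.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class FakeTaskContext:
    """Hypothetical stand-in for Spark's TaskContext (only what this sketch needs)."""
    task_attempt_id: int
    interrupted: bool = False
    failed: bool = False
    listeners: List[Callable[["FakeTaskContext"], None]] = field(default_factory=list)

    def is_interrupted(self) -> bool:
        return self.interrupted

    def is_failed(self) -> bool:
        return self.failed

    def add_task_completion_listener(self, fn: Callable[["FakeTaskContext"], None]) -> None:
        self.listeners.append(fn)

    def mark_task_completed(self) -> None:
        # Completion listeners run however the task ended (success, failure, kill).
        for fn in self.listeners:
            fn(self)


def write_and_defer_push(ctx: FakeTaskContext, pushed: List[int], log: List[str]) -> None:
    """Hypothetical: after writing map output, register the push instead of starting it."""
    def on_complete(c: FakeTaskContext) -> None:
        if c.is_interrupted():
            log.append(f"skip push for task {c.task_attempt_id}: interrupted")
        elif c.is_failed():
            log.append(f"skip push for task {c.task_attempt_id}: failed")
        else:
            pushed.append(c.task_attempt_id)  # stands in for starting the real push

    ctx.add_task_completion_listener(on_complete)


pushed, log = [], []
for ctx in (FakeTaskContext(1), FakeTaskContext(2, interrupted=True)):
    write_and_defer_push(ctx, pushed, log)
    ctx.mark_task_completed()
print(pushed)  # [1]
print(log)     # ['skip push for task 2: interrupted']
```

Spark invokes task completion listeners on success, failure, and cancellation alike, so the listener itself has to inspect the task's state; that is why the check sits inside the listener rather than at registration time.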

Layer 2 (TaskSetManager + MapOutputTracker): Track stale (duplicate) partitionIds on the driver side. When a speculative or retried ShuffleMapTask result arrives after another attempt for the same partition has already committed, TaskSetManager.detectStalePushIfShuffleTask marks the partition's ID as stale in ShuffleStatus. The stale set is serialized into serializedMapAndMergeStatus and propagated to the executor-side MapOutputTrackerWorker alongside the existing map output metadata.
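
The driver-side bookkeeping of layer 2 can be sketched as follows. ShuffleStatusSketch and its methods are hypothetical: they model the rule described above (the first successful attempt commits, and a later successful attempt for the same partition marks it stale) rather than the actual ShuffleStatus or TaskSetManager code, and serialize() only hints at how the stale set travels with the map output metadata.

```python
from typing import Dict, List, Optional, Set


class ShuffleStatusSketch:
    """Hypothetical model of driver-side state for one shuffle: the committed
    attempt per map partition plus the set of partition IDs flagged as stale."""

    def __init__(self, num_partitions: int) -> None:
        self.committed: List[Optional[int]] = [None] * num_partitions
        self.stale_partition_ids: Set[int] = set()

    def on_successful_map_task(self, partition_id: int, attempt_id: int) -> bool:
        """Return True if this attempt's result is registered."""
        winner = self.committed[partition_id]
        if winner is None:
            self.committed[partition_id] = attempt_id  # first successful attempt wins
            return True
        if winner != attempt_id:
            # A later attempt for an already committed partition may have pushed
            # data the driver will never register, so flag the partition.
            self.stale_partition_ids.add(partition_id)
        return False

    def serialize(self) -> Dict[str, list]:
        """Ship the stale set together with the map output metadata."""
        return {"map_statuses": list(self.committed),
                "stale_partition_ids": sorted(self.stale_partition_ids)}


status = ShuffleStatusSketch(num_partitions=3)
status.on_successful_map_task(0, attempt_id=10)  # original attempt commits
status.on_successful_map_task(0, attempt_id=11)  # speculative attempt arrives late
status.on_successful_map_task(1, attempt_id=12)
print(status.serialize())
# {'map_statuses': [10, 12, None], 'stale_partition_ids': [0]}
```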

Layer 3 (PushBasedFetchHelper): Before reading any merged block on the reducer side, check the chunk-level RoaringBitmaps for stale partitionIds via the new checkStaleMapIdInMergedBlock method. If a stale partitionId is found in any chunk bitmap, log a WARN and fall back to fetching the original unmerged blocks. When the stale set is empty (the common case), the check short-circuits immediately, so the common path pays only for an emptiness test.
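
A sketch of the reducer-side check in layer 3, with Python sets standing in for the chunk-level RoaringBitmaps. The function name mirrors checkStaleMapIdInMergedBlock, but its signature, the plan_fetch helper, the block name, and the log text are assumptions for illustration.

```python
from typing import Iterable, List, Set


def check_stale_map_id_in_merged_block(chunk_bitmaps: Iterable[Set[int]],
                                       stale_partition_ids: Set[int]) -> Set[int]:
    """Return the stale map partition IDs recorded in any chunk of a merged block.
    Sets stand in for RoaringBitmaps; the signature is hypothetical."""
    if not stale_partition_ids:  # common case: nothing stale, skip the scan entirely
        return set()
    found: Set[int] = set()
    for bitmap in chunk_bitmaps:
        found |= bitmap & stale_partition_ids
    return found


def plan_fetch(block_name: str, chunk_bitmaps: List[Set[int]],
               stale_partition_ids: Set[int], log: List[str]) -> str:
    """Hypothetical helper: decide whether a reducer may use a merged block."""
    stale = check_stale_map_id_in_merged_block(chunk_bitmaps, stale_partition_ids)
    if stale:
        log.append(f"WARN {block_name} contains stale map partitions {sorted(stale)}; "
                   "falling back to unmerged blocks")
        return "fetch-unmerged"
    return "fetch-merged"


log: List[str] = []
chunks = [{0, 1}, {2, 3}]
print(plan_fetch("merged-block-5", chunks, set(), log))  # fetch-merged
print(plan_fetch("merged-block-5", chunks, {2}, log))    # fetch-unmerged
print(log[0])  # WARN merged-block-5 contains stale map partitions [2]; falling back to unmerged blocks
```

The decision here is made per merged block: a stale ID in any chunk sends the whole block down the unmerged-fetch path, matching the fallback described above.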
   
   ### Why are the changes needed?
   
With push-based shuffle enabled, indeterminate stages can produce incorrect results when speculation or task retry causes two attempts of the same map partition to both push data to the external merger. The merged block ends up with interleaved data from both attempts, but only one attempt's MapStatus is committed on the driver, so downstream reducers silently read corrupted or duplicated data.

This is triggered by non-deterministic functions (rand(), UUID(), etc.) in shuffle keys or join conditions, where the generated values, or the row ordering they depend on, differ between attempts, so two attempts of the same map task can send different rows to each reducer. Even with deferred push (layer 1), a race window remains: if the original attempt completes and pushes before the kill issued on behalf of a speculative winner (which also pushes) reaches it, the merger receives data from both attempts, but only one MapStatus is registered.
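
To see why such mixed merged blocks corrupt results, here is a small worked example in Python with made-up data. Two attempts of one map task route four rows to two reducers differently (the fixed assignments stand in for a rand()-based key), and, as an illustrative assumption, reducer 0's merged block ends up holding attempt A's block while reducer 1's holds attempt B's.

```python
from collections import Counter
from typing import Dict, List


def partition(rows: List[str], assignment: Dict[str, int], num_reducers: int) -> Dict[int, List[str]]:
    """Split one map attempt's output into per-reducer blocks. `assignment`
    (row -> reducer) models a non-deterministic partitioning expression."""
    blocks: Dict[int, List[str]] = {r: [] for r in range(num_reducers)}
    for row in rows:
        blocks[assignment[row]].append(row)
    return blocks


rows = ["a", "b", "c", "d"]
attempt_a = partition(rows, {"a": 0, "b": 0, "c": 1, "d": 1}, 2)
attempt_b = partition(rows, {"a": 1, "b": 0, "c": 0, "d": 1}, 2)

# Assumption for illustration: each reducer's merged block holds one attempt's
# block, and the attempts differ across reducers (A for 0, B for 1).
merged = {0: attempt_a[0], 1: attempt_b[1]}

read = Counter(row for r in sorted(merged) for row in merged[r])
print(dict(read))  # {'a': 2, 'b': 1, 'd': 1}
```

Whichever attempt's MapStatus the driver registers, neither matches what the reducers read from the merged blocks: row a is read twice and row c is never read.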
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
- Unit tests
- Manual verification on an internal cluster confirmed that speculative duplicate pushes are either prevented by the deferred push or detected on the reducer side, which then falls back to the unmerged blocks and emits the expected WARN log.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   Yes, co-authored with GLM-5V-Turbo.

