gaoyajun02 opened a new pull request, #56559:

URL: https://github.com/apache/spark/pull/56559
### What changes were proposed in this pull request?

Fix push-based shuffle serving stale or incorrect data when multiple task attempts for the same map partition both push data to the external merger. The fix adds three layers of defense:

1. **ShuffleWriteProcessor**: Defer the shuffle block push to a `TaskCompletionListener` so that killed or failed tasks never push. The listener checks `context.isInterrupted()` / `context.isFailed()` before initiating the push, and logs which task was skipped and why.
2. **TaskSetManager + MapOutputTracker**: Track stale (duplicate) partitionIds on the driver side. When a speculative or retried `ShuffleMapTask` result arrives after another attempt for the same partition has already committed, `TaskSetManager.detectStalePushIfShuffleTask` marks that partition's ID as stale in `ShuffleStatus`. The stale set is serialized into `serializedMapAndMergeStatus` and propagated to the executor-side `MapOutputTrackerWorker` alongside the existing map output metadata.
3. **PushBasedFetchHelper**: Before reading any merged block on the reducer side, check the chunk-level RoaringBitmaps for stale partitionIds via the new `checkStaleMapIdInMergedBlock` method. If a stale partitionId is found in any chunk bitmap, log a WARN and fall back to fetching the original unmerged blocks. When the stale set is empty (the common case), the check short-circuits immediately without inspecting any chunk bitmap, so it adds essentially no overhead.

### Why are the changes needed?

With push-based shuffle enabled, indeterminate stages can produce incorrect results when speculation or a task retry causes two attempts of the same map partition to both push data to the external merger. The merged block then contains interleaved data from both attempts, but only one attempt's `MapStatus` is committed on the driver, so downstream reducers silently read corrupted or duplicated data. This is triggered by non-deterministic functions (`rand()`, `UUID()`, etc.) in shuffle keys or join conditions, where the two attempts can produce different output (for example, rows ordered or partitioned differently).
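The three layers above, and the failure mode they guard against, can be illustrated with a toy Python model. This is a sketch under stated assumptions, not the patch itself: `TaskState`, `DriverShuffleState`, `should_push`, `merged_block_has_stale`, and `choose_blocks` are hypothetical names rather than Spark APIs, plain Python sets stand in for the chunk-level RoaringBitmaps, and "the first successful attempt commits, later ones are stale" is a simplification of what `TaskSetManager` and `ShuffleStatus` do.

```python
from dataclasses import dataclass, field
from typing import Iterable, Set

# Toy model of the three layers. Every name here is a hypothetical
# stand-in, not a Spark API; Python sets replace RoaringBitmaps.


@dataclass
class TaskState:
    """Hypothetical stand-in for the task state checked at completion time."""
    interrupted: bool = False
    failed: bool = False


def should_push(task: TaskState) -> bool:
    """Layer 1: only a task that was neither killed nor failed pushes its blocks."""
    return not (task.interrupted or task.failed)


@dataclass
class DriverShuffleState:
    """Layer 2: driver-side committed and stale map partition ids."""
    committed: Set[int] = field(default_factory=set)
    stale: Set[int] = field(default_factory=set)

    def on_map_success(self, partition_id: int) -> bool:
        """Commit the first successful attempt; mark later ones as stale.

        Returns True if this result was committed, False if it arrived late.
        """
        if partition_id in self.committed:
            # Another attempt already committed, so this late attempt may
            # have pushed data into the merged block as well.
            self.stale.add(partition_id)
            return False
        self.committed.add(partition_id)
        return True


def merged_block_has_stale(chunk_bitmaps: Iterable[Set[int]], stale: Set[int]) -> bool:
    """Layer 3: True if any chunk bitmap of a merged block contains a stale id."""
    if not stale:
        # Common case: nothing is stale, so no chunk bitmap is inspected.
        return False
    return any(not chunk.isdisjoint(stale) for chunk in chunk_bitmaps)


def choose_blocks(chunk_bitmaps: Iterable[Set[int]], stale: Set[int]) -> str:
    """Decide whether a reducer reads the merged block or the original blocks."""
    if merged_block_has_stale(chunk_bitmaps, stale):
        # The change described above logs a WARN at this point.
        return "unmerged"
    return "merged"


# Example: partition 3 commits once, then a late speculative result arrives.
driver = DriverShuffleState()
first = driver.on_map_success(3)
late = driver.on_map_success(3)
chunks = [{0, 1}, {2, 3}]
print(first, late, sorted(driver.stale), choose_blocks(chunks, driver.stale))
# prints: True False [3] unmerged
```

Checking the stale set for emptiness before touching any bitmap mirrors the short-circuit described for the common case: when no partition was ever marked stale, the reducer goes straight to the merged block.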
Even with deferred push (layer 1), a race window remains: if an original attempt succeeds and starts pushing, and is then killed by a speculative attempt that wins and also pushes, the merger receives data from both attempts while only one `MapStatus` is registered. Layers 2 and 3 detect this case and fall back to the unmerged blocks.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Unit tests
- Manual verification on an internal cluster confirmed that speculative duplicate pushes are either prevented by the deferred push or detected and handled by falling back to unmerged blocks, with the expected WARN log emitted on fallback.

### Was this patch authored or co-authored using generative AI tooling?

Yes, co-authored with GLM-5V-Turbo.

-- This is an automated message from the Apache Git Service.
