jerrypeng opened a new pull request, #56196:
URL: https://github.com/apache/spark/pull/56196

### What changes were proposed in this pull request?

This is **part 3** of a multi-PR effort to add *streaming shuffle* to Spark: a push-based shuffle used by Real-Time Mode (RTM) structured streaming, where writer tasks push records directly to reader tasks over the network instead of writing map output to disk for readers to pull.
   
This PR adds the shuffle-manager layer that later PRs plug into:

- **`StreamingShuffleManager`**: a `ShuffleManager` implementation for streaming shuffle. `getWriter`/`getReader` are intentionally stubbed in this PR (they throw `UnsupportedOperationException`) and are implemented in the push-path / pull-path PRs that follow.
- **`MultiShuffleManager`**: routes each shuffle to either the batch `SortShuffleManager` or the `StreamingShuffleManager`, based on a per-query local property, so a single application can mix batch and streaming shuffle.
- **`TaskContextAwareLogging`**: a `Logging` mixin that prefixes log lines with queryId / shuffleId / stageId / taskId.
- **`SparkEnv`**: exposes the `StreamingShuffleOutputTracker` (added in part 2) to executors, and initializes it **only** when the configured shuffle manager is `StreamingShuffleManager` or `MultiShuffleManager`.
- Two streaming-shuffle error conditions (`STREAMING_SHUFFLE_INCORRECT_SEQUENCE_NUMBER`, `STREAMING_SHUFFLE_UNEXPECTED_MESSAGE_TYPE`) and the `STREAMING_QUERY_ID` log key.
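
To illustrate the logging mixin from the list above, here is a minimal, self-contained Scala sketch of a trait that prefixes each log line with task identifiers. It does not use Spark's `Logging` trait or `TaskContext`. The names `TaskInfo`, `SimpleLogging`, `ContextPrefixLogging`, and `DemoWriter`, and the exact prefix format, are hypothetical and are not taken from the PR.

```scala
// Hypothetical stand-in for the task metadata a real mixin would read
// from Spark's TaskContext; the field names are assumptions.
final case class TaskInfo(queryId: String, shuffleId: Int, stageId: Int, taskId: Long)

// Minimal stand-in for a logging trait. It collects lines in memory
// instead of writing to a real logger so the sketch is easy to inspect.
trait SimpleLogging {
  val lines = scala.collection.mutable.ArrayBuffer.empty[String]
  def logInfo(msg: => String): Unit = lines += msg
}

// Mixin that prepends the task identifiers to every message.
trait ContextPrefixLogging extends SimpleLogging {
  def taskInfo: Option[TaskInfo]

  private def prefix: String = taskInfo match {
    case Some(t) =>
      s"[queryId=${t.queryId} shuffleId=${t.shuffleId} " +
        s"stageId=${t.stageId} taskId=${t.taskId}] "
    case None => "" // outside a task there is nothing to prefix
  }

  override def logInfo(msg: => String): Unit = super.logInfo(prefix + msg)
}

// Hypothetical component that mixes the prefixing behaviour in.
final class DemoWriter(val taskInfo: Option[TaskInfo]) extends ContextPrefixLogging
```

Keeping the prefix logic in a mixin means each component gets the identifiers on its log lines without repeating the formatting code at every call site.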
   
The full PR stack:

- **Part 1** (SPARK-56674, *merged*): streaming shuffle wire protocol (Netty messages).
- **Part 2** (SPARK-56962, *merged*): `StreamingShuffleOutputTracker` (driver-side writer-location coordination).
- **Part 3** (*this PR*): shuffle-manager layer (`StreamingShuffleManager` + `MultiShuffleManager`), logging mixin, and SparkEnv tracker wiring.
- **Part 4**: `StreamingShuffleWriter` + server-side Netty handler (push path).
- **Part 5**: `StreamingShuffleReader` + client-side Netty handler (pull path).
- **Part 6**: register streaming shuffles with the tracker in `DAGScheduler` (activation).
- **Part 7**: end-to-end `StreamingShuffleSuite`.
- **Part 8**: documentation.
   
### Why are the changes needed?

Real-Time Mode / low-latency continuous queries need shuffle data to flow continuously between stages. The default sort shuffle (write map output to disk, then have reducers pull it) adds latency that is unacceptable for these workloads. Streaming shuffle instead pushes records directly from writer tasks to reader tasks.
   
This PR lands the manager layer that the writer and reader implementations attach to, plus `MultiShuffleManager` so batch stages keep using the sort shuffle while streaming stages use the streaming shuffle within the same application.
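
The routing idea can be sketched in plain Scala, without any Spark dependency. This is a simplified model under stated assumptions, not the PR's implementation: `MiniShuffleManager` and its subclasses stand in for Spark's `ShuffleManager` hierarchy with a much smaller interface, and the property key `example.streamingShuffle.enabled` is hypothetical because the description does not name the real one.

```scala
import java.util.Properties

// Simplified stand-in for Spark's ShuffleManager; the real trait has
// more methods and different signatures.
trait MiniShuffleManager {
  def name: String
  def getWriter(shuffleId: Int): String
}

final class MiniSortShuffleManager extends MiniShuffleManager {
  val name = "sort"
  def getWriter(shuffleId: Int): String = s"sort-writer-$shuffleId"
}

// Mirrors the PR description: the writer is stubbed until later PRs.
final class MiniStreamingShuffleManager extends MiniShuffleManager {
  val name = "streaming"
  def getWriter(shuffleId: Int): String =
    throw new UnsupportedOperationException("getWriter is not implemented yet")
}

object RoutingKeys {
  // Hypothetical property key; the PR description does not name the real one.
  val EnableKey = "example.streamingShuffle.enabled"
}

final class MiniMultiShuffleManager(
    batch: MiniShuffleManager,
    streaming: MiniShuffleManager) {

  // Pick a delegate from the local properties of the submitting query.
  // A missing property falls back to the batch (sort) manager.
  def select(localProps: Properties): MiniShuffleManager =
    if (localProps.getProperty(RoutingKeys.EnableKey, "false").toBoolean) streaming
    else batch
}
```

In Spark itself, per-thread local properties are set with `SparkContext.setLocalProperty`. Which key `MultiShuffleManager` reads is not stated in this description.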
   
### Does this PR introduce _any_ user-facing change?

No. The new shuffle managers are opt-in via `spark.shuffle.manager` and are not the default. `getWriter`/`getReader` are still stubbed in this PR, so the feature is not yet usable end-to-end; later PRs complete it. The `StreamingShuffleOutputTracker` is initialized only when one of the new managers is configured, so there is no change to the default (sort shuffle) path, and tests cover this.
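
The gating rule described above can be written as a small predicate. The sketch below is a hedged, Spark-free model: `TrackerGating` and its methods are hypothetical names, the tracker is represented by a plain string, and matching on the simple class name is an assumption, since the fully qualified names of the new managers are not given in this description.

```scala
object TrackerGating {
  // Simple class names taken from the PR description. Matching on the
  // last dotted segment is an assumption made for this sketch.
  private val streamingManagers =
    Set("StreamingShuffleManager", "MultiShuffleManager")

  // True when the configured manager is one of the streaming-capable ones.
  def needsStreamingTracker(configuredManager: String): Boolean =
    streamingManagers.contains(configuredManager.split('.').last)

  // Models the SparkEnv wiring: the tracker exists only when it is needed,
  // so the default sort-shuffle path never sees it.
  def createTracker(configuredManager: String): Option[String] =
    if (needsStreamingTracker(configuredManager)) Some("StreamingShuffleOutputTracker")
    else None
}
```

With the default `spark.shuffle.manager` value of `sort`, `createTracker` returns `None` in this model, which corresponds to the "tracker is absent for the default manager" behaviour the tests check.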
     
### How was this patch tested?

New unit suites:

- **`StreamingShuffleManagerSuite`**: `getWriterId` for data/termination messages and the unexpected-message-type error; `getQueryId` resolution and failure; `registerShuffle` handle type; and SparkEnv gating (tracker is present for `StreamingShuffleManager`, absent for the default manager).
- **`MultiShuffleManagerSuite`**: per-query streaming-vs-batch routing, the enable property, and SparkEnv gating for `MultiShuffleManager`.
   
13 tests, all passing. `SparkThrowableSuite` validates the two new error conditions.
   
### Was this patch authored or co-authored using generative AI tooling?

Co-authored with Claude Code (Claude Opus 4.8)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

