jerrypeng opened a new pull request, #56196:
URL: https://github.com/apache/spark/pull/56196
### What changes were proposed in this pull request?
This is **part 3** of a multi-PR effort to add *streaming shuffle* to Spark:
a push-based shuffle, used by Real-Time Mode (RTM) Structured Streaming, in
which writer tasks push records directly to reader tasks over the network
instead of writing map output to disk for readers to pull.
This PR adds the shuffle-manager layer that later PRs plug into:
- **`StreamingShuffleManager`** — a `ShuffleManager` implementation for
streaming shuffle. `getWriter`/`getReader` are intentionally stubbed in this PR
(they throw
`UnsupportedOperationException`) and are implemented in the push-path /
pull-path PRs that follow.
- **`MultiShuffleManager`** — routes each shuffle to either the batch
`SortShuffleManager` or the `StreamingShuffleManager`, based on a per-query
local property, so a single application
can mix batch and streaming shuffle.
- **`TaskContextAwareLogging`** — a `Logging` mixin that prefixes log
lines with queryId / shuffleId / stageId / taskId.
- **`SparkEnv`** — exposes the `StreamingShuffleOutputTracker` (added in
part 2) to executors, and initializes it **only** when the configured shuffle
manager is `StreamingShuffleManager`
or `MultiShuffleManager`.
- Two streaming-shuffle error conditions
(`STREAMING_SHUFFLE_INCORRECT_SEQUENCE_NUMBER`,
`STREAMING_SHUFFLE_UNEXPECTED_MESSAGE_TYPE`) and the `STREAMING_QUERY_ID` log
key.
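
A sketch of the kind of prefix a mixin like `TaskContextAwareLogging` might build, in plain Java. The `TaskIds` holder, the `withPrefix` helper, and the bracketed `key=value` format are hypothetical. The real mixin is a Scala trait over Spark's `Logging`. In Spark, `TaskContext.get()` returns the running task's context (or `null` outside a task), which is a plausible source for the stage and task IDs.

```java
import java.util.Locale;

public class TaskContextPrefixSketch {
    /** Hypothetical holder for the IDs the mixin reads (in Spark, likely via TaskContext). */
    static final class TaskIds {
        final String queryId;
        final int shuffleId;
        final int stageId;
        final long taskId;

        TaskIds(String queryId, int shuffleId, int stageId, long taskId) {
            this.queryId = queryId;
            this.shuffleId = shuffleId;
            this.stageId = stageId;
            this.taskId = taskId;
        }
    }

    /** Builds the prefix; the bracketed key=value format is an assumption. */
    static String prefix(TaskIds ids) {
        if (ids == null) {
            return ""; // no task context (e.g. on the driver): leave the line unchanged
        }
        return String.format(Locale.ROOT, "[queryId=%s shuffleId=%d stageId=%d taskId=%d] ",
                ids.queryId, ids.shuffleId, ids.stageId, ids.taskId);
    }

    /** Hypothetical helper: what a logInfo-style override might prepend to each message. */
    static String withPrefix(TaskIds ids, String msg) {
        return prefix(ids) + msg;
    }

    public static void main(String[] args) {
        TaskIds ids = new TaskIds("q-1", 3, 7, 42L);
        // prints "[queryId=q-1 shuffleId=3 stageId=7 taskId=42] pushed batch"
        System.out.println(withPrefix(ids, "pushed batch"));
        // prints "driver-side line" (no prefix without a task context)
        System.out.println(withPrefix(null, "driver-side line"));
    }
}
```

Prefixing every line this way lets interleaved logs from many concurrent streaming tasks be filtered by query or shuffle.
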

The full PR stack:
- **Part 1** (SPARK-56674, *merged*) — streaming shuffle wire protocol
(Netty messages).
- **Part 2** (SPARK-56962, *merged*) — `StreamingShuffleOutputTracker`
(driver-side writer-location coordination).
- **Part 3** (*this PR*) — shuffle-manager layer
(`StreamingShuffleManager` + `MultiShuffleManager`), logging mixin, and
SparkEnv tracker wiring.
- **Part 4** — `StreamingShuffleWriter` + server-side Netty handler (push
path).
- **Part 5** — `StreamingShuffleReader` + client-side Netty handler (pull
path).
- **Part 6** — register streaming shuffles with the tracker in
`DAGScheduler` (activation).
- **Part 7** — end-to-end `StreamingShuffleSuite`.
- **Part 8** — documentation.
### Why are the changes needed?
Low-latency continuous queries in Real-Time Mode need shuffle data to flow
continuously between stages. The default sort shuffle, which writes map output
to disk and then has reducers pull it, adds latency that is unacceptable for
these workloads. Streaming shuffle instead pushes records directly from writer
tasks to reader tasks.
This PR lands the manager layer that the writer and reader implementations
attach to, plus `MultiShuffleManager` so batch stages keep using the sort
shuffle while streaming stages use the
streaming shuffle within the same application.
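
A minimal sketch of how per-query routing of this kind can work, in plain Java with no Spark dependency. Everything here is hypothetical: the property key `example.streamingShuffle.enabled`, the `Kind` enum, and the `Handle` class are illustrative stand-ins, and the strings returned by `getWriter` stand in for real writer objects. The assumed design decides once, on the driver, when the shuffle is registered, and records the decision in the handle. In Spark, properties set with `SparkContext.setLocalProperty` apply to jobs submitted from the calling thread, which is what makes a per-query property workable.

```java
import java.util.Properties;

public class ShuffleRoutingSketch {
    // Hypothetical key; the property MultiShuffleManager actually reads may differ.
    static final String STREAMING_KEY = "example.streamingShuffle.enabled";

    enum Kind { SORT, STREAMING }

    /** Hypothetical handle that remembers which manager owns the shuffle. */
    static final class Handle {
        final int shuffleId;
        final Kind kind;

        Handle(int shuffleId, Kind kind) {
            this.shuffleId = shuffleId;
            this.kind = kind;
        }
    }

    /** Driver side: decide once, at registration, from the submitting query's properties. */
    static Handle registerShuffle(int shuffleId, Properties localProps) {
        boolean streaming =
            Boolean.parseBoolean(localProps.getProperty(STREAMING_KEY, "false"));
        return new Handle(shuffleId, streaming ? Kind.STREAMING : Kind.SORT);
    }

    /** Task side: dispatch on the handle, so all tasks of one shuffle agree. */
    static String getWriter(Handle handle) {
        return handle.kind == Kind.STREAMING
            ? "streaming writer for shuffle " + handle.shuffleId
            : "sort-shuffle writer for shuffle " + handle.shuffleId;
    }

    public static void main(String[] args) {
        Properties streamingQuery = new Properties();
        streamingQuery.setProperty(STREAMING_KEY, "true");
        System.out.println(getWriter(registerShuffle(0, streamingQuery)));   // streaming writer for shuffle 0
        System.out.println(getWriter(registerShuffle(1, new Properties()))); // sort-shuffle writer for shuffle 1
    }
}
```

Carrying the choice in the handle, rather than re-reading the property on executors, is one way to keep the writers and readers of a shuffle consistent; the PR's actual mechanism may differ.
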
### Does this PR introduce _any_ user-facing change?
No. The new shuffle managers are opt-in via `spark.shuffle.manager` and are
not the default. `getWriter`/`getReader` are still stubbed in this PR, so the
feature is not usable end-to-end until later PRs land. The
`StreamingShuffleOutputTracker` is initialized only when one of the new
managers is configured, so the default (sort shuffle) path is unchanged; tests
cover this.
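
The gating can be sketched as a single type check on the configured manager. In Spark, `spark.shuffle.manager` takes a short name such as `sort` or a fully qualified class name, and SparkEnv instantiates the manager from it. The classes below are empty, hypothetical stand-ins for the real managers and the part-2 tracker; the real SparkEnv code may test the configured class differently and hold the tracker as a Scala `Option` rather than a Java `Optional`.

```java
import java.util.Optional;

public class TrackerGatingSketch {
    // Empty stand-ins for the real classes; only their types matter here.
    interface ShuffleManager {}
    static final class SortShuffleManager implements ShuffleManager {}
    static final class StreamingShuffleManager implements ShuffleManager {}
    static final class MultiShuffleManager implements ShuffleManager {}
    static final class StreamingShuffleOutputTracker {}

    /** Creates the tracker only for the two streaming-capable managers. */
    static Optional<StreamingShuffleOutputTracker> maybeCreateTracker(ShuffleManager manager) {
        boolean streamingCapable = manager instanceof StreamingShuffleManager
            || manager instanceof MultiShuffleManager;
        return streamingCapable
            ? Optional.of(new StreamingShuffleOutputTracker())
            : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(maybeCreateTracker(new SortShuffleManager()).isPresent());      // false
        System.out.println(maybeCreateTracker(new StreamingShuffleManager()).isPresent()); // true
        System.out.println(maybeCreateTracker(new MultiShuffleManager()).isPresent());     // true
    }
}
```

With the default `sort` manager no tracker is created, which is the "no change to the default path" property the tests check.
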
### How was this patch tested?
New unit suites:
- **`StreamingShuffleManagerSuite`**: `getWriterId` for data and termination
messages, and the unexpected-message-type error; `getQueryId` resolution and
failure; the handle type returned by `registerShuffle`; and SparkEnv gating
(the tracker is present for `StreamingShuffleManager` and absent for the
default manager).
- **`MultiShuffleManagerSuite`**: per-query streaming-vs-batch routing, the
enable property, and SparkEnv gating for `MultiShuffleManager`.

13 tests, all passing. `SparkThrowableSuite` validates the two new error
conditions.
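
The behavior these tests and the two error conditions describe can be sketched as follows. The message classes (`Data`, `Termination`, and the unexpected `Heartbeat`), the exception type, and the rule that each writer numbers its data messages 0, 1, 2, ... without gaps are all assumptions made for illustration. The real part-1 wire messages differ, and Spark raises error conditions through `SparkThrowable` exceptions rather than this stand-in.

```java
import java.util.HashMap;
import java.util.Map;

public class WriterIdSketch {
    /** Hypothetical message shapes; the real part-1 Netty messages differ. */
    interface Message {}
    static final class Data implements Message {
        final int writerId;
        final long seq;
        Data(int writerId, long seq) { this.writerId = writerId; this.seq = seq; }
    }
    static final class Termination implements Message {
        final int writerId;
        Termination(int writerId) { this.writerId = writerId; }
    }
    static final class Heartbeat implements Message {} // hypothetical type not handled here

    /** Stand-in exception carrying the error-condition name. */
    static final class StreamingShuffleError extends RuntimeException {
        final String condition;
        StreamingShuffleError(String condition, String detail) {
            super("[" + condition + "] " + detail);
            this.condition = condition;
        }
    }

    /** Extracts the writer ID from the two expected message kinds; anything else is an error. */
    static int getWriterId(Message m) {
        if (m instanceof Data) return ((Data) m).writerId;
        if (m instanceof Termination) return ((Termination) m).writerId;
        throw new StreamingShuffleError("STREAMING_SHUFFLE_UNEXPECTED_MESSAGE_TYPE",
                m.getClass().getSimpleName());
    }

    // Next expected sequence number per writer (assumed gap-free numbering from 0).
    private final Map<Integer, Long> nextSeq = new HashMap<>();

    void checkSequence(Data d) {
        long expected = nextSeq.getOrDefault(d.writerId, 0L);
        if (d.seq != expected) {
            throw new StreamingShuffleError("STREAMING_SHUFFLE_INCORRECT_SEQUENCE_NUMBER",
                    "writer " + d.writerId + ": expected " + expected + ", got " + d.seq);
        }
        nextSeq.put(d.writerId, expected + 1);
    }

    public static void main(String[] args) {
        System.out.println(getWriterId(new Data(5, 0)));     // 5
        System.out.println(getWriterId(new Termination(5))); // 5
        try {
            getWriterId(new Heartbeat());
        } catch (StreamingShuffleError e) {
            System.out.println(e.getMessage()); // [STREAMING_SHUFFLE_UNEXPECTED_MESSAGE_TYPE] Heartbeat
        }
        WriterIdSketch checker = new WriterIdSketch();
        checker.checkSequence(new Data(5, 0));
        try {
            checker.checkSequence(new Data(5, 2)); // gap: 1 was expected
        } catch (StreamingShuffleError e) {
            System.out.println(e.getMessage());
        }
    }
}
```
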
### Was this patch authored or co-authored using generative AI tooling?
Co-authored with Claude Code (Claude Opus 4.8)
--
This is an automated message from the Apache Git Service.