jerrypeng opened a new pull request, #56387:
URL: https://github.com/apache/spark/pull/56387

   ### What changes were proposed in this pull request?
     
     This is the foundation PR of a stacked set that adds a **streaming 
shuffle**: a push-based shuffle used by low-latency continuous / Real-Time 
Mode queries. It introduces only the
     building blocks shared by the writer (push) and reader (pull) paths, 
so that the writer and reader follow-up PRs can be reviewed in parallel and merged independently:
   
     - **`ErrorNotifier`** (`core`, `org.apache.spark.shuffle.streaming`) — 
records the first error raised on a background thread (e.g. a Netty event-loop 
thread) so the owning task thread can
     poll it and re-throw at a safe point.
     - **`TransportClient.send(ByteBuf)`** (`network-common`): a new overload 
that sends a Netty `ByteBuf` and returns the `ChannelFuture`. The existing 
`send(ByteBuffer)` also changes its
     return type from `void` to `ChannelFuture`, so callers can attach a 
completion listener.
     - **`spark.shuffle.streaming.checksum.enabled`** config — gates an 
optional end-to-end CRC32C integrity check (writer embeds the checksum, reader 
verifies it).
     - Two shared structured-logging keys: `NUM_TERMINATION_ACKS`, 
`SHUFFLE_WRITER_ID`.
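
To illustrate the "writer embeds the checksum, reader verifies it" idea behind `spark.shuffle.streaming.checksum.enabled`, here is a minimal sketch using the JDK's `java.util.zip.CRC32C` (Java 9+). The frame layout (one flag byte, an optional 4-byte CRC32C, then the payload) and the names `ChecksumFrameSketch`, `encode`, and `decode` are assumptions made for this example; the actual wire format is defined in the writer and reader PRs, not here.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

/** Hypothetical frame codec; NOT the wire format used by the streaming shuffle. */
final class ChecksumFrameSketch {

    /** Writer side: prepend a flag byte and, if enabled, the payload's CRC32C. */
    static byte[] encode(byte[] payload, boolean checksumEnabled) {
        int headerSize = 1 + (checksumEnabled ? Integer.BYTES : 0);
        ByteBuffer out = ByteBuffer.allocate(headerSize + payload.length);
        out.put((byte) (checksumEnabled ? 1 : 0));
        if (checksumEnabled) {
            out.putInt(crc32c(payload));
        }
        out.put(payload);
        return out.array();
    }

    /** Reader side: recompute the CRC32C and fail if it differs from the embedded one. */
    static byte[] decode(byte[] frame) {
        ByteBuffer in = ByteBuffer.wrap(frame);
        boolean hasChecksum = in.get() == 1;
        int expected = hasChecksum ? in.getInt() : 0;
        byte[] payload = new byte[in.remaining()];
        in.get(payload);
        if (hasChecksum && crc32c(payload) != expected) {
            throw new IllegalStateException("checksum mismatch");
        }
        return payload;
    }

    /** CRC32C of the whole array, truncated to its low 32 bits. */
    private static int crc32c(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        return (int) crc.getValue();
    }
}
```

With the flag off, the frame carries no checksum and the reader skips verification, which mirrors the config acting as a gate for both sides.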
   
     This is part of a stack of PRs:
     1. **(this PR)** shared transport + error plumbing
     2. writer + server-side Netty handler — depends on this PR
     3. reader + client-side Netty handler — depends on this PR
     4. activation (`DAGScheduler` / `BlockManager` wiring)
     5. tests (`StreamingShuffleSuite`)
     6. documentation
   
     ### Why are the changes needed?
   
     These components are used by **both** the writer and reader sides, so 
extracting them into a single prerequisite PR keeps the writer and reader PRs 
independent and reviewable in parallel:
   
     - **`ErrorNotifier`**: the streaming shuffle performs network I/O on Netty 
event-loop threads. An error there must not be surfaced by calling 
`TaskContext.markTaskFailed` from a
     background thread, because that can race with `markTaskCompleted` and skip 
task-completion/cleanup listeners (leaking resources). Instead the background 
thread records the error via
     `ErrorNotifier.markError`, and the task thread polls and re-throws it at a 
safe point so cleanup runs on the task thread.
     - **`TransportClient.send(ByteBuf)` returning `ChannelFuture`**: both 
sides send Netty `ByteBuf` messages (writer: data buffers; reader: 
credit-control / termination-ack) and need to
     attach a listener to the send's completion. The existing 
`send(ByteBuffer)` returned `void`, which gave callers no way to observe 
completion, so both methods now return the `ChannelFuture`. The new
     overload also avoids an extra copy by sending the `ByteBuf` directly.
     - The checksum config governs an optional end-to-end data-integrity check 
shared by the writer and reader.
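
The record-then-poll handoff described for `ErrorNotifier` can be sketched roughly as below. This is an illustration under stated assumptions, not the class added by this PR: only the method name `markError` comes from the description above, while `ErrorNotifierSketch`, `throwIfError`, and the wrapping `RuntimeException` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a first-error holder; not the PR's implementation. */
final class ErrorNotifierSketch {
    // Holds the first error recorded by any background thread, or null if none.
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    /** Called from a background (e.g. event-loop) thread; later errors are dropped. */
    void markError(Throwable t) {
        firstError.compareAndSet(null, t);
    }

    /** Called from the task thread at a safe point; the name is an assumption. */
    void throwIfError() {
        Throwable t = firstError.get();
        if (t != null) {
            throw new RuntimeException("background thread failed", t);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ErrorNotifierSketch notifier = new ErrorNotifierSketch();
        // Stand-in for a network I/O thread that hits two errors in a row.
        Thread io = new Thread(() -> {
            notifier.markError(new IllegalStateException("first"));
            notifier.markError(new IllegalStateException("second"));
        });
        io.start();
        io.join();
        try {
            notifier.throwIfError(); // the task thread polls and re-throws
        } catch (RuntimeException e) {
            System.out.println(e.getCause().getMessage()); // prints "first"
        }
    }
}
```

Because the exception is thrown on the polling thread, any cleanup tied to that thread's normal failure path runs there rather than racing with a background callback.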
   
     ### Does this PR introduce _any_ user-facing change?
   
     No behavior change. This PR adds the configuration 
`spark.shuffle.streaming.checksum.enabled` (default `true`), but it has no 
effect until the streaming shuffle writer/reader (follow-up
     PRs) are in use. The new `TransportClient.send(ByteBuf)` overload and the 
`send(ByteBuffer)` return-type change (`void` -> `ChannelFuture`) are 
source-compatible; no existing
     caller consumes the returned future.
   
     ### How was this patch tested?
   
     - `build/sbt core/Test/compile` — the module compiles standalone, and the 
writer and reader PRs each compile independently on top of this one (confirming 
the split is correct).
     - `build/sbt "common-utils/testOnly org.apache.spark.util.LogKeysSuite"` — 
verifies the new log keys keep `LogKeys.java` sorted.
     - The shared components are exercised end-to-end by 
`StreamingShuffleSuite` in the tests PR of this stack (writer<->reader data 
transfer, termination handshake, checksum verification, and
     background-thread error propagation).
   
     ### Was this patch authored or co-authored using generative AI tooling?
   
    Co-authored with Claude Code
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

