jerrypeng commented on PR #56055:
URL: https://github.com/apache/spark/pull/56055#issuecomment-4664810708
@mridulm To give a sense of what I'm picturing for the more native design:
Entry point — a capability flag on the shuffle. Add a field to
ShuffleExchangeExec:
```
// true if this shuffle can be read incrementally
// (the consumer can read from a still-running producer)
incrementalHint: Boolean
```
It's set during physical planning (this is an execution concern, not a
logical-plan one), flows into the ShuffleDependency the exchange creates, and
is read by the DAGScheduler at stage-creation time. That single flag is the
entry point that opts a shuffle edge into concurrent scheduling over an
incremental shuffle (e.g. the "streaming shuffle" we're building for RTM).
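To make the flow concrete, here is a minimal Scala sketch of how the flag could travel from the exchange to the dependency and then be read when the consumer stage is created. ToyShuffleExchange, ToyShuffleDependency, ToyDAGScheduler and the two scheduling outcomes are hypothetical stand-ins. They are not Spark's actual ShuffleExchangeExec, ShuffleDependency or DAGScheduler APIs.

```scala
// Hypothetical stand-ins for ShuffleExchangeExec, ShuffleDependency and the
// DAGScheduler; these are NOT Spark's real classes or signatures.

// Runtime shuffle dependency that carries the capability flag.
final case class ToyShuffleDependency(shuffleId: Int, incrementalHint: Boolean)

// Physical-plan node: physical planning decides whether this edge is incremental.
final case class ToyShuffleExchange(shuffleId: Int, incrementalHint: Boolean) {
  // The exchange creates its dependency and copies the flag onto it.
  def createDependency(): ToyShuffleDependency =
    ToyShuffleDependency(shuffleId, incrementalHint)
}

sealed trait StageScheduling
case object WaitForMaterializedOutput extends StageScheduling
case object CoScheduleWithProducer extends StageScheduling

object ToyDAGScheduler {
  // Consulted once, when the consumer stage is created.
  def schedulingFor(dep: ToyShuffleDependency): StageScheduling =
    if (dep.incrementalHint) CoScheduleWithProducer else WaitForMaterializedOutput
}

val sortDep = ToyShuffleExchange(shuffleId = 0, incrementalHint = false).createDependency()
val streamingDep = ToyShuffleExchange(shuffleId = 1, incrementalHint = true).createDependency()
println(ToyDAGScheduler.schedulingFor(sortDep))      // WaitForMaterializedOutput
println(ToyDAGScheduler.schedulingFor(streamingDep)) // CoScheduleWithProducer
```

The point of the sketch is that the scheduler only ever looks at the dependency, so nothing downstream of planning needs to know why the flag was set.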
How RTM sets it. A physical-planning rule for RTM streaming queries marks
the query's shuffle exchanges with incrementalHint = true. Nothing here is
streaming-specific: any feature can write its own physical-planning rule to opt
a job into concurrent scheduling and incremental shuffles, so the capability is
generic; RTM is just the first caller.
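A rough sketch of such a rule over a toy plan tree follows. It assumes a hypothetical PlanNode hierarchy in place of SparkPlan and a hypothetical isRtmQuery parameter for how the rule recognizes RTM queries. In Spark itself this would presumably be a Rule[SparkPlan] rewriting ShuffleExchangeExec nodes. The sketch only shows the shape of the rewrite.

```scala
// Toy physical plan tree; hypothetical, not Spark's SparkPlan.
sealed trait PlanNode
final case class ScanNode(source: String) extends PlanNode
final case class ProjectNode(child: PlanNode) extends PlanNode
final case class ShuffleNode(child: PlanNode, incrementalHint: Boolean = false) extends PlanNode

object MarkIncrementalShuffles {
  // Only rewrites plans the caller identified as RTM queries
  // (hypothetical parameter); any other plan passes through unchanged.
  def apply(plan: PlanNode, isRtmQuery: Boolean): PlanNode =
    if (isRtmQuery) mark(plan) else plan

  // Bottom-up rewrite that sets incrementalHint on every shuffle exchange.
  private def mark(plan: PlanNode): PlanNode = plan match {
    case s: ScanNode           => s
    case ProjectNode(child)    => ProjectNode(mark(child))
    case ShuffleNode(child, _) => ShuffleNode(mark(child), incrementalHint = true)
  }
}

// Counts exchanges that were opted into incremental reads.
def countIncremental(plan: PlanNode): Int = plan match {
  case ScanNode(_)          => 0
  case ProjectNode(c)       => countIncremental(c)
  case ShuffleNode(c, hint) => (if (hint) 1 else 0) + countIncremental(c)
}

val plan = ShuffleNode(ProjectNode(ShuffleNode(ScanNode("events"))))
println(countIncremental(MarkIncrementalShuffles(plan, isRtmQuery = true)))  // 2
println(countIncremental(MarkIncrementalShuffles(plan, isRtmQuery = false))) // 0
```

Another feature would opt in the same way, by supplying its own condition in place of isRtmQuery.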
Semantics implied by an incremental shuffle:
1. The stages on either side of the shuffle can run concurrently — the
consumer reads from a still-running producer instead of waiting for fully
materialized output.
2. Because the incremental shuffle's data is transient (it can't be
replayed), a task failure in either of the co-scheduled stages requires
rerunning both stages (for RTM, the query restarts from its checkpoint).
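One way to read these two semantics together is that stages linked by incremental shuffle edges form a co-scheduled group, and a task failure anywhere in that group reruns the whole group. The sketch below assumes stages are plain Int ids and that every incremental edge is transient. ShuffleEdge and CoScheduling are hypothetical names for illustration only.

```scala
// Hypothetical model: stages are Int ids, shuffle edges carry the flag.
final case class ShuffleEdge(producer: Int, consumer: Int, incrementalHint: Boolean)

object CoScheduling {
  // Stages linked (in either direction) by incremental edges run together.
  def groupOf(stage: Int, edges: Seq[ShuffleEdge]): Set[Int] = {
    val incremental = edges.filter(_.incrementalHint)
    @annotation.tailrec
    def expand(frontier: List[Int], seen: Set[Int]): Set[Int] = frontier match {
      case Nil => seen
      case s :: rest =>
        val neighbours = incremental.collect {
          case ShuffleEdge(p, c, _) if p == s => c
          case ShuffleEdge(p, c, _) if c == s => p
        }.filterNot(seen)
        expand(rest ++ neighbours, seen ++ neighbours)
    }
    expand(List(stage), Set(stage))
  }

  // Transient incremental data cannot be replayed, so a task failure inside a
  // co-scheduled group reruns the whole group. A stage with no incremental
  // edges is left to normal task-level retry (empty result here).
  def stagesToRerun(failedStage: Int, edges: Seq[ShuffleEdge]): Set[Int] = {
    val group = groupOf(failedStage, edges)
    if (group.size > 1) group else Set.empty
  }
}

// Stage 0 feeds stage 1 over an incremental shuffle; stage 1 feeds stage 2
// over a regular shuffle.
val edges = Seq(
  ShuffleEdge(0, 1, incrementalHint = true),
  ShuffleEdge(1, 2, incrementalHint = false))
println(CoScheduling.stagesToRerun(1, edges) == Set(0, 1)) // true
println(CoScheduling.stagesToRerun(2, edges).isEmpty)      // true
```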
A cleaner generalization — separate "incremental" from "persistent".
Semantic (2) is really a consequence of a second, orthogonal property: whether
the shuffle data is durable/replayable. We can make that explicit with a second
flag:
```
// true if the shuffle data is durable/replayable, so a single task
// failure does not require rerunning the producer stage
persistentHint: Boolean // name TBD
```
That decouples two concerns:
- incrementally-readable → enables concurrent scheduling.
- persistent/replayable → determines failure recovery: a transient shuffle
means rerun the co-scheduled stages; a durable shuffle allows normal
fine-grained recovery (re-read from the persisted data / offset).
That said, I would defer implementing the "persistentHint" capability until
there is an actual use case or implementation.
The streaming shuffle is incremental and transient. But a Kafka-backed
shuffle, say, could be incremental and persistent — concurrent scheduling
without the rerun-everything-on-failure penalty, since the consumer can replay
from an offset. Splitting the two flags lets the construct compose across those
cases instead of hardcoding RTM's failure model.
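A small sketch of the decoupling, in which each flag drives exactly one decision. ShuffleCapabilities, the mode names and Policy are hypothetical. persistentHint itself is only a proposal (name TBD) and is deferred, as noted above.

```scala
// Hypothetical model of the two orthogonal capability flags.
final case class ShuffleCapabilities(incrementalHint: Boolean, persistentHint: Boolean)

sealed trait SchedulingMode
case object Sequential extends SchedulingMode // consumer waits for materialized output
case object Concurrent extends SchedulingMode // consumer reads a still-running producer

sealed trait RecoveryMode
case object FineGrained extends RecoveryMode            // re-read persisted data / offset
case object RerunCoScheduledStages extends RecoveryMode // transient data, rerun the group

object Policy {
  // incrementalHint alone decides scheduling.
  def scheduling(c: ShuffleCapabilities): SchedulingMode =
    if (c.incrementalHint) Concurrent else Sequential

  // persistentHint alone decides failure recovery.
  def recovery(c: ShuffleCapabilities): RecoveryMode =
    if (c.persistentHint) FineGrained else RerunCoScheduledStages
}

val streamingShuffle = ShuffleCapabilities(incrementalHint = true, persistentHint = false)
val kafkaBackedShuffle = ShuffleCapabilities(incrementalHint = true, persistentHint = true)
println((Policy.scheduling(streamingShuffle), Policy.recovery(streamingShuffle)))
// (Concurrent,RerunCoScheduledStages)
println((Policy.scheduling(kafkaBackedShuffle), Policy.recovery(kafkaBackedShuffle)))
// (Concurrent,FineGrained)
```

The comment does not discuss a shuffle that is neither incremental nor persistent. The sketch simply applies both rules independently, so treat that combination as unspecified.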
Pluggable shuffle implementation via config. The engine maps the capability
to a concrete ShuffleManager. Today we dispatch between the sort and streaming
managers based on a per-job property; the cleaner version selects the manager
per dependency from incrementalHint, with both implementations configured:
```
spark.shuffle.manager = org.apache.spark.shuffle.sort.SortShuffleManager
spark.shuffle.manager.incremental = org.apache.spark.shuffle.streaming.StreamingShuffleManager
```
A shuffle with incrementalHint = true is served by the configured
incremental manager and everything else by the default. This keeps the
scheduler construct generic while the shuffle implementation stays pluggable.
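A minimal sketch of the per-dependency lookup. spark.shuffle.manager is an existing Spark setting. spark.shuffle.manager.incremental and the StreamingShuffleManager class are the proposal above. ShuffleManagerSelector is a hypothetical helper, and the sort-manager fallback is an assumption for illustration.

```scala
object ShuffleManagerSelector {
  val DefaultKey = "spark.shuffle.manager"
  // Proposed key from this comment; not an existing Spark config.
  val IncrementalKey = "spark.shuffle.manager.incremental"
  val SortManager = "org.apache.spark.shuffle.sort.SortShuffleManager"

  // Picks the manager class for one shuffle dependency from its incrementalHint.
  def managerClassFor(conf: Map[String, String], incrementalHint: Boolean): String =
    if (incrementalHint)
      conf.getOrElse(IncrementalKey,
        throw new IllegalArgumentException(
          s"$IncrementalKey must be set to serve an incremental shuffle"))
    else
      conf.getOrElse(DefaultKey, SortManager)
}

val conf = Map(
  "spark.shuffle.manager" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "spark.shuffle.manager.incremental" -> "org.apache.spark.shuffle.streaming.StreamingShuffleManager")
println(ShuffleManagerSelector.managerClassFor(conf, incrementalHint = true))
// org.apache.spark.shuffle.streaming.StreamingShuffleManager
println(ShuffleManagerSelector.managerClassFor(conf, incrementalHint = false))
// org.apache.spark.shuffle.sort.SortShuffleManager
```

Failing fast when the incremental key is missing, rather than falling back to the default, is a choice made for this sketch. Silently using the sort manager would lose the ability to read from a still-running producer.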
Let me know what you think, though I would still prefer to do this
incrementally, as in my previous comment.