jerrypeng commented on PR #56055:
URL: https://github.com/apache/spark/pull/56055#issuecomment-4664810708

   @mridulm To give a sense of what I'm picturing for the more native design:

   Entry point: a capability flag on the shuffle. Add a field to ShuffleExchangeExec:
   
   ```
     // true if this shuffle can be read incrementally
     // (the consumer can read from a still-running producer)
     incrementalHint: Boolean
   ```
   
   It's set during physical planning (this is an execution concern, not a 
logical-plan one), flows into the ShuffleDependency the exchange creates, and 
is read by the DAGScheduler at stage-creation time. That single flag is the 
entry point that opts a shuffle edge into concurrent scheduling over an 
incremental shuffle (e.g. the "streaming shuffle" we're building for RTM).
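
A minimal sketch of that flow follows. It uses toy stand-ins for the real classes: Exchange, Dependency, and StartPolicy below are hypothetical and only mirror ShuffleExchangeExec, ShuffleDependency, and the DAGScheduler's stage-creation decision. The point is that the flag is set once at planning time and copied unchanged until the scheduler reads it.

```scala
// Hypothetical, simplified model of the proposed flow; none of these
// types are real Spark classes.
object IncrementalHintFlow {
  // Stand-in for ShuffleExchangeExec carrying the proposed flag.
  final case class Exchange(shuffleId: Int, incrementalHint: Boolean)

  // Stand-in for the ShuffleDependency the exchange creates. The flag is
  // copied over unchanged so the scheduler can see it.
  final case class Dependency(shuffleId: Int, incrementalHint: Boolean)

  def toDependency(e: Exchange): Dependency =
    Dependency(e.shuffleId, e.incrementalHint)

  // What the scheduler might decide for the consumer stage at creation time.
  sealed trait StartPolicy
  case object AfterParentCompletes extends StartPolicy // today's behavior
  case object ConcurrentWithParent extends StartPolicy // incremental edge

  def startPolicy(dep: Dependency): StartPolicy =
    if (dep.incrementalHint) ConcurrentWithParent else AfterParentCompletes
}
```

With incrementalHint = false, the sketch keeps today's behavior: the consumer stage waits for its parent to finish.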
     
   How RTM sets it. A physical-planning rule for RTM streaming queries marks the query's shuffle exchanges with incrementalHint = true. Nothing here is streaming-specific: any feature can write its own physical-planning rule to opt a job into concurrent scheduling and incremental shuffles. The capability is therefore generic, and RTM is just the first caller.
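
Here is a rough illustration of such a rule. It uses a toy plan tree (Scan, Aggregate, and Exchange are hypothetical stand-ins, not Spark's SparkPlan nodes) and a rewrite that sets incrementalHint on every exchange. A real rule would presumably be a Rule[SparkPlan] built on the tree's transform methods. That wiring, and how the rule is registered, is not shown here.

```scala
// Toy physical-plan tree standing in for SparkPlan; all types hypothetical.
object MarkIncrementalRule {
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Aggregate(child: Plan) extends Plan
  final case class Exchange(child: Plan, incrementalHint: Boolean = false) extends Plan

  // Recursive rewrite that opts every shuffle exchange into incremental
  // reads, as an RTM-specific planning rule would for its queries.
  def apply(plan: Plan): Plan = plan match {
    case Scan(_)            => plan
    case Aggregate(child)   => Aggregate(apply(child))
    case Exchange(child, _) => Exchange(apply(child), incrementalHint = true)
  }

  // Collects each exchange's hint, top-down, for inspection.
  def hints(plan: Plan): List[Boolean] = plan match {
    case Scan(_)            => Nil
    case Aggregate(child)   => hints(child)
    case Exchange(child, h) => h :: hints(child)
  }
}
```

Applied to Aggregate(Exchange(Aggregate(Exchange(Scan("events"))))), the hints change from List(false, false) to List(true, true). Non-exchange nodes are left untouched.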
   
   Semantics implied by an incremental shuffle:
   
   1. The stages on either side of the shuffle can run concurrently — the 
consumer reads from a still-running producer instead of waiting for fully 
materialized output.
   
   2. Because the incremental shuffle's data is transient (it can't be replayed), a task failure in either of the co-scheduled stages requires rerunning both of them (for RTM, the query restarts from its checkpoint).
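
The two semantics can be sketched together under a simplifying assumption: stages are plain integers and shuffle edges are small case classes (all names here are hypothetical). Stages connected by incremental edges form one co-scheduled group. A task failure anywhere in a multi-stage group reruns the whole group, while a stage with no incremental edges keeps ordinary task-level retry.

```scala
object CoScheduledGroups {
  // A shuffle edge from a producer stage to a consumer stage.
  final case class Edge(producer: Int, consumer: Int, incremental: Boolean)

  sealed trait Recovery
  // Ordinary path: only the failed task is retried.
  case object RetryTask extends Recovery
  // Incremental data cannot be replayed, so every co-scheduled stage reruns.
  final case class RerunStages(stages: Set[Int]) extends Recovery

  // Stages joined by incremental edges form one co-scheduled group
  // (connected components, computed with a small union-find).
  // Assumes every edge endpoint appears in `stages`.
  def groups(stages: Seq[Int], edges: Seq[Edge]): Set[Set[Int]] = {
    val parent = scala.collection.mutable.Map.empty[Int, Int]
    stages.foreach(s => parent(s) = s)
    def root(s: Int): Int = {
      val p = parent(s)
      if (p == s) s
      else {
        val r = root(p)
        parent(s) = r // path compression
        r
      }
    }
    for (e <- edges if e.incremental) parent(root(e.producer)) = root(e.consumer)
    stages.groupBy(s => root(s)).values.map(_.toSet).toSet
  }

  // Semantic (2): a failure in a multi-stage group reruns the whole group.
  def onTaskFailure(failedStage: Int, stages: Seq[Int], edges: Seq[Edge]): Recovery =
    groups(stages, edges).find(_.contains(failedStage)) match {
      case Some(group) if group.size > 1 => RerunStages(group)
      case _                              => RetryTask
    }
}
```

Consider an incremental edge from stage 0 to stage 1 and a regular edge from stage 1 to stage 2. The groups are {0, 1} and {2}. A failure in stage 1 reruns stages 0 and 1, while a failure in stage 2 only retries the task.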
   
   A cleaner generalization — separate "incremental" from "persistent". 
Semantic (2) is really a consequence of a second, orthogonal property: whether 
the shuffle data is durable/replayable. We can make that explicit with a second 
flag:
   
   ```
     // true if the shuffle data is durable/replayable, so a single task
     // failure does not require rerunning the producer stage
     persistentHint: Boolean   // name TBD
   ```
   
   That decouples two concerns:
   
   - incrementally-readable → enables concurrent scheduling.
   - persistent/replayable → determines failure recovery: a transient shuffle means rerunning the co-scheduled stages; a durable shuffle allows normal fine-grained recovery (re-reading from the persisted data / offset).
   
   That said, I would defer implementing the persistentHint capability until there is an actual use case or implementation.
   
   The streaming shuffle is incremental and transient. But a Kafka-backed 
shuffle, say, could be incremental and persistent — concurrent scheduling 
without the rerun-everything-on-failure penalty, since the consumer can replay 
from an offset. Splitting the two flags lets the construct compose across those 
cases instead of hardcoding RTM's failure model.
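
To make the composition concrete, here is a small sketch of the two-flag model. Capability, Scheduling, and Recovery are hypothetical names, and persistent stands in for persistentHint, which the proposal defers. Scheduling depends only on the incremental flag and recovery only on the persistent one. The one exception is that non-incremental shuffles have no co-scheduled stages and so keep the existing recovery path.

```scala
object ShuffleCapabilities {
  // Hypothetical pair of hints; `persistent` stands in for the proposed
  // persistentHint (name TBD).
  final case class Capability(incremental: Boolean, persistent: Boolean)

  sealed trait Scheduling
  case object WaitForProducer extends Scheduling // consumer needs materialized output
  case object Concurrent extends Scheduling      // consumer reads a running producer

  sealed trait Recovery
  case object FineGrained extends Recovery      // normal fine-grained recovery
  case object RerunCoScheduled extends Recovery // transient data cannot be replayed

  // Scheduling is decided by the incremental hint alone.
  def scheduling(c: Capability): Scheduling =
    if (c.incremental) Concurrent else WaitForProducer

  // Only an incremental *and* transient shuffle forces a rerun of the
  // co-scheduled stages; everything else recovers the normal way.
  def recovery(c: Capability): Recovery =
    if (c.incremental && !c.persistent) RerunCoScheduled else FineGrained
}
```

In this model, the streaming shuffle (incremental, transient) maps to (Concurrent, RerunCoScheduled). The Kafka-backed example (incremental, persistent) maps to (Concurrent, FineGrained).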
   
   Pluggable shuffle implementation via config. The engine maps the capability to a concrete ShuffleManager. Today we dispatch between the sort and streaming managers based on a per-job property. The cleaner version selects the manager per dependency from incrementalHint, with the implementations configured as:
   
   ```
     spark.shuffle.manager             = org.apache.spark.shuffle.sort.SortShuffleManager
     spark.shuffle.manager.incremental = org.apache.spark.shuffle.streaming.StreamingShuffleManager
   ```
   
   A shuffle with incrementalHint = true is served by the configured incremental manager, and every other shuffle is served by the default. This keeps the scheduler construct generic while the shuffle implementation stays pluggable.
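
The per-dependency dispatch might look like the following sketch. spark.shuffle.manager is an existing Spark setting. spark.shuffle.manager.incremental and the StreamingShuffleManager class are the names proposed above. managerFor is a hypothetical helper that returns a class name rather than instantiating anything.

```scala
object ShuffleManagerSelection {
  // Existing Spark key for the default shuffle manager.
  val DefaultKey = "spark.shuffle.manager"
  // Proposed key (not an existing Spark setting) for incremental shuffles.
  val IncrementalKey = "spark.shuffle.manager.incremental"
  // Used in this sketch when DefaultKey is not set.
  val DefaultManager = "org.apache.spark.shuffle.sort.SortShuffleManager"

  // Hypothetical helper: picks the manager class name for one dependency.
  def managerFor(incrementalHint: Boolean, conf: Map[String, String]): String =
    if (incrementalHint)
      conf.getOrElse(
        IncrementalKey,
        throw new IllegalArgumentException(
          s"$IncrementalKey must be set to serve shuffles with incrementalHint = true"))
    else
      conf.getOrElse(DefaultKey, DefaultManager)
}
```

The sketch fails fast when an incremental shuffle is requested but no incremental manager is configured. Falling back silently to the sort manager would be wrong, because that manager expects fully materialized map output and could not serve a consumer reading from a still-running producer.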
   
   Let me know what you think, though I would still prefer to do this incrementally, as described in my previous comment.

