cloud-fan commented on PR #56055:
URL: https://github.com/apache/spark/pull/56055#issuecomment-4786331791

   Joining late — I've read the thread and I'm with @mridulm on direction: this 
should be a principled construct *inside* `DAGScheduler`, not a separate 
scheduler picked by config (pluggable `DAGScheduler` is its own design problem, 
not a de-risking mechanism for one feature). But I think @jerrypeng's latest sketch
([comment](https://github.com/apache/spark/pull/56055#issuecomment-4664810708)) 
is most of the way there, and this PR is a useful prototype for nailing down 
the semantics. Let me try to converge them.
   
   **The new ability is cross-stage gang scheduling, and that already implies a 
streaming shuffle.** Co-scheduling two data-dependent stages is only useful if 
the edge is readable before the producer finishes — i.e. a streaming shuffle. 
So "run these stages concurrently" and "the shuffle is incremental" aren't 
orthogonal; they're one decision seen from two sides. The 
`incrementalHint`/`persistentHint` split models a four-state space where only 
one cell is real today (incremental + transient = the streaming shuffle), and 
`persistentHint` has no implementation behind it — I'd drop it until a 
persistent-incremental shuffle (e.g. Kafka-backed) actually needs it.
   
   One nuance on "barrier": what's barrier-like is the *resource* side — the 
co-scheduled stages must all get slots at once or fail fast, which is exactly 
what this PR's `CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT` reimplements on top of 
the existing `checkBarrierStageWithNumSlots`. What's *not* barrier-like is 
execution — backpressure handles readiness, no sync point. So it's 
**gang-scheduled stages + pipelined execution**. (I'd avoid the word "barrier" 
in the API, since it already means intra-stage, MPI-style gang scheduling. But 
the lineage, generalizing barrier's gang-resource model from tasks to a stage 
set, is worth stating.)
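To make the resource side concrete, here is a minimal sketch of a group-level slot check. It assumes one slot per task and a known `availableSlots` count. `GroupStage`, `GangSlotCheck`, `Admit` and `FailFast` are hypothetical names. The sketch only illustrates the idea of extending `checkBarrierStageWithNumSlots` from one stage to a stage set; it is not that method's signature or implementation.

```scala
// Hypothetical stand-in for a stage: only what the slot check needs.
final case class GroupStage(id: Int, numTasks: Int)

sealed trait SlotCheckResult
case object Admit extends SlotCheckResult
final case class FailFast(required: Int, available: Int) extends SlotCheckResult

object GangSlotCheck {
  // Every stage in a co-scheduled group runs at the same time, so the task
  // counts add up. The group is admitted only if all of its tasks can hold a
  // slot simultaneously; otherwise fail fast instead of starting part of it.
  def check(group: Seq[GroupStage], availableSlots: Int): SlotCheckResult = {
    val required = group.map(_.numTasks).sum
    if (required <= availableSlots) Admit else FailFast(required, availableSlots)
  }
}
```

Summing across the group is the design choice: two stages that each fit on their own can still stall if only the consumer gets its slots and waits for a producer that never starts.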
   
   **Concretely, I'd express @mridulm's (a)–(d) as one first-class dependency 
kind** rather than two hint flags: a *pipelined shuffle dependency*, peer to 
narrow/shuffle deps, set in physical planning and carried into the 
`ShuffleDependency`.
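A minimal sketch of what "one dependency kind instead of two flags" could look like. Spark's real classes are `NarrowDependency` and `ShuffleDependency`; `DepKind`, `RegularShuffle` and `PipelinedShuffle` are hypothetical, and no pipelined kind exists in Spark today. The point is that a pipelined edge still splits stages but no longer orders them.

```scala
// Hypothetical dependency kinds, modeled as two properties of an edge.
sealed trait DepKind {
  // Does this edge split the DAG into separate stages?
  def stageBoundary: Boolean
  // Must the child stage wait for the parent stage to finish?
  def sequencing: Boolean
}
case object Narrow extends DepKind { val stageBoundary = false; val sequencing = false }
case object RegularShuffle extends DepKind { val stageBoundary = true; val sequencing = true }
// The proposed kind: a stage boundary whose two sides are co-scheduled.
case object PipelinedShuffle extends DepKind { val stageBoundary = true; val sequencing = false }

object DepKind {
  // Edges that put parent and child stages into the same co-scheduled group.
  def coScheduled(k: DepKind): Boolean = k.stageBoundary && !k.sequencing
}
```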
   
   *Add:* the marker on `ShuffleDependency`; treat a pipelined edge as 
non-sequencing in stage creation so the connected component forms one 
co-scheduled group (a); submit the group together through a generalized barrier 
slot check (b); a group-failure policy in which any failure reruns the whole 
group (c); and fail fast when a regular shuffle dependency crosses the group 
boundary (d).
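Steps (a) and (d) can be sketched as a union-find over stage ids that merges only across pipelined edges, followed by a boundary check. `Edge` and `StageGroups` are hypothetical. The check encodes one reading of (d): reject a regular shuffle edge whose endpoints land in different components when at least one side is a multi-stage co-scheduled group.

```scala
import scala.collection.mutable

// Hypothetical edge between stages: `pipelined` marks the proposed pipelined
// shuffle dependency; false means a regular (sequencing) shuffle dependency.
final case class Edge(parent: Int, child: Int, pipelined: Boolean)

object StageGroups {
  // (a) Union-find that merges stages only across pipelined edges, so each
  // connected component becomes one co-scheduled group. Returns stage -> root.
  def groups(stages: Seq[Int], edges: Seq[Edge]): Map[Int, Int] = {
    val parent = mutable.Map.empty[Int, Int]
    stages.foreach(s => parent(s) = s)
    def find(s: Int): Int = {
      val p = parent(s)
      if (p == s) s else { val root = find(p); parent(s) = root; root }
    }
    for (e <- edges if e.pipelined) {
      val (a, b) = (find(e.parent), find(e.child))
      if (a != b) parent(a) = b
    }
    stages.map(s => s -> find(s)).toMap
  }

  // (d) Regular shuffle edges that cross the boundary of a multi-stage group.
  def boundaryViolations(stages: Seq[Int], edges: Seq[Edge]): Seq[Edge] = {
    val g = groups(stages, edges)
    val sizes = g.values.groupBy(identity).map { case (root, ms) => root -> ms.size }
    edges.filter { e =>
      !e.pipelined && g(e.parent) != g(e.child) &&
        (sizes(g(e.parent)) > 1 || sizes(g(e.child)) > 1)
    }
  }
}
```

For stages 1 to 3 chained by pipelined edges and stage 4 hanging off stage 3 by a regular shuffle, stages 1 to 3 form one group and the 3 to 4 edge is reported as a boundary violation.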
   
   *Remove:* `ConcurrentStageDAGScheduler`, `spark.scheduler.dagSchedulerType`, 
and the per-job `streaming.concurrent.stages.enabled` property (replaced by the 
marker); `CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT` + 
`realtimeModeSlotsCheck.disabled` (subsumed by the slot check); and re-key the 
`TaskSchedulerImpl`/`TaskSetManager` failure changes on group membership rather 
than the streaming property. RTM then becomes a pure consumer — a 
physical-planning rule marks its exchanges pipelined, no streaming-specific 
code in the scheduler.
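A minimal sketch of RTM as a pure consumer: a planning rule that only flips a marker on exchanges and leaves the scheduler alone. The plan types here are a hypothetical, heavily simplified stand-in. Real Spark uses `Rule[SparkPlan]` and `ShuffleExchangeExec`, and where the marker would live on those classes is not specified in this thread. Deciding when to apply the rule (only for RTM queries) is left outside the sketch.

```scala
// Hypothetical, simplified physical plan tree.
sealed trait Plan
final case class Scan(name: String) extends Plan
final case class Exchange(child: Plan, pipelined: Boolean = false) extends Plan
final case class Op(name: String, children: Seq[Plan]) extends Plan

// Marks every exchange in the tree as pipelined; all other nodes are copied
// unchanged. The scheduler would only ever see the marker, never "RTM".
object MarkPipelinedExchanges {
  def apply(plan: Plan): Plan = plan match {
    case Exchange(child, _) => Exchange(apply(child), pipelined = true)
    case Op(name, children) => Op(name, children.map(apply))
    case s: Scan            => s
  }
}
```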
   
   **On out-of-order completion:** with stages concurrent, the result stage's 
tasks can finish while parents run, and the base `Success` path would declare 
the job done and cancel the still-running parents. The current PR buffers a 
stage's completion events until its parents finish, then replays them. I'd 
**keep that approach** — the all-or-nothing group-failure model means we never 
need partial-progress correctness inside a group (the only thing a native 
rewrite would buy), and the buffer keeps the batch-critical completion paths 
untouched while naturally holding a finished-but-still-pipelining stage in 
`runningStages`. Just make the **group** the first-class concept — an explicit 
group-completion contract implemented *via* the deferral — so it's a documented 
mechanism, not a free-floating shim. Worth a test for the replay window (a 
failure between parent-finish and replay → group reruns, buffer dropped).
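A minimal sketch of the deferral as a group-completion contract: task-completion events of a stage are held until all of its parents have finished, then replayed in arrival order, and any group failure drops the buffer and counts a rerun. `TaskDone` and `GroupCompletionBuffer` are hypothetical names, and "delivered" stands in for handing the event to the normal `Success` path; this is not the PR's code.

```scala
import scala.collection.mutable

// Hypothetical completion event for one task of a stage.
final case class TaskDone(stageId: Int, partition: Int)

final class GroupCompletionBuffer(parentsOf: Map[Int, Set[Int]]) {
  private val finished = mutable.Set.empty[Int]
  private val buffered = mutable.Map.empty[Int, mutable.ArrayBuffer[TaskDone]]
  private val delivered = mutable.ArrayBuffer.empty[TaskDone]
  private var rerunCount = 0

  // Events already forwarded to the normal completion path, in order.
  def deliveredEvents: Seq[TaskDone] = delivered.toList
  def reruns: Int = rerunCount

  private def parentsDone(stage: Int): Boolean =
    parentsOf.getOrElse(stage, Set.empty[Int]).subsetOf(finished)

  // Forward immediately if all parents are done, otherwise hold the event.
  def onTaskDone(e: TaskDone): Unit =
    if (parentsDone(e.stageId)) delivered += e
    else buffered.getOrElseUpdate(e.stageId, mutable.ArrayBuffer.empty[TaskDone]) += e

  // Called when the normal path marks a parent stage complete: replay every
  // buffer whose stage now has all of its parents finished.
  def onStageFinished(stage: Int): Unit = {
    finished += stage
    val ready = buffered.keys.filter(parentsDone).toList.sorted
    ready.foreach(s => delivered ++= buffered.remove(s).get)
  }

  // All-or-nothing: drop buffered events and progress, and rerun the group.
  def onGroupFailure(): Unit = {
    buffered.clear()
    finished.clear()
    rerunCount += 1
  }
}
```

The second test scenario corresponds to the replay-window case: a failure while a child's events are still buffered means they are never replayed, and the group reruns instead.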
   
   **Sequencing:** this still gets us there incrementally, just cut differently 
— land the dependency type + group scheduling + group failure as the generic 
milestone with a *non-streaming* test, then add the RTM rule + streaming 
shuffle on top. The scheduler primitive is the hard-to-revise part, so I'd 
rather get its shape right in-tree first with RTM as the validating consumer 
than merge the subclass and re-cut it later. Happy to help review the 
dependency-type design.
   