cloud-fan commented on PR #56055: URL: https://github.com/apache/spark/pull/56055#issuecomment-4786331791
Joining late. I've read the thread, and I'm with @mridulm on direction: this should be a principled construct *inside* `DAGScheduler`, not a separate scheduler picked by config (a pluggable `DAGScheduler` is its own design problem, not a derisking mechanism for one feature). That said, I think @jerrypeng's latest sketch ([comment](https://github.com/apache/spark/pull/56055#issuecomment-4664810708)) is most of the way there, and this PR is a useful prototype for nailing down the semantics. Let me try to converge the two.

**The new ability is cross-stage gang scheduling, and that already implies a streaming shuffle.** Co-scheduling two data-dependent stages is only useful if the edge is readable before the producer finishes, i.e. a streaming shuffle. So "run these stages concurrently" and "the shuffle is incremental" aren't orthogonal; they're one decision seen from two sides. The `incrementalHint`/`persistentHint` split models a four-state space in which only one cell is real today (incremental + transient, which is the streaming shuffle), and `persistentHint` has no implementation behind it. I'd drop it until a persistent-incremental shuffle (e.g. a Kafka-backed one) actually needs it.

One nuance on "barrier": what's barrier-like is the *resource* side. The co-scheduled stages must all get slots at once or fail fast, which is exactly what this PR's `CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT` reimplements on top of the existing `checkBarrierStageWithNumSlots`. What's *not* barrier-like is execution: backpressure handles readiness, and there is no sync point. So it's **gang-scheduled stages + pipelined execution**. (I'd avoid the word "barrier" in the API, since it already means intra-stage MPI scheduling, but the lineage, generalizing barrier's gang-resource model from tasks to a stage set, is worth stating.)
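To make the resource-side analogy concrete, here is a minimal Python model (not Spark code) of a slot check generalized from one barrier stage to a set of co-scheduled stages. It assumes one slot per task and that every task in the group must fit in the currently available slots at once, roughly the partitions-versus-slots comparison that `checkBarrierStageWithNumSlots` makes for a single barrier stage. `StageInfo`, `check_group_slots`, and `InsufficientSlotsError` are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class StageInfo:
    """Hypothetical stand-in for a stage: an id and its task (partition) count."""
    stage_id: int
    num_tasks: int


class InsufficientSlotsError(Exception):
    """Raised when a co-scheduled group cannot get all of its slots at once."""


def check_group_slots(group: Sequence[StageInfo], max_concurrent_tasks: int) -> int:
    """Fail fast unless every task of every stage in the group fits at once.

    Assumes one slot per task. A single barrier stage is the special case of
    a one-stage group; a pipelined group needs the sum over all its stages,
    because producers and consumers hold slots at the same time.
    """
    needed = sum(stage.num_tasks for stage in group)
    if needed > max_concurrent_tasks:
        raise InsufficientSlotsError(
            f"group {[s.stage_id for s in group]} needs {needed} slots, "
            f"only {max_concurrent_tasks} available"
        )
    return needed


# A 4-task producer stage pipelined into a 2-task result stage.
group = [StageInfo(0, 4), StageInfo(1, 2)]
print(check_group_slots(group, max_concurrent_tasks=8))  # → 6
try:
    check_group_slots(group, max_concurrent_tasks=5)
except InsufficientSlotsError as err:
    print("fail fast:", err)
```

The design point is the sum: a pipelined producer and its consumer hold slots simultaneously, so the group needs their combined task count, whereas sequential stages never hold slots for both at the same time.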
**Concretely, I'd express @mridulm's (a)–(d) as one first-class dependency kind** rather than two hint flags: a *pipelined shuffle dependency*, peer to narrow/shuffle deps, set in physical planning and carried into the `ShuffleDependency`.

*Add:*
- the marker on `ShuffleDependency`;
- treat a pipelined edge as non-sequencing in stage creation, so the connected component forms one co-scheduled group (a);
- submit the group together through a generalized barrier slot check (b);
- a group-failure policy where any failure reruns the group (c);
- fail-fast where a regular shuffle dependency crosses the group boundary (d).

*Remove:*
- `ConcurrentStageDAGScheduler`, `spark.scheduler.dagSchedulerType`, and the per-job `streaming.concurrent.stages.enabled` property (replaced by the marker);
- `CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT` + `realtimeModeSlotsCheck.disabled` (subsumed by the slot check).

Also re-key the `TaskSchedulerImpl`/`TaskSetManager` failure changes on group membership rather than the streaming property. RTM then becomes a pure consumer: a physical-planning rule marks its exchanges pipelined, and there's no streaming-specific code in the scheduler.

**On out-of-order completion:** with stages running concurrently, the result stage's tasks can finish while its parents are still running, and the base `Success` path would declare the job done and cancel the still-running parents. The current PR buffers a stage's completion events until its parents finish, then replays them. I'd **keep that approach**. The all-or-nothing group-failure model means we never need partial-progress correctness inside a group (the only thing a native rewrite would buy), and the buffer keeps the batch-critical completion paths untouched while naturally holding a finished-but-still-pipelining stage in `runningStages`. Just make the **group** the first-class concept, with an explicit group-completion contract implemented *via* the deferral, so it's a documented mechanism rather than a free-floating shim.
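The dependency-kind proposal and the completion deferral can be modeled in a few dozen lines. This is a toy Python sketch under stated assumptions, not Spark's `DAGScheduler`: the `pipelined` flag in each edge tuple stands in for the proposed pipelined shuffle dependency marker, and `form_groups` and `GroupTracker` are hypothetical names. It covers grouping by connected components over pipelined edges (a), all-or-nothing group failure (c), and the completion buffer. It assumes out-of-group parents have already finished when a group is submitted, and it models replay as a separate `replay()` step so that a failure can land between a parent finishing and the buffered events being replayed.

```python
from collections import defaultdict


def form_groups(stages, edges):
    """Split stages into co-scheduled groups (hypothetical helper).

    `edges` are (producer, consumer, pipelined) triples. Only pipelined
    edges are merged, so each connected component over pipelined edges is
    one group; regular shuffle edges still sequence groups as they do today.
    """
    parent = {s: s for s in stages}

    def find(s):
        while parent[s] != s:
            parent[s] = parent[parent[s]]  # path halving
            s = parent[s]
        return s

    for producer, consumer, pipelined in edges:
        if pipelined:
            parent[find(producer)] = find(consumer)
    groups = defaultdict(set)
    for s in stages:
        groups[find(s)].add(s)
    return sorted((frozenset(g) for g in groups.values()), key=min)


class GroupTracker:
    """Group-completion contract implemented via deferral (toy model).

    A stage that finishes while an in-group parent is still running has its
    completion events buffered and stays in `running`, so the job is not
    declared done and its parents are not cancelled. `replay()` applies the
    buffer once all parents have finished. Any failure reruns the whole
    group and drops the buffer.
    """

    def __init__(self, group, parents):
        self.group = frozenset(group)
        # Assumption: out-of-group parents finished before submission, so
        # only in-group parents gate completion.
        self.parents = {s: set(parents.get(s, ())) & self.group for s in self.group}
        self.attempt = 0
        self._reset()

    def _reset(self):
        self.finished = set()
        self.running = set(self.group)  # plays the role of runningStages
        self.deferred = {}  # stage -> buffered completion events

    def _mark_finished(self, stage):
        self.finished.add(stage)
        self.running.discard(stage)

    def on_stage_done(self, stage, events):
        if not self.parents[stage] <= self.finished:
            self.deferred[stage] = list(events)  # parents still pipelining
            return []
        self._mark_finished(stage)
        return list(events)

    def replay(self):
        """Apply buffered completions whose in-group parents have finished."""
        applied, progress = [], True
        while progress:
            progress = False
            for stage in list(self.deferred):
                if self.parents[stage] <= self.finished:
                    applied += self.deferred.pop(stage)
                    self._mark_finished(stage)
                    progress = True
        return applied

    def on_failure(self):
        """All-or-nothing: any failure reruns the group; no partial progress."""
        self.attempt += 1
        self._reset()

    @property
    def done(self):
        return self.finished == self.group


# Stage 0 -> 1 is pipelined, 1 -> 2 is a regular shuffle: two groups.
print(form_groups([0, 1, 2], [(0, 1, True), (1, 2, False)]))
# [frozenset({0, 1}), frozenset({2})]
```

In the replay-window case, calling `on_stage_done(1, ...)` (buffered), then `on_stage_done(0, ...)`, then `on_failure()` before `replay()` leaves `deferred` empty, resets `running` to the whole group, and bumps `attempt`: the group reruns and the buffer is dropped.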
Worth a test for the replay window (a failure between parent-finish and replay → group reruns, buffer dropped).

**Sequencing:** this still gets us there incrementally, just cut differently: land the dependency type + group scheduling + group failure as the generic milestone with a *non-streaming* test, then add the RTM rule + streaming shuffle on top. The scheduler primitive is the hard-to-revise part, so I'd rather get its shape right in-tree first, with RTM as the validating consumer, than merge the subclass and re-cut it later. Happy to help review the dependency-type design.
