jerrypeng opened a new pull request, #56055:
URL: https://github.com/apache/spark/pull/56055

    ### Why are the changes needed?
     
     Spark's default `DAGScheduler` submits a job's stages **sequentially**: a downstream stage starts only after every upstream stage has finished writing its shuffle output. For a Structured Streaming query like `Source → Stateful Op → Sink`, each micro-batch therefore pays latency proportional to the number of stages, because the stages run one after another, plus the cost of materializing shuffle output between each pair of consecutive stages.
     
     This PR adds an opt-in `ConcurrentStageDAGScheduler` that submits all stages of a streaming job at the same time and lets a streaming shuffle pipe records between them as they run. End-to-end latency is then bounded by the slowest stage rather than by the sum of all stages, which is the foundation for Structured Streaming's real-time mode.
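
As a rough illustration of the latency argument above, here is a toy model in plain Python (not Spark code). The per-stage durations are made-up numbers, and the model ignores shuffle-materialization cost and scheduling overhead. It also assumes that pipelined stages overlap fully, which is an idealization.

```python
# Toy latency model. Stage durations (ms) are hypothetical, not measurements.
stage_ms = {"source": 40, "stateful_op": 120, "sink": 30}


def sequential_latency(durations):
    """Stages run one after another, so their latencies add up."""
    return sum(durations.values())


def concurrent_latency(durations):
    """Stages run at the same time; under the full-overlap assumption
    the slowest stage bounds the end-to-end latency."""
    return max(durations.values())


print(sequential_latency(stage_ms))  # 190
print(concurrent_latency(stage_ms))  # 120
```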
   
     The scheduler is strictly opt-in:
   
     - Cluster level: `spark.scheduler.dagSchedulerType=ConcurrentStageDAGScheduler`. When unset (the default), `SparkContext` instantiates the regular `DAGScheduler` and behavior is unchanged.
     - Job level: the streaming engine sets `streaming.concurrent.stages.enabled=true` on the job's `Properties`. Jobs without this property go through the unchanged code paths even when the concurrent scheduler is active.
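
The two opt-in levels above can be pictured as a simple conjunction. The sketch below is a plain-Python model, not the PR's code: the config key and property name are taken from this description, while the function `uses_concurrent_stages` and the exact string comparison are assumptions made for illustration.

```python
# Keys as named in the PR description.
SCHEDULER_TYPE_KEY = "spark.scheduler.dagSchedulerType"
JOB_PROPERTY_KEY = "streaming.concurrent.stages.enabled"


def uses_concurrent_stages(cluster_conf, job_properties):
    """Hypothetical helper: True only when both opt-ins are present."""
    # Cluster level: an unset key means the regular DAGScheduler.
    scheduler = cluster_conf.get(SCHEDULER_TYPE_KEY, "DAGScheduler")
    if scheduler != "ConcurrentStageDAGScheduler":
        return False  # the job property is irrelevant here
    # Job level: only jobs that carry the property take the new path.
    # Comparing against the literal "true" is an assumption of this sketch.
    return job_properties.get(JOB_PROPERTY_KEY, "false") == "true"
```

With this model, a job that sets the property on a cluster running the default scheduler still takes the sequential path, and so does a job without the property on a cluster running the concurrent scheduler.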
   
     It also adds a minimal set of extension points to `DAGScheduler` (one empty hook, two thin helpers, four `protected` modifiers, two package-private accessors). All of these are no-ops for the default scheduler, so the change is invisible to existing users.
   
     ### Does this PR introduce _any_ user-facing change?
   
     **No user-facing behavior change for any existing workload.** Unless the new config is set, `SparkContext` builds the same `DAGScheduler` it always has, and the default scheduler's behavior is unchanged.
   
     The PR does introduce two new internal configs (both `internal()`, so not part of the public surface):
   
     - `spark.scheduler.dagSchedulerType`: chooses the `DAGScheduler` implementation. Defaults to `"DAGScheduler"`.
     - `spark.scheduler.realtimeModeSlotsCheck.disabled`: skips the slot-availability check used by the concurrent scheduler. Defaults to `false`.
   
     And one new error class:
   
     - `CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT`: thrown by the concurrent scheduler when a streaming job needs more concurrent slots than the cluster offers.
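
A minimal sketch of what such a slot check might look like, in plain Python. This description does not say how the required slot count is computed; the sketch assumes that every task of every stage must hold a slot at the same time, since all stages run concurrently. `InsufficientSlotError` and `check_slots` are hypothetical names, and `check_disabled` stands in for `spark.scheduler.realtimeModeSlotsCheck.disabled`.

```python
class InsufficientSlotError(RuntimeError):
    """Stand-in for the CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error class."""


def check_slots(tasks_per_stage, available_slots, check_disabled=False):
    """Raise if the job needs more concurrent slots than are available.

    Assumption: required slots = total task count across all stages.
    """
    if check_disabled:
        return  # the check is skipped entirely
    required = sum(tasks_per_stage)
    if required > available_slots:
        raise InsufficientSlotError(
            f"CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT: "
            f"need {required} slots, have {available_slots}"
        )
```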
   
     ### How was this patch tested?
     
     Added one new test suite plus three targeted regression tests:
   
     1. **`ConcurrentStageDAGSchedulerSuite`**: exercises the new scheduler end-to-end through the existing `DAGSchedulerSuiteBase` test harness:
        - Simple two-stage concurrent job: both stages enter `runningStages` on submission; the child stage's task completions are buffered until the parent finishes.
        - Concurrent stages disabled in properties: the scheduler falls back to the default sequential behavior.
        - Complex six-stage DAG with diamond dependencies: verifies parent tracking, deferred-event buffering, and the correct release order when parents finish out of order.
        - Speculation rejected: a job with concurrent stages and `spark.speculation=true` fails on submission with a clear error.
   
        By inheriting from `DAGSchedulerSuiteBase`, the suite also runs all 149 existing `DAGScheduler` tests against `ConcurrentStageDAGScheduler`, which gives regression coverage at no extra cost: it confirms that the new scheduler behaves identically to `DAGScheduler` when concurrent mode is not engaged. Total: 153 tests pass.
   
     2. **`TaskSchedulerImplSuite`**: one new test. A `TaskSet` with `streaming.concurrent.stages.enabled=true` is submitted with `maxTaskFailures=1` regardless of `spark.task.maxFailures`; a regular `TaskSet` still gets the cluster default. This guards both branches of the new conditional against regression.
   
     3. **`TaskSetManagerSuite`**: two new tests covering the new failure-counting behavior:
        - With concurrent stages enabled, an `ExecutorLostFailure` with `exitCausedByApp=false` counts toward `maxTaskFailures` (the query restarts rather than silently absorbing executor loss).
        - Without concurrent stages, the same failure does **not** count, which guards the default behavior against regression.
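
The failure-handling rules tested in items 2 and 3 can be summarized in a small plain-Python model. Both function names are hypothetical and only mirror the behavior described in this list; they are not the PR's implementation. The value `4` in the usage note is the documented default of `spark.task.maxFailures`.

```python
JOB_PROPERTY_KEY = "streaming.concurrent.stages.enabled"


def effective_max_task_failures(job_properties, cluster_max_failures):
    """Concurrent-stage task sets get maxTaskFailures=1; others keep the
    cluster setting (spark.task.maxFailures)."""
    if job_properties.get(JOB_PROPERTY_KEY, "false") == "true":
        return 1
    return cluster_max_failures


def executor_loss_counts(concurrent_enabled, exit_caused_by_app):
    """Does an ExecutorLostFailure count toward maxTaskFailures?

    Default behavior: only losses caused by the app count.
    With concurrent stages enabled: every executor loss counts.
    """
    return exit_caused_by_app or concurrent_enabled
```

For example, `effective_max_task_failures({}, 4)` keeps the cluster value of 4, while the same call with the property set to `"true"` yields 1, so the first counted failure fails the task set.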
   
     Full run: `core/testOnly *DAGSchedulerSuite *ConcurrentStageDAGSchedulerSuite *TaskSetManagerSuite *TaskSchedulerImplSuite` → **484 tests, all pass.**
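
The deferred-event buffering exercised by `ConcurrentStageDAGSchedulerSuite` can be pictured with the following plain-Python model. It is a sketch under stated assumptions, not the scheduler's code: the class `DeferredCompletionBuffer` is hypothetical, stage completion is signalled explicitly by the caller, and buffered events are released in ascending stage-id order, which may differ from the real release order.

```python
from collections import defaultdict


class DeferredCompletionBuffer:
    """Holds a stage's task completions until all of its parents finish."""

    def __init__(self, parents):
        # parents: stage id -> set of parent stage ids
        self.parents = parents
        self.finished = set()
        self.buffered = defaultdict(list)
        self.delivered = []  # (stage, event) pairs in delivery order

    def _parents_done(self, stage):
        return self.parents.get(stage, set()) <= self.finished

    def on_task_completion(self, stage, event):
        if self._parents_done(stage):
            self.delivered.append((stage, event))
        else:
            self.buffered[stage].append(event)  # defer until parents finish

    def on_stage_finished(self, stage):
        self.finished.add(stage)
        # Release events of any stage whose parents are now all finished.
        for child in sorted(self.buffered):
            if self._parents_done(child):
                for event in self.buffered.pop(child):
                    self.delivered.append((child, event))
```

In a diamond where stage 3 depends on stages 1 and 2, completions for stage 3 stay buffered until both parents are marked finished, regardless of which parent finishes first.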
   
     ### Was this patch authored or co-authored using generative AI tooling?
   
     Generated-by: Claude Code (Claude Opus 4.7)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

