jerrypeng opened a new pull request, #56055:
URL: https://github.com/apache/spark/pull/56055
### Why are the changes needed?
Spark's default `DAGScheduler` submits a job's stages **sequentially**: a
downstream stage starts only after every upstream stage has finished writing
its shuffle output. For a Structured Streaming query like
`Source → Stateful Op → Sink`, each micro-batch therefore pays latency that
grows with the number of stages, because they run one after another. It also
pays the cost of materializing shuffle output between every pair of adjacent
stages.
This PR adds an opt-in `ConcurrentStageDAGScheduler` that submits all
stages of a streaming job at the same time and lets a streaming shuffle pipe
records between them as they run.
End-to-end latency becomes bounded by the slowest stage rather than the
sum of stages, which is the foundation for Structured Streaming's real-time
mode.
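A back-of-the-envelope model makes the difference concrete. The Python sketch below is illustrative only. It assumes each stage has a fixed per-batch processing time and each stage boundary has a fixed shuffle-materialization cost. It ignores task launch overhead and the time needed to fill the pipeline. The function names and stage timings are hypothetical, not measurements from this PR.

```python
from typing import Sequence


def sequential_latency_ms(stage_ms: Sequence[float], shuffle_ms_per_boundary: float = 0) -> float:
    """Stages run one after another; each stage boundary also pays to materialize shuffle output."""
    return sum(stage_ms) + shuffle_ms_per_boundary * (len(stage_ms) - 1)


def concurrent_latency_ms(stage_ms: Sequence[float]) -> float:
    """Idealized: all stages run at once and records are piped between them,
    so the batch is bounded by the slowest stage (pipeline fill time is ignored)."""
    return max(stage_ms)


# Hypothetical per-stage times in ms for Source -> Stateful Op -> Sink.
stages = [40, 120, 30]
print(sequential_latency_ms(stages, shuffle_ms_per_boundary=10))  # 210
print(concurrent_latency_ms(stages))  # 120
```

With these made-up numbers, sequential submission costs 210 ms per micro-batch, compared with 120 ms when the stages overlap. Under this model, every added stage grows the sequential cost, while the concurrent cost only grows if the new stage is the slowest.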
The scheduler is strictly opt-in, at two levels:
- Cluster level:
  `spark.scheduler.dagSchedulerType=ConcurrentStageDAGScheduler`. When unset
  (the default), `SparkContext` instantiates the regular `DAGScheduler` and
  there is zero behavioral difference.
- Job level: the streaming engine sets
  `streaming.concurrent.stages.enabled=true` on the job's `Properties`. Jobs
  without this property go through the unchanged code paths even when the
  concurrent scheduler is active.
The PR also adds a minimal set of extension points to `DAGScheduler` (one
empty hook, two thin helpers, four `protected` modifiers, two package-private
accessors). None of them changes what the default scheduler does, so the
change is invisible to existing users.
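The two-level opt-in described above can be pictured as a factory keyed on the cluster config, plus a per-job check on the job's properties. This is a hypothetical Python model, not Spark's code. The `*Model` classes, `create_dag_scheduler`, and a `submit` method that returns "waves" of stages submitted together are illustrative names only. Only the two config keys come from the PR.

```python
from typing import List, Mapping, Sequence

DAG_SCHEDULER_TYPE = "spark.scheduler.dagSchedulerType"
CONCURRENT_STAGES_KEY = "streaming.concurrent.stages.enabled"


class DAGSchedulerModel:
    """Hypothetical stand-in for the default scheduler."""

    def submit(self, stages: Sequence[str], props: Mapping[str, str]) -> List[List[str]]:
        # One stage per wave: a stage is submitted only after the previous one finishes.
        return [[stage] for stage in stages]


class ConcurrentStageDAGSchedulerModel(DAGSchedulerModel):
    """Hypothetical stand-in for ConcurrentStageDAGScheduler."""

    def submit(self, stages: Sequence[str], props: Mapping[str, str]) -> List[List[str]]:
        if props.get(CONCURRENT_STAGES_KEY, "false") == "true":
            # Job opted in: every stage goes out in a single wave.
            return [list(stages)]
        # Job did not opt in: fall through to the unchanged sequential path.
        return super().submit(stages, props)


_SCHEDULERS = {
    "DAGScheduler": DAGSchedulerModel,
    "ConcurrentStageDAGScheduler": ConcurrentStageDAGSchedulerModel,
}


def create_dag_scheduler(conf: Mapping[str, str]) -> DAGSchedulerModel:
    # Cluster-level switch; defaults to the regular scheduler when unset.
    kind = conf.get(DAG_SCHEDULER_TYPE, "DAGScheduler")
    if kind not in _SCHEDULERS:
        # Assumption: the PR does not say how unknown values are handled.
        raise ValueError(f"unknown {DAG_SCHEDULER_TYPE}: {kind}")
    return _SCHEDULERS[kind]()


stages = ["source", "stateful-op", "sink"]
concurrent = create_dag_scheduler({DAG_SCHEDULER_TYPE: "ConcurrentStageDAGScheduler"})
print(concurrent.submit(stages, {CONCURRENT_STAGES_KEY: "true"}))  # [['source', 'stateful-op', 'sink']]
print(concurrent.submit(stages, {}))  # [['source'], ['stateful-op'], ['sink']]
```

The per-job check lives inside the subclass. As a result, a job without the property takes exactly the parent's path, which mirrors the PR's claim that such jobs are unaffected even when the concurrent scheduler is active.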
### Does this PR introduce _any_ user-facing change?
**No user-facing behavior change for any existing workload.** Without
setting the new config, `SparkContext` builds the same `DAGScheduler` it always
has, and the default scheduler's
behavior is unchanged.
The PR does introduce two new internal configs (both `internal()`, so not
part of the public surface):
- `spark.scheduler.dagSchedulerType` — chooses the `DAGScheduler`
implementation. Defaults to `"DAGScheduler"`.
- `spark.scheduler.realtimeModeSlotsCheck.disabled` — skips the
slot-availability check used by the concurrent scheduler. Defaults to `false`.
And one new error class:
- `CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT` — thrown by the concurrent
scheduler when a streaming job needs more concurrent slots than the cluster
offers.
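Here is a minimal sketch of the slot check. It rests on an assumption the PR does not spell out: every stage of a concurrent job runs at the same time, so the job needs a free slot for every task of every stage at once. Under that assumption, the requirement is the sum of task counts rather than the size of the largest stage. The exception class, `check_slots`, and the message format are hypothetical. Only the config key and the error class name come from the PR.

```python
from typing import Mapping, Sequence

SLOTS_CHECK_DISABLED = "spark.scheduler.realtimeModeSlotsCheck.disabled"


class ConcurrentSchedulerInsufficientSlot(Exception):
    """Hypothetical stand-in for the CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error class."""


def check_slots(tasks_per_stage: Sequence[int], available_slots: int,
                conf: Mapping[str, str]) -> int:
    """Return the number of slots the job needs, or raise if the cluster has too few."""
    # Assumption: all tasks of all stages must hold a slot simultaneously.
    required = sum(tasks_per_stage)
    if conf.get(SLOTS_CHECK_DISABLED, "false").lower() == "true":
        return required  # check skipped entirely
    if required > available_slots:
        raise ConcurrentSchedulerInsufficientSlot(
            f"[CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT] job needs {required} "
            f"concurrent slots but only {available_slots} are available")
    return required


print(check_slots([4, 8, 2], available_slots=16, conf={}))  # 14
```

Summing is the point of the check under this assumption. In this linear example, a sequential run of the same stages would never need more than 8 slots at a time.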
### How was this patch tested?
Added one new test suite plus three targeted regression tests:
1. **`ConcurrentStageDAGSchedulerSuite`**: exercises the new scheduler
   end-to-end through the existing `DAGSchedulerSuiteBase` test harness:
   - Simple two-stage concurrent job: both stages enter `runningStages` on
     submission; the child stage's task completions are buffered until the
     parent finishes.
   - Concurrent stages disabled in properties: the scheduler falls back to the
     default sequential behavior.
   - Complex six-stage DAG with diamond dependencies: verifies parent
     tracking, deferred-event buffering, and the correct release order when
     parents finish out of order.
   - Speculation rejected: a job with concurrent stages and
     `spark.speculation=true` fails on submission with a clear error.

   By inheriting from `DAGSchedulerSuiteBase`, the suite also runs all 149
   existing `DAGScheduler` tests against `ConcurrentStageDAGScheduler`. This
   gives free regression coverage that the new scheduler behaves identically
   to `DAGScheduler` when concurrent mode is not engaged. Total: 153 tests
   pass.
2. **`TaskSchedulerImplSuite`**: one new test. A `TaskSet` with
   `streaming.concurrent.stages.enabled=true` is submitted with
   `maxTaskFailures=1` regardless of `spark.task.maxFailures`, while a regular
   `TaskSet` still gets the cluster default. This regression-guards both
   branches of the new conditional.
3. **`TaskSetManagerSuite`**: two new tests covering the new
   failure-counting behavior:
   - With concurrent stages enabled, an `ExecutorLostFailure` with
     `exitCausedByApp=false` counts toward `maxTaskFailures`, so the query
     restarts rather than silently absorbing executor loss.
   - Without concurrent stages, the same failure does **not** count
     (regression guard for the default behavior).
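The deferred-event buffering that the diamond-DAG test exercises can be modeled in a few lines. This is a simplified, hypothetical Python sketch, using a four-stage diamond rather than the test's six stages. The `DeferredCompletionBuffer` class and its methods are illustrative names, not the PR's code. The model holds a child stage's task-completion events while any parent stage is still running. It releases them in arrival order once the last parent finishes, whatever order the parents finish in.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


class DeferredCompletionBuffer:
    """Hypothetical model of parent tracking plus deferred completion events."""

    def __init__(self, parents: Dict[int, Iterable[int]]):
        # stage id -> parent stages that have not finished yet
        self.unfinished_parents = {s: set(ps) for s, ps in parents.items()}
        # stage id -> completion events held back while a parent is running
        self.buffered: Dict[int, List[str]] = defaultdict(list)
        # events actually handed to the scheduler, in processing order
        self.processed: List[Tuple[int, str]] = []

    def on_task_completed(self, stage: int, task: str) -> None:
        if self.unfinished_parents.get(stage):
            self.buffered[stage].append(task)  # a parent is still running: defer
        else:
            self.processed.append((stage, task))

    def on_stage_finished(self, stage: int) -> None:
        for child, waiting_on in self.unfinished_parents.items():
            if stage in waiting_on:
                waiting_on.discard(stage)
                if not waiting_on:
                    # Last parent just finished: release the child's events in arrival order.
                    for task in self.buffered.pop(child, []):
                        self.processed.append((child, task))


# Diamond: stage 0 feeds stages 1 and 2, which both feed stage 3.
buf = DeferredCompletionBuffer({0: [], 1: [0], 2: [0], 3: [1, 2]})
buf.on_task_completed(3, "t3a")   # held: stages 1 and 2 have not finished
buf.on_task_completed(1, "t1a")   # held: stage 0 has not finished
buf.on_stage_finished(0)          # releases t1a
buf.on_stage_finished(2)          # out of order: stage 3 still waits on stage 1
buf.on_task_completed(3, "t3b")   # held
buf.on_stage_finished(1)          # releases t3a, then t3b
print(buf.processed)  # [(1, 't1a'), (3, 't3a'), (3, 't3b')]
```

Release is keyed on the *last* unfinished parent rather than on any particular one. As a result, stage 2 finishing before stage 1 does not let stage 3's events through early.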
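The two branches guarded by the new `TaskSchedulerImplSuite` and `TaskSetManagerSuite` tests reduce to a pair of predicates. The Python sketch below uses hypothetical function names. The property key, `maxTaskFailures`, `spark.task.maxFailures`, and `exitCausedByApp` come from the PR. In stock Spark, an `ExecutorLostFailure` counts toward task failures only when `exitCausedByApp` is true, which is the default branch modeled here.

```python
from typing import Mapping

CONCURRENT_STAGES_KEY = "streaming.concurrent.stages.enabled"


def is_concurrent(props: Mapping[str, str]) -> bool:
    return props.get(CONCURRENT_STAGES_KEY, "false") == "true"


def effective_max_task_failures(props: Mapping[str, str], cluster_max_failures: int) -> int:
    # Concurrent streaming task sets abort on the first task failure;
    # other task sets use the cluster-wide spark.task.maxFailures value.
    return 1 if is_concurrent(props) else cluster_max_failures


def executor_loss_counts(props: Mapping[str, str], exit_caused_by_app: bool) -> bool:
    # Default: an executor loss not caused by the application does not count.
    # Concurrent mode: count it anyway, so the query restarts instead of absorbing the loss.
    return exit_caused_by_app or is_concurrent(props)


streaming = {CONCURRENT_STAGES_KEY: "true"}
print(effective_max_task_failures(streaming, 4), effective_max_task_failures({}, 4))  # 1 4
print(executor_loss_counts(streaming, False), executor_loss_counts({}, False))  # True False
```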
Full run: `core/testOnly *DAGSchedulerSuite *ConcurrentStageDAGSchedulerSuite *TaskSetManagerSuite *TaskSchedulerImplSuite`
→ **484 tests, all pass.**
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)