jerrypeng commented on PR #56055:
URL: https://github.com/apache/spark/pull/56055#issuecomment-4616968863
@mridulm thank you for clarifying! Some background may help.
Real-time mode is a new execution mode we're introducing in Structured
Streaming that lets streaming queries process data with end-to-end latencies in
the milliseconds. Reaching that requires a few changes to how queries execute;
scheduling is one of them, and that's what this PR covers. For a query with
multiple stages to hit millisecond latencies, the cluster has to run the tasks
of all stages at the same time, with adjacent stages connected by a streaming
shuffle (implemented in a separate set of PRs). That lets data flow
continuously through the query DAG instead of one stage at a time — and
processing one stage at a time is a core reason the current model can't reach
these latencies.
On the changes in this PR: the change to the existing DAG scheduler is small
and additive — a no-op hook, a couple of accessors, and a few visibility
relaxations — and the default behavior is unchanged. The new capability lives
in a separate class (`ConcurrentStageDAGScheduler`), so the real-time-mode
logic is isolated from the shared scheduler.
On the semantics you asked about: the new capability is that the stages of a
DAG can run concurrently rather than one at a time. Today the DAG scheduler
treats a data dependency between two stages as a reason to run them
sequentially — the child waits for the parent. But that sequencing isn't
inherent to the dependency; it's a consequence of the shuffle being
materialized, where the consumer can't read anything until the producer has
written its complete output. With a streaming shuffle that the consumer reads
incrementally, the producer and consumer stages can run at the same time. So
the semantic this introduces is: a directional data dependency constrains the
ordering of data, not necessarily the concurrency of execution — and when the
connecting shuffle supports incremental reads, dependent stages may execute
concurrently. I hope that clarifies it.
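To make that semantic concrete, here is a toy sketch in plain Python. It is not Spark code and not what this PR implements: the bounded queue merely stands in for an incrementally-readable shuffle, and every name in it (`run_materialized`, `run_streaming`, `_DONE`) is hypothetical. It assumes a single producer and a single consumer and at least two input records.

```python
import queue
import threading

_DONE = object()  # hypothetical sentinel marking the end of the producer's output


def run_materialized(records):
    """Sequential model: the child starts only after the parent has finished."""
    shuffled = [r * 2 for r in records]   # parent stage writes its complete output
    return [x + 1 for x in shuffled]      # child stage reads it afterwards


def run_streaming(records):
    """Concurrent model: parent and child run at the same time over a queue.

    Returns the child's output and a flag saying whether the child received
    its first record while the parent was still running.
    """
    shuffle = queue.Queue(maxsize=1)      # toy stand-in for a streaming shuffle
    producer_done = threading.Event()
    out = []
    overlapped = []

    def producer():
        for r in records:
            shuffle.put(r * 2)            # blocks while the queue is full
        shuffle.put(_DONE)
        producer_done.set()

    def consumer():
        while True:
            item = shuffle.get()
            if item is _DONE:
                break
            if not out:
                # Record whether the parent was still running at first read.
                overlapped.append(not producer_done.is_set())
            out.append(item + 1)

    threads = [threading.Thread(target=producer),
               threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out, overlapped[0]


if __name__ == "__main__":
    data = [1, 2, 3, 4]
    streamed, ran_concurrently = run_streaming(data)
    print(run_materialized(data) == streamed, ran_concurrently)
```

Both functions produce the same records in the same order, because the queue is FIFO. The only difference is when the child is allowed to start, which is the distinction between ordering of data and concurrency of execution described above.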
This is a general scheduling capability — concurrent execution of
data-dependent stages over an incrementally-readable shuffle — and real-time
mode is simply its first consumer.
I think the next question is how to integrate this natively into the
DAGScheduler, so that users don't need to explicitly configure the
`ConcurrentStageDAGScheduler` to get this capability. That is what I will be
working on next.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]