jerrypeng commented on PR #56055:
URL: https://github.com/apache/spark/pull/56055#issuecomment-4616968863

   @mridulm thank you for clarifying! Some background may help.
     
   Real-time mode is a new execution mode we're introducing in Structured 
Streaming that lets streaming queries process data with end-to-end latencies in 
the milliseconds. Reaching that requires a few changes to how queries execute; 
scheduling is one of them, and that's what this PR covers. For a query with 
multiple stages to hit millisecond latencies, the cluster has to run the tasks 
of all stages at the same time, with adjacent stages connected by a streaming 
shuffle (implemented in a separate set of PRs). That lets data flow 
continuously through the query DAG instead of one stage at a time — and 
processing one stage at a time is a core reason the current model can't reach 
these latencies.
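To make the materialized-vs-streaming distinction concrete, here is a minimal sketch that models it with Python generators rather than real Spark stages or real concurrency. `map_stage`, `reduce_stage`, `run_materialized`, and `run_streaming` are hypothetical names for illustration only. The event log records when each record is produced and when it is consumed.

```python
from typing import Iterable, Iterator, List, Tuple


def map_stage(records: List[int], log: List[str]) -> Iterator[int]:
    """Hypothetical upstream stage: doubles each record and emits it."""
    for r in records:
        log.append(f"map emits {r * 2}")
        yield r * 2


def reduce_stage(upstream: Iterable[int], log: List[str]) -> int:
    """Hypothetical downstream stage: sums whatever it reads from upstream."""
    total = 0
    for value in upstream:
        log.append(f"reduce reads {value}")
        total += value
    return total


def run_materialized(records: List[int]) -> Tuple[int, List[str]]:
    log: List[str] = []
    # Materialized shuffle: the consumer sees nothing until the producer's
    # complete output has been written.
    shuffle_output = list(map_stage(records, log))
    return reduce_stage(shuffle_output, log), log


def run_streaming(records: List[int]) -> Tuple[int, List[str]]:
    log: List[str] = []
    # Incrementally readable shuffle: each record reaches the consumer
    # as soon as the producer emits it.
    return reduce_stage(map_stage(records, log), log), log
```

Both versions compute the same result, but with the materialized shuffle all `map emits` entries appear before any `reduce reads`, while in the streaming version they alternate. That interleaving is the data-flow property that allows producer and consumer stages to overlap.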
   
   On the changes in this PR: the change to the existing DAG scheduler is small 
and additive — a no-op hook, a couple of accessors, and a few visibility 
relaxations — and the default behavior is unchanged. The new capability lives 
in a separate class (`ConcurrentStageDAGScheduler`), so the real-time-mode 
logic is isolated from the shared scheduler.
   
   On the semantics you asked about: the new capability is that the stages of a 
DAG can run concurrently rather than one at a time. Today the DAG scheduler 
treats a data dependency between two stages as a reason to run them 
sequentially — the child waits for the parent. But that sequencing isn't 
inherent to the dependency; it's a consequence of the shuffle being 
materialized, where the consumer can't read anything until the producer has 
written its complete output. With a streaming shuffle that the consumer reads 
incrementally, the producer and consumer stages can run at the same time. So 
the semantic this introduces is: a directional data dependency constrains the 
ordering of data, not necessarily the concurrency of execution — and when the 
connecting shuffle supports incremental reads, dependent stages may execute 
concurrently. I hope that clarifies it.
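The rule in the previous paragraph can be sketched as a small launch-planning function. This is a simplified model under stated assumptions, not the logic of Spark's `DAGScheduler` or of `ConcurrentStageDAGScheduler`. Stages are integers, `parents` maps each stage to its parent stages, `incremental` is a hypothetical set of (parent, child) edges whose shuffle supports incremental reads, and each wave is assumed to finish before the next wave begins.

```python
from typing import Dict, List, Set, Tuple

Edge = Tuple[int, int]  # (parent stage, child stage)


def launch_waves(parents: Dict[int, List[int]], incremental: Set[Edge]) -> List[List[int]]:
    """Group stages into waves whose members may run at the same time.

    A stage may start once every parent has either finished, or has been
    started in the same wave and feeds it through an incrementally
    readable shuffle. Simplification: a wave finishes before the next starts.
    """
    finished: Set[int] = set()
    pending: Set[int] = set(parents)
    waves: List[List[int]] = []
    while pending:
        started: Set[int] = set()
        wave: List[int] = []
        progress = True
        while progress:
            progress = False
            for stage in sorted(pending - started):
                ready = all(
                    p in finished or (p in started and (p, stage) in incremental)
                    for p in parents[stage]
                )
                if ready:
                    started.add(stage)
                    wave.append(stage)
                    progress = True
        if not wave:
            raise ValueError("cycle or unsatisfiable dependency")
        waves.append(wave)
        finished |= started
        pending -= started
    return waves
```

For a three-stage chain (`parents = {0: [], 1: [0], 2: [1]}`), materialized shuffles (`incremental = set()`) give `[[0], [1], [2]]`, one stage at a time, while making both edges incremental gives `[[0, 1, 2]]`. The dependency still fixes the direction of data flow; only the concurrency changes.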
   
   This is a general scheduling capability — concurrent execution of 
data-dependent stages over an incrementally-readable shuffle — and real-time 
mode is simply its first consumer.
   
   I think the next question is how to integrate this natively into the 
DAGScheduler, so that users don't need to explicitly configure the 
`ConcurrentStageDAGScheduler` to get this capability. That is what I will be 
working on next.

