HeartSaVioR opened a new pull request, #48676:
URL: https://github.com/apache/spark/pull/48676

   ### What changes were proposed in this pull request?
   
   This PR proposes to make the majority of DSv1 streaming sources carry stream 
information over from the logical plan to the physical plan, so that Spark can 
collect metrics for them with the same approach it uses for DSv2 streaming 
sources.
   
   To achieve this, this PR introduces two new traits, one for logical nodes 
and one for physical nodes (a minimal sketch follows the list):
   
   * Logical plan: StreamSourceAwareLogicalPlan
   * Physical plan: StreamSourceAwareSparkPlan
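
   For illustration only, here is a minimal sketch of what the two traits could 
look like. The member names (`stream`, `withStream`) and the exact shapes are 
assumptions made for this sketch, not necessarily the API added by this PR:

   ```scala
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
   import org.apache.spark.sql.connector.read.streaming.SparkDataStream
   import org.apache.spark.sql.execution.SparkPlan

   // Logical-side marker: records which stream produced this leaf node.
   trait StreamSourceAwareLogicalPlan extends LogicalPlan {
     // The stream (source) this node was created for, if any.
     def stream: Option[SparkDataStream]
     // Returns a copy of this node tagged with the given stream.
     def withStream(stream: SparkDataStream): LogicalPlan
   }

   // Physical-side counterpart: the progress reporter reads the stream from here.
   trait StreamSourceAwareSparkPlan extends SparkPlan {
     def stream: Option[SparkDataStream]
   }
   ```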
   
   All DSv1 streaming sources are expected to produce, in getBatch(), a logical 
plan whose leaf node(s) implement StreamSourceAwareLogicalPlan. Built-in DSv1 
streaming sources (and external sources using 
`SparkSession/SQLContext.internalCreateDataFrame`) mostly use one or more of 
the following nodes (a tagging sketch follows the list):
   
   * LogicalRDD
   * LocalRelation
   * LogicalRelation
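
   As a rough illustration of the tagging step (not this PR's actual wiring), 
the microbatch planner could walk the plan returned by getBatch() and tag each 
stream-aware leaf with the originating stream, via the hypothetical 
`withStream` hook from the sketch above:

   ```scala
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
   import org.apache.spark.sql.connector.read.streaming.SparkDataStream

   // Tag every stream-aware leaf in the batch plan with the source it came from.
   def tagLeaves(batchPlan: LogicalPlan, source: SparkDataStream): LogicalPlan =
     batchPlan transformDown {
       case leaf: StreamSourceAwareLogicalPlan => leaf.withStream(source)
     }
   ```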
   
   Physical planning for LogicalRelation is covered via either 1) 
RowDataSourceScanExec or 2) FileSourceScanExec. This may not cover all possible 
types of relations, but it is not easy for either data source developers or 
users to extend the Spark planner to handle additional cases in physical 
planning; furthermore, DSv2 is the standard interface for external streaming 
sources. The other logical nodes map to fixed physical counterparts: 
LogicalRDD -> RDDScanExec and LocalRelation -> LocalTableScanExec.
   
   This PR updates the progress reporter to collect metrics from the nodes 
implementing StreamSourceAwareSparkPlan, which works uniformly across DSv1 and 
DSv2 (sketched below). To avoid regressions, the progress reporter checks 
whether the executed plan contains all the streams we want to capture; if the 
executed plan misses some stream(s), the progress reporter falls back to the 
old behavior.
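
   A minimal sketch of the collect-with-fallback idea, reusing the hypothetical 
`stream` member from the trait sketch and simplifying to one physical node per 
stream; `numOutputRows` is the usual scan metric name, though the exact metrics 
the reporter reads are not spelled out here:

   ```scala
   import org.apache.spark.sql.connector.read.streaming.SparkDataStream
   import org.apache.spark.sql.execution.SparkPlan

   // Collect every physical node in the executed plan that carries a stream tag.
   def collectSourceNodes(plan: SparkPlan): Map[SparkDataStream, SparkPlan] =
     plan.collect {
       case p: StreamSourceAwareSparkPlan if p.stream.isDefined => p.stream.get -> p
     }.toMap

   // Per-source row counts, or None when some expected stream is missing from
   // the executed plan (the caller then falls back to the legacy approach).
   def numRowsPerSource(
       plan: SparkPlan,
       expectedStreams: Set[SparkDataStream]): Option[Map[SparkDataStream, Long]] = {
     val byStream = collectSourceNodes(plan)
     if (expectedStreams.subsetOf(byStream.keySet)) {
       Some(byStream.map { case (s, node) =>
         s -> node.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
       })
     } else {
       None
     }
   }
   ```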
   
   ### Why are the changes needed?
   
   For DSv2 data sources, the source nodes in the executed plan are always 
MicroBatchScanExec, and these nodes contain the stream information.
   
   For DSv1 data sources, however, the logical and physical nodes representing 
the scan of a source are technically arbitrary (any logical node and any 
physical node). Spark therefore assumes that the leaf nodes line up across the 
initial logical plan <=> the logical plan for batch N <=> the physical plan for 
batch N, and associates them positionally (illustrated below). This is fragile, 
and we have a non-trivial number of reports of broken metrics.
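
   For concreteness, the fragile positional association described above amounts 
to something like this simplified sketch (not the exact code being replaced):

   ```scala
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
   import org.apache.spark.sql.execution.streaming.Source

   // Positional association: assumes the plan's leaves appear in the same order
   // (and number) as the sources, which breaks when the plan is rewritten, e.g.
   // when a leaf is pruned or replaced between planning phases.
   def associateByPosition(
       batchPlan: LogicalPlan,
       sources: Seq[Source]): Seq[(Source, LogicalPlan)] =
     sources.zip(batchPlan.collectLeaves())
   ```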
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Modified tests. Some tests were written around the old limitation; these 
tests are fixed with this PR.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.

