HeartSaVioR opened a new pull request, #48676:
URL: https://github.com/apache/spark/pull/48676
### What changes were proposed in this pull request?

This PR proposes to make the majority of DSv1 streaming sources carry over stream information from the logical plan to the physical plan, so that Spark can use the same approach of collecting metrics as for DSv2 streaming sources.

To achieve this, this PR introduces two new traits matching the logical node and the physical node respectively:

* Logical plan: `StreamSourceAwareLogicalPlan`
* Physical plan: `StreamSourceAwareSparkPlan`

All DSv1 streaming sources are expected to produce a logical plan in `getBatch()` whose leaf node(s) implement `StreamSourceAwareLogicalPlan`. Built-in DSv1 streaming sources (and external sources using `SparkSession/SQLContext.internalCreateDataFrame`) mostly use one or more of these nodes:

* `LogicalRDD`
* `LocalRelation`
* `LogicalRelation`

Physical planning for `LogicalRelation` is covered via either 1) `RowDataSourceScanExec` or 2) `FileSourceScanExec`. This may not cover all possible types of relations, but it is not easy for either data source developers or users to extend the Spark planner to handle additional cases in physical planning. Furthermore, DSv2 is the standard interface for external streaming sources. The other nodes map to fixed physical nodes: `LogicalRDD` -> `RDDScanExec`, `LocalRelation` -> `LocalTableScanExec`.

This PR updates the progress reporter to collect metrics from the nodes implementing `StreamSourceAwareSparkPlan`, which works across DSv1 and DSv2. To avoid regressions, the progress reporter checks whether the executed plan contains all the streams we want to capture; if the executed plan misses some stream(s), the progress reporter falls back to the old way. Minimal sketches of the trait shapes and of the collection logic follow below.
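For illustration, here is a minimal, self-contained sketch of the shape the two traits could take. The `SparkDataStream` stand-in and the exact method names (`getStream`, `withStream`) are assumptions for this sketch, not necessarily the PR's verbatim API:

```scala
// Stand-in for org.apache.spark.sql.connector.read.streaming.SparkDataStream,
// so this sketch compiles on its own.
trait SparkDataStream

// Logical side: a leaf node mixes this in so the stream it reads from
// survives the logical-to-physical planning step.
trait StreamSourceAwareLogicalPlan {
  // The stream this leaf node reads from, if it has been tagged.
  def getStream: Option[SparkDataStream]

  // Returns a copy of this node carrying the given stream, since logical
  // plan nodes are immutable and transformed copy-on-write.
  def withStream(stream: SparkDataStream): StreamSourceAwareLogicalPlan
}

// Physical side: exposes the same stream so the progress reporter can
// associate this scan node's metrics with the right source.
trait StreamSourceAwareSparkPlan {
  def getStream: Option[SparkDataStream]
}
```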
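And a minimal sketch of the collection step with the regression guard described above, assuming simplified stand-in types (`PlanNode` with a `numOutputRows` metric) rather than Spark's actual plan classes:

```scala
trait SparkDataStream

// Simplified executed-plan node: `stream` is Some(_) iff the node
// implements the StreamSourceAwareSparkPlan trait sketched above.
final case class PlanNode(
    stream: Option[SparkDataStream],
    numOutputRows: Long,
    children: Seq[PlanNode]) {
  def collectLeaves(): Seq[PlanNode] =
    if (children.isEmpty) Seq(this) else children.flatMap(_.collectLeaves())
}

// Returns per-stream row counts if every stream we want to report is
// represented in the executed plan; otherwise returns None so the caller
// can fall back to the legacy leaf-position-matching approach.
def collectSourceMetrics(
    executedPlan: PlanNode,
    sourcesToReport: Seq[SparkDataStream]): Option[Map[SparkDataStream, Long]] = {
  val tagged = executedPlan.collectLeaves()
    .flatMap { node => node.stream.map(_ -> node.numOutputRows) }
  // A stream may appear in multiple scan nodes (e.g. self-union), so sum.
  val byStream = tagged.groupMapReduce(_._1)(_._2)(_ + _)
  if (sourcesToReport.forall(byStream.contains)) Some(byStream) else None
}
```

Returning `None` rather than a partial map keeps the new path all-or-nothing, which fits the regression-avoidance goal: silently mixing new and legacy attribution within one batch would be harder to reason about.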
### Why are the changes needed?

For DSv2 data sources, the source nodes in the executed plan are always `MicroBatchScanExec`, and these nodes contain the stream information. For DSv1 data sources, however, the logical and physical nodes representing the scan of a source are technically arbitrary (any logical node and any physical node). Spark therefore assumes that the leaf nodes of the initial logical plan <=> the logical plan for batch N <=> the physical plan for batch N line up one-to-one so that these nodes can be associated. This is fragile, and we have a non-trivial number of reports of broken metrics.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Modified tests - some tests relied on the limitation described above, and these tests are fixed with this PR.

### Was this patch authored or co-authored using generative AI tooling?

No.