HeartSaVioR opened a new pull request, #38717:
URL: https://github.com/apache/spark/pull/38717
### What changes were proposed in this pull request?
This PR proposes to fix the broken metrics when the streaming query contains a CTE,
by manually applying InlineCTE to the analyzed plan when collecting metrics.
Suppose a streaming query contains the part below as its batch side, which is
joined with a streaming source:
```
WITH batch_tbl AS (
  SELECT col1, col2 FROM parquet_tbl
)
SELECT col1 AS key, col2 AS value_batch FROM batch_tbl
```
Currently, Spark adds a WithCTE node, together with CTERelationDef and
CTERelationRef nodes, whenever a CTE is used. Below is the analyzed plan:
```
WriteToMicroBatchDataSource MemorySink,
2cbb2afa-6513-4a23-b4c2-37910fc9cdf9, Append, 0
+- Project [key#15, value_stream#16, value_batch#9L]
+- Join Inner, (cast(key#15 as bigint) = key#8L)
:- SubqueryAlias spark_catalog.default.parquet_streaming_tbl
: +- Project [key#55 AS key#15, value_stream#56 AS value_stream#16]
: +- Relation
spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
+- WithCTE
:- CTERelationDef 0, false
: +- SubqueryAlias batch_tbl
: +- Project [col1#10L, col2#11L]
: +- SubqueryAlias spark_catalog.default.parquet_tbl
: +- Relation
spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
+- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
+- SubqueryAlias batch_tbl
+- CTERelationRef 0, true, [col1#10L, col2#11L]
```
Here the plan has 3 leaf nodes, but only 2 of them are actual sources. During
optimization the CTE is inlined, leaving 2 leaf nodes. Below is the optimized
plan:
```
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer:
org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@622c7c7f]
+- Project [key#55, value_stream#56, value_batch#9L]
+- Join Inner, (cast(key#55 as bigint) = key#8L)
:- Filter isnotnull(key#55)
: +- Relation
spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
+- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
+- Filter isnotnull(col1#10L)
+- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L]
parquet
```
Hence the executed plan will also have 2 leaf nodes, which does not match the
number of leaf nodes in the analyzed plan, so ProgressReporter gives up
collecting metrics.
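To illustrate why the counts diverge, here is a minimal self-contained sketch (plain Python, not Spark code; the node labels merely mirror the plans above, and the tree shapes are simplified for illustration):

```python
# Toy plan node: (name, children); a node with no children is a leaf.
def leaves(node):
    name, children = node
    if not children:
        return [name]
    result = []
    for child in children:
        result.extend(leaves(child))
    return result

# Analyzed plan: the streaming relation, plus a WithCTE block where both
# the relation under CTERelationDef and the CTERelationRef count as leaves.
analyzed = ("Join", [
    ("Relation parquet_streaming_tbl", []),
    ("WithCTE", [
        ("CTERelationDef", [("Relation parquet_tbl", [])]),
        ("Project", [("CTERelationRef", [])]),
    ]),
])

# Optimized/executed plan: the CTE has been inlined away.
executed = ("Join", [
    ("Relation parquet_streaming_tbl", []),
    ("Project", [("Relation parquet_tbl", [])]),
])

print(len(leaves(analyzed)))  # 3
print(len(leaves(executed)))  # 2: the counts differ, so per-source
                              # metrics collection is skipped
```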
Applying InlineCTE to the analyzed plan while collecting metrics resolves this.
For example, below is the logical plan obtained by applying InlineCTE to the
analyzed plan above.
```
WriteToMicroBatchDataSource MemorySink,
2cbb2afa-6513-4a23-b4c2-37910fc9cdf9, Append, 0
+- Project [key#15, value_stream#16, value_batch#9L]
+- Join Inner, (cast(key#15 as bigint) = key#8L)
:- SubqueryAlias spark_catalog.default.parquet_streaming_tbl
: +- Project [key#55 AS key#15, value_stream#56 AS value_stream#16]
: +- Relation
spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
+- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
+- SubqueryAlias batch_tbl
+- SubqueryAlias batch_tbl
+- Project [col1#10L, col2#11L]
+- SubqueryAlias spark_catalog.default.parquet_tbl
+- Relation
spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
```
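The effect of the fix can be sketched in the same toy style (plain Python, not Spark's actual InlineCTE rule; `inline_cte`, `count_leaves`, and the single-definition tree shape are simplifications for illustration only):

```python
def count_leaves(node):
    name, children = node
    return 1 if not children else sum(count_leaves(c) for c in children)

def inline_cte(node):
    """Toy inlining: a WithCTE([def, main]) node is replaced by main,
    with every CTERelationRef in main substituted by the def's body."""
    name, children = node
    if name == "WithCTE":
        cte_def, main = children
        body = cte_def[1][0]  # the definition's child plan

        def subst(n):
            nm, ch = n
            if nm == "CTERelationRef":
                return body
            return (nm, [subst(c) for c in ch])

        return subst(main)
    return (name, [inline_cte(c) for c in children])

# Same toy analyzed plan as the one in the plan dump above.
analyzed = ("Join", [
    ("Relation parquet_streaming_tbl", []),
    ("WithCTE", [
        ("CTERelationDef", [("Relation parquet_tbl", [])]),
        ("Project", [("CTERelationRef", [])]),
    ]),
])

inlined = inline_cte(analyzed)
print(count_leaves(analyzed))  # 3
print(count_leaves(inlined))   # 2, now matching the executed plan
```

After inlining, the analyzed-side leaf count agrees with the executed plan, so the one-to-one pairing of leaves can proceed.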
Note that this is only required when the streaming query contains at least one
DSv1 streaming source. If the streaming query contains only DSv2 streaming
sources, ProgressReporter can simply pick up the dedicated physical node(s)
from the executed plan.
### Why are the changes needed?
The metrics of a streaming query are broken if the query contains a CTE.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
A new test case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]