HeartSaVioR opened a new pull request, #38717:
URL: https://github.com/apache/spark/pull/38717

   ### What changes were proposed in this pull request?
   
   This PR fixes broken metrics for streaming queries that contain a CTE by 
manually applying InlineCTE to the analyzed plan when collecting metrics.
   
   Suppose a streaming query contains the following batch-side subquery, which 
is joined with a streaming source:
   
   ```
   WITH batch_tbl AS (
     SELECT col1, col2 FROM parquet_tbl
   )
   
   SELECT col1 AS key, col2 AS value_batch FROM batch_tbl
   ```
   
   Currently, Spark adds a WithCTE node with CTERelationDef and CTERelationRef 
when a query uses a CTE. Below is the analyzed plan:
   
   ```
   WriteToMicroBatchDataSource MemorySink, 2cbb2afa-6513-4a23-b4c2-37910fc9cdf9, Append, 0
   +- Project [key#15, value_stream#16, value_batch#9L]
      +- Join Inner, (cast(key#15 as bigint) = key#8L)
         :- SubqueryAlias spark_catalog.default.parquet_streaming_tbl
         :  +- Project [key#55 AS key#15, value_stream#56 AS value_stream#16]
         :     +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
         +- WithCTE
            :- CTERelationDef 0, false
            :  +- SubqueryAlias batch_tbl
            :     +- Project [col1#10L, col2#11L]
            :        +- SubqueryAlias spark_catalog.default.parquet_tbl
            :           +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
            +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
               +- SubqueryAlias batch_tbl
                  +- CTERelationRef 0, true, [col1#10L, col2#11L]
   ```
   
   Here, the plan has 3 leaf nodes, but only 2 of them are actual sources. 
During optimization, the CTE is inlined, which leaves 2 leaf nodes. Below is 
the optimized plan:
   
   ```
   WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@622c7c7f]
   +- Project [key#55, value_stream#56, value_batch#9L]
      +- Join Inner, (cast(key#55 as bigint) = key#8L)
         :- Filter isnotnull(key#55)
         :  +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
         +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
            +- Filter isnotnull(col1#10L)
               +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
   ```
   
   Hence the executed plan also has 2 leaf nodes, which does not match the 
number of leaf nodes in the analyzed plan, so ProgressReporter gives up 
collecting metrics.
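
The mismatch can be illustrated with a toy model of the two plans. This is a minimal sketch in Python, not Spark's code: `Node` and `leaves` are hypothetical helpers, and the trees are hand-copied from the plans above with only the node names kept.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Toy stand-in for a logical plan node (hypothetical, not a Spark class)."""
    name: str
    children: List["Node"] = field(default_factory=list)


def leaves(plan: Node) -> List[str]:
    """Collect the names of all leaf nodes, left to right."""
    if not plan.children:
        return [plan.name]
    return [name for child in plan.children for name in leaves(child)]


stream_rel = Node("Relation parquet_streaming_tbl")
batch_rel = Node("Relation parquet_tbl")

# Analyzed plan: the CTE is still a WithCTE node holding a definition and a reference.
analyzed = Node("Join", [
    Node("Project", [stream_rel]),
    Node("WithCTE", [
        Node("CTERelationDef 0", [Node("Project", [batch_rel])]),
        Node("Project", [Node("CTERelationRef 0")]),
    ]),
])

# Optimized plan: the CTE has been inlined, so only the two relations remain.
optimized = Node("Join", [
    Node("Filter", [stream_rel]),
    Node("Project", [Node("Filter", [batch_rel])]),
])

analyzed_leaves = leaves(analyzed)
optimized_leaves = leaves(optimized)
print(len(analyzed_leaves), len(optimized_leaves))  # prints "3 2"
```

The extra leaf on the analyzed side is the `CTERelationRef` node, which is not a source on its own.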
   
   Applying InlineCTE to the analyzed plan while collecting metrics resolves 
this. For example, below is the logical plan produced by applying InlineCTE to 
the analyzed plan above:
   
   ```
   WriteToMicroBatchDataSource MemorySink, 2cbb2afa-6513-4a23-b4c2-37910fc9cdf9, Append, 0
   +- Project [key#15, value_stream#16, value_batch#9L]
      +- Join Inner, (cast(key#15 as bigint) = key#8L)
         :- SubqueryAlias spark_catalog.default.parquet_streaming_tbl
         :  +- Project [key#55 AS key#15, value_stream#56 AS value_stream#16]
         :     +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
         +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
            +- SubqueryAlias batch_tbl
               +- SubqueryAlias batch_tbl
                  +- Project [col1#10L, col2#11L]
                     +- SubqueryAlias spark_catalog.default.parquet_tbl
                        +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
   ```
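
The effect of inlining on the leaf count can be sketched with a toy rewrite. This is an illustration only, assuming a simplified tree where a `WithCTE` node lists its definitions first and its main query last (the order shown in the plan dump); `Node`, `inline_cte`, and `count_leaves` are hypothetical names and do not reflect how Spark's InlineCTE rule is implemented.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Node:
    """Toy stand-in for a logical plan node (hypothetical, not a Spark class)."""
    name: str
    children: List["Node"] = field(default_factory=list)
    cte_id: int = -1  # set only on CTE definition and reference nodes


def inline_cte(plan: Node, defs: Dict[int, Node]) -> Node:
    """Replace WithCTE by its main query and each reference by the definition body."""
    if plan.name == "WithCTE":
        *cte_defs, main = plan.children
        scoped = dict(defs)
        for d in cte_defs:
            # Inline the body first so a definition may refer to earlier ones.
            scoped[d.cte_id] = inline_cte(d.children[0], scoped)
        return inline_cte(main, scoped)
    if plan.name == "CTERelationRef":
        return defs[plan.cte_id]
    return Node(plan.name, [inline_cte(c, defs) for c in plan.children], plan.cte_id)


def count_leaves(plan: Node) -> int:
    """Count the leaf nodes of the tree."""
    if not plan.children:
        return 1
    return sum(count_leaves(c) for c in plan.children)


# Simplified copy of the analyzed plan above.
analyzed = Node("Join", [
    Node("Project", [Node("Relation parquet_streaming_tbl")]),
    Node("WithCTE", [
        Node("CTERelationDef", [
            Node("SubqueryAlias batch_tbl", [
                Node("Project", [Node("Relation parquet_tbl")]),
            ]),
        ], cte_id=0),
        Node("Project", [
            Node("SubqueryAlias batch_tbl", [Node("CTERelationRef", cte_id=0)]),
        ]),
    ]),
])

inlined = inline_cte(analyzed, {})
print(count_leaves(analyzed), count_leaves(inlined))  # prints "3 2"
```

As in the plan above, the inlined tree ends up with two nested `SubqueryAlias batch_tbl` nodes, one from the reference site and one from the definition body, and its leaf count now matches the 2 leaves of the optimized plan.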
   
   Note that this is only required when the streaming query has at least one 
DSv1 streaming source. If the streaming query contains only DSv2 data sources 
as streaming sources, ProgressReporter can simply pick up the dedicated 
physical node(s) from the executed plan.
   
   ### Why are the changes needed?
   
   Metrics for a streaming query are broken if the query contains a CTE.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   New test case.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

