AngersZhuuuu commented on code in PR #53526:
URL: https://github.com/apache/spark/pull/53526#discussion_r2637619931


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala:
##########
@@ -20,15 +20,15 @@ package org.apache.spark.sql.catalyst.normalizer
 import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, 
CTERelationRef, LogicalPlan, UnionLoop, UnionLoopRef, WithCTE}
 import org.apache.spark.sql.catalyst.rules.Rule
 
-object NormalizeCTEIds extends Rule[LogicalPlan]{
+object NormalizeCTEIds extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    val curId = new java.util.concurrent.atomic.AtomicLong()

Review Comment:
   I am aware of this: switching directly to `transformDownWithSubqueries` causes
   the unit test `SPARK-51109` to fail.
   For the query:
   ```
     test("SPARK-51109: CTE in subquery expression as grouping column") {
       withTable("t") {
         Seq(1 -> 1).toDF("c1", "c2").write.saveAsTable("t")
         withView("v") {
           sql(
             """
               |CREATE VIEW v AS
               |WITH r AS (SELECT c1 + c2 AS c FROM t)
               |SELECT * FROM r
               |""".stripMargin)
           checkAnswer(
             sql("SELECT (SELECT max(c) FROM v WHERE c > id) FROM range(1) 
GROUP BY 1"),
             Row(2)
           )
         }
       }
     }
   ```
   
   The plan will be normalized from
   ```
   Aggregate [scalar-subquery#15 [id#16L]], [scalar-subquery#15 [id#16L] AS 
scalarsubquery(id)#21]
   :  :- Aggregate [max(c#18) AS max(c)#20]
   :  :  +- Filter (cast(c#18 as bigint) > outer(id#16L))
   :  :     +- SubqueryAlias spark_catalog.default.v
   :  :        +- View (`spark_catalog`.`default`.`v`, [c#18])
   :  :           +- Project [cast(c#17 as int) AS c#18]
   :  :              +- WithCTE
   :  :                 :- CTERelationDef 1, false
   :  :                 :  +- SubqueryAlias r
   :  :                 :     +- Project [(c1#12 + c2#13) AS c#17]
   :  :                 :        +- SubqueryAlias spark_catalog.default.t
   :  :                 :           +- Relation 
spark_catalog.default.t[c1#12,c2#13] parquet
   :  :                 +- Project [c#17]
   :  :                    +- SubqueryAlias r
   :  :                       +- CTERelationRef 1, true, [c#17], false, false
   :  +- Aggregate [max(c#26) AS max(c)#27]
   :     +- Filter (cast(c#26 as bigint) > outer(id#16L))
   :        +- SubqueryAlias spark_catalog.default.v
   :           +- View (`spark_catalog`.`default`.`v`, [c#26])
   :              +- Project [cast(c#25 as int) AS c#26]
   :                 +- WithCTE
   :                    :- CTERelationDef 1, false
   :                    :  +- SubqueryAlias r
   :                    :     +- Project [(c1#22 + c2#23) AS c#24]
   :                    :        +- SubqueryAlias spark_catalog.default.t
   :                    :           +- Relation 
spark_catalog.default.t[c1#22,c2#23] parquet
   :                    +- Project [c#25]
   :                       +- SubqueryAlias r
   :                          +- CTERelationRef 1, true, [c#25], false, false
   +- Range (0, 1, step=1)
   
   ```
   
   to 
   ```
   Aggregate [scalar-subquery#15 [id#16L]], [scalar-subquery#15 [id#16L] AS 
scalarsubquery(id)#21]
   :  :- Aggregate [max(c#18) AS max(c)#20]
   :  :  +- Filter (cast(c#18 as bigint) > outer(id#16L))
   :  :     +- SubqueryAlias spark_catalog.default.v
   :  :        +- View (`spark_catalog`.`default`.`v`, [c#18])
   :  :           +- Project [cast(c#17 as int) AS c#18]
   :  :              +- WithCTE
   :  :                 :- CTERelationDef 0, false
   :  :                 :  +- SubqueryAlias r
   :  :                 :     +- Project [(c1#12 + c2#13) AS c#17]
   :  :                 :        +- SubqueryAlias spark_catalog.default.t
   :  :                 :           +- Relation 
spark_catalog.default.t[c1#12,c2#13] parquet
   :  :                 +- Project [c#17]
   :  :                    +- SubqueryAlias r
   :  :                       +- CTERelationRef 0, true, [c#17], false, false
   :  +- Aggregate [max(c#26) AS max(c)#27]
   :     +- Filter (cast(c#26 as bigint) > outer(id#16L))
   :        +- SubqueryAlias spark_catalog.default.v
   :           +- View (`spark_catalog`.`default`.`v`, [c#26])
   :              +- Project [cast(c#25 as int) AS c#26]
   :                 +- WithCTE
   :                    :- CTERelationDef 1, false
   :                    :  +- SubqueryAlias r
   :                    :     +- Project [(c1#22 + c2#23) AS c#24]
   :                    :        +- SubqueryAlias spark_catalog.default.t
   :                    :           +- Relation 
spark_catalog.default.t[c1#22,c2#23] parquet
   :                    +- Project [c#25]
   :                       +- SubqueryAlias r
   :                          +- CTERelationRef 1, true, [c#25], false, false
   +- Range (0, 1, step=1)
   ```
   
   Within the same plan, the normalized CTE id differs between the two copies of the subquery (0 vs. 1), which causes the following error to be thrown:
   ```
   [info]  is not a valid aggregate expression: 
[SCALAR_SUBQUERY_IS_IN_GROUP_BY_OR_AGGREGATE_FUNCTION] The correlated scalar 
subquery '"scalarsubquery(id)"' is neither present in GROUP BY, nor in an 
aggregate function.
   [info] Add it to GROUP BY using ordinal position or wrap it in `first()` (or 
`first_value`) if you don't care which value you get. SQLSTATE: 0A000; line 1 
pos 7
   [info] Previous schema:scalarsubquery(id)#21
   ```
   
   I am still working out how to fix this problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to