bersprockets opened a new pull request, #48627:
URL: https://github.com/apache/spark/pull/48627

   ### What changes were proposed in this pull request?
   
   This PR adds code to `RewritePredicateSubquery#apply` to explicitly handle 
the case where an `Aggregate` node contains an IN-subquery expression whose 
left-hand operand includes an aggregate expression. The new handler moves such 
IN-subquery expressions out of the `Aggregate` and into a parent `Project` 
node. The `Aggregate` still performs the aggregations that were used as 
operands of the IN-subquery expressions, but no longer contains the 
IN-subquery expressions themselves. After pulling the IN-subquery expressions 
up into the `Project` node, `RewritePredicateSubquery#apply` is called again 
to handle the `Project` as a `UnaryNode`. The `Join` is now inserted between 
the `Project` and the `Aggregate`, and the join condition uses an attribute 
rather than an aggregate expression, e.g.:
   ```
   Project [col1#32, exists#42 AS (sum(col2) IN (listquery()))#40]
   +- Join ExistenceJoin(exists#42), (sum(col2)#41L = c2#39L)
      :- Aggregate [col1#32], [col1#32, sum(col2#33) AS sum(col2)#41L]
      :  +- LocalRelation [col1#32, col2#33]
      +- LocalRelation [c2#39L]
   ```
   Despite its appearance, `sum(col2)#41L` in the join condition above is the 
name of an attribute produced by the `Aggregate`, not an aggregate expression.
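
The pull-up step described above can be pictured with a small toy model. The following Python sketch is not Catalyst code: the classes `Attr`, `Agg`, `InSubquery`, `Aggregate`, and `Project`, and the function `pull_up_in_subqueries`, are all hypothetical stand-ins invented for illustration. It only models splitting an IN-subquery over an aggregate into an aggregation kept in the `Aggregate` and an IN-subquery over an attribute in a parent `Project`.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Attr:
    """A plain attribute (column) reference. Hypothetical toy type."""
    name: str


@dataclass(frozen=True)
class Agg:
    """An aggregate call such as sum(col2). Hypothetical toy type."""
    func: str
    arg: str

    def attr(self) -> Attr:
        # The attribute that names this aggregate's result, e.g. "sum(col2)".
        return Attr(f"{self.func}({self.arg})")


@dataclass(frozen=True)
class InSubquery:
    """<value> IN (<subquery>). Hypothetical toy type."""
    value: Union[Attr, Agg]
    subquery: str


Expr = Union[Attr, Agg, InSubquery]


@dataclass(frozen=True)
class Aggregate:
    grouping: Tuple[str, ...]
    output: Tuple[Expr, ...]


@dataclass(frozen=True)
class Project:
    output: Tuple[Expr, ...]
    child: Aggregate


def pull_up_in_subqueries(agg: Aggregate) -> Union[Project, Aggregate]:
    """Move IN-subqueries over aggregates into a parent Project (toy model)."""
    def is_target(e: Expr) -> bool:
        return isinstance(e, InSubquery) and isinstance(e.value, Agg)

    if not any(is_target(e) for e in agg.output):
        return agg  # nothing to pull up

    agg_out, proj_out = [], []
    for e in agg.output:
        if is_target(e):
            # Keep the aggregation below; refer to its result by attribute above.
            inner, outer = e.value, InSubquery(e.value.attr(), e.subquery)
        elif isinstance(e, Agg):
            inner, outer = e, e.attr()
        else:
            # Anything else passes through unchanged in this toy model.
            inner, outer = e, e
        if inner not in agg_out:
            agg_out.append(inner)
        proj_out.append(outer)
    return Project(tuple(proj_out), Aggregate(agg.grouping, tuple(agg_out)))


plan = Aggregate(
    grouping=("col1",),
    output=(Attr("col1"), InSubquery(Agg("sum", "col2"), "select c2 from v1")),
)
print(pull_up_in_subqueries(plan))
```

In this model, the later `Join` insertion would see only `Attr("sum(col2)")` as the left-hand operand, which mirrors why the join condition in the plan above refers to an attribute.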
   
   ### Why are the changes needed?
   
   The following query fails:
   ```
   create or replace temp view v1(c1, c2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);
   create or replace temp view v2(col1, col2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);
   
   select col1, sum(col2) in (select c2 from v1)
   from v2 group by col1;
   ```
   It fails with this error:
   ```
   [INTERNAL_ERROR] Cannot generate code for expression: sum(input[1, int, false]) SQLSTATE: XX000
   ```
   With `SPARK_TESTING=1`, it instead fails with this error:
   ```
   [PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery in batch RewriteSubquery generated an invalid plan: Special expressions are placed in the wrong plan:
   Aggregate [col1#11], [col1#11, first(exists#20, false) AS (sum(col2) IN (listquery()))#19]
   +- Join ExistenceJoin(exists#20), (sum(col2#12) = c2#18L)
      :- LocalRelation [col1#11, col2#12]
      +- LocalRelation [c2#18L]
   ```
   The issue is that `RewritePredicateSubquery` builds a `Join` operator where 
the join condition contains an aggregate expression.
   
   The bug is in the handler for `UnaryNode` in 
`RewritePredicateSubquery#apply`, which adds a `Join` below the `Aggregate` and 
assumes that the left-hand operand of the IN-subquery can be used in the join 
condition. This works in most cases, but not when the left-hand operand is an 
aggregate expression, because the `Join` sits below the `Aggregate` that would 
compute it.
   
   This PR moves the offending IN-subqueries to a `Project` node, with the 
aggregates replaced by attributes referring to the aggregate expressions. The 
resulting join condition now uses those attributes rather than the actual 
aggregate expressions.
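
The shape of the fix can also be mimicked by hand at the SQL level: aggregate first in an inner query, then apply `IN` to the named result in an outer query. The sketch below uses Python's built-in `sqlite3` module only to check that the two formulations agree on the example data. It is not Spark, says nothing about Catalyst's behaviour, and replaces the temp views with plain tables; the alias `s` is made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table v1(c1, c2);
    insert into v1 values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);
    create table v2(col1, col2);
    insert into v2 values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);
""")

# Original form: the IN-subquery's left operand is the aggregate itself.
original = conn.execute("""
    select col1, sum(col2) in (select c2 from v1)
    from v2 group by col1
    order by col1
""").fetchall()

# Hand-rewritten form: the inner query plays the role of the Aggregate,
# the outer select plays the role of the Project, and IN sees a plain column.
rewritten = conn.execute("""
    select col1, s in (select c2 from v1)
    from (select col1, sum(col2) as s from v2 group by col1)
    order by col1
""").fetchall()

print(original)
print(rewritten)
```

Both queries return the same rows here, which is the semantic equivalence the `Project`-over-`Aggregate` rewrite relies on.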
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, other than allowing this type of query to succeed.
   
   ### How was this patch tested?
   
   New unit tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   

