bersprockets commented on code in PR #48627:
URL: https://github.com/apache/spark/pull/48627#discussion_r1924560656


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -246,46 +267,106 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
         }
       }
 
+    // Handle the case where the left-hand side of an IN-subquery contains an aggregate.
+    //
+    // If an Aggregate node contains such an IN-subquery, this handler will pull up all
+    // expressions from the Aggregate node into a new Project node. The new Project node
+    // will then be handled by the Unary node handler.
+    //
+    // The Unary node handler uses the left-hand side of the IN-subquery in a
+    // join condition. Thus, without this pre-transformation, the join condition
+    // contains an aggregate, which is illegal. With this pre-transformation, the
+    // join condition contains an attribute from the left-hand side of the
+    // IN-subquery contained in the Project node.
+    //
+    // For example:
+    //
+    //   SELECT SUM(col2) IN (SELECT c3 FROM v1) AND SUM(col3) > -1 AS x
+    //   FROM v2;
+    //
+    // The above query has this plan on entry to RewritePredicateSubquery#apply:
+    //
+    //   Aggregate [(sum(col2#18) IN (list#12 []) AND (sum(col3#19) > -1)) AS x#13]
+    //   :  +- LocalRelation [c3#28L]
+    //   +- LocalRelation [col2#18, col3#19]
+    //
+    // Note that the Aggregate node contains the IN-subquery and the left-hand
+    // side of the IN-subquery is an aggregate expression (sum(col2#18)).
+    //
+    // This handler transforms the above plan into the following:
+    // scalastyle:off line.size.limit
+    //
+    //   Project [(_aggregateexpression#20L IN (list#12 []) AND (_aggregateexpression#21L > -1)) AS x#13]
+    //   :  +- LocalRelation [c3#28L]
+    //   +- Aggregate [sum(col2#18) AS _aggregateexpression#20L, sum(col3#19) AS _aggregateexpression#21L]
+    //      +- LocalRelation [col2#18, col3#19]
+    //
+    // scalastyle:on
+    // Note that both the IN-subquery and the greater-than expressions have been
+    // pulled up into the Project node. These expressions use attributes
+    // (_aggregateexpression#20L and _aggregateexpression#21L) to refer to the aggregations
+    // which are still performed in the Aggregate node (sum(col2#18) and sum(col3#19)).
+    case p @ PhysicalAggregation(
+        groupingExpressions, aggregateExpressions, resultExpressions, child)
+        if exprsContainsAggregateInSubquery(p.expressions) =>

Review Comment:
   Re: `if exprsContainsAggregateInSubquery(resultExpressions) =>`.
   
   That won't work with `exprsContainsAggregateInSubquery` as it currently stands, because that function looks for IN-subqueries whose left-hand operand contains an aggregate expression. In `resultExpressions`, the aggregate expressions have already been replaced with attributes, so `exprsContainsAggregateInSubquery` would never trigger.
   
   Alternatively, I could do
   ```
   if exprsContainsAggregateInSubquery(p.asInstanceOf[Aggregate].aggregateExpressions) =>
   ```
   
   which is kind of ugly, but does the trick.
   
   Another alternative: I'm the only one calling `exprsContainsAggregateInSubquery`, so I could change it to return true if there are any IN-subqueries at all, regardless of what the left-hand operand contains. The cost is that we would also rewrite some cases that wouldn't otherwise cause trouble.
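   
   As a rough illustration of the trade-off between these options, here is a toy Scala sketch. It does not use Catalyst: `Expr`, `Attr`, `Sum`, `InSubquery`, `anyNode`, `containsAnyInSubquery`, and `replaceAggregates` are all hypothetical stand-ins, and `containsAggregateInSubquery` only approximates the behavior of `exprsContainsAggregateInSubquery` as described above.
   
   ```scala
   // Toy model only: hypothetical stand-ins, not Catalyst classes.
   sealed trait Expr { def children: Seq[Expr] }
   case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
   case class Sum(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }
   case class InSubquery(lhs: Expr, list: String) extends Expr {
     def children: Seq[Expr] = Seq(lhs)
   }

   object SubqueryCheckSketch {
     // True if `e` or any of its descendants satisfies `p`.
     def anyNode(e: Expr)(p: Expr => Boolean): Boolean =
       p(e) || e.children.exists(c => anyNode(c)(p))

     // Strict check: an IN-subquery whose left-hand operand contains an aggregate.
     def containsAggregateInSubquery(exprs: Seq[Expr]): Boolean =
       exprs.exists(e => anyNode(e) {
         case InSubquery(lhs, _) => anyNode(lhs)(_.isInstanceOf[Sum])
         case _ => false
       })

     // Loose check (the last alternative): any IN-subquery at all.
     def containsAnyInSubquery(exprs: Seq[Expr]): Boolean =
       exprs.exists(e => anyNode(e)(_.isInstanceOf[InSubquery]))

     // Assumption: mimics result expressions, where aggregates become attributes.
     def replaceAggregates(e: Expr): Expr = e match {
       case Sum(_) => Attr("_aggregateexpression")
       case InSubquery(lhs, list) => InSubquery(replaceAggregates(lhs), list)
       case other => other
     }

     // SUM(col2) IN (...): the form that needs the pre-transformation.
     val aggregateExprs: Seq[Expr] = Seq(InSubquery(Sum(Attr("col2")), "list#12"))
     // The same expression with the aggregate replaced by an attribute.
     val resultExprs: Seq[Expr] = aggregateExprs.map(replaceAggregates)
     // col1 IN (...): no aggregate on the left, so no rewrite is needed.
     val plainExprs: Seq[Expr] = Seq(InSubquery(Attr("col1"), "list#12"))
   }
   ```
   
   In this model the strict check matches `aggregateExprs` but not `resultExprs`, which is why it has to be given the original aggregate expressions. The loose check matches both, and it also matches `plainExprs`, the kind of case that would be rewritten unnecessarily.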



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
