anton5798 commented on code in PR #45133:
URL: https://github.com/apache/spark/pull/45133#discussion_r1510902398


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -248,24 +248,76 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
     case u: UnaryNode if u.expressions.exists(
         SubqueryExpression.hasInOrCorrelatedExistsSubquery) =>
       var newChild = u.child
-      u.mapExpressions(expr => {
-        val (newExpr, p) = rewriteExistentialExpr(Seq(expr), newChild)
+      var introducedAttrs = Seq.empty[Attribute]
+      val updatedNode = u.mapExpressions(expr => {
+        val (newExpr, p, newAttrs) = rewriteExistentialExprWithAttrs(Seq(expr), newChild)
         newChild = p
+        introducedAttrs ++= newAttrs
         // The newExpr can not be None
         newExpr.get
       }).withNewChildren(Seq(newChild))
+      updatedNode match {
+        case a: Aggregate =>
+          // If we have introduced new `exists`-attributes that:
+          // 1) are referenced by aggregateExpressions within a non-aggregateFunction expression
+          // 2) are not referenced by groupingExpressions
+          // we wrap them in first() aggregate function. first() is Spark's executable version of
+          // any_value() aggregate function.
+          // We do this to keep the aggregation valid, i.e avoid references outside of aggregate
+          // functions that are not in grouping expressions.
+          // Here, the value of `exists` is functionally determined by grouping expressions, so
+          // applying any aggregate function is semantically safe.
+          val aggFunctionReferences = a.aggregateExpressions.
+            flatMap(extractAggregateExpressions).
+            flatMap(_.references).toSet
+          val nonAggFuncReferences =
+            a.aggregateExpressions.flatMap(_.references).filterNot(aggFunctionReferences.contains)
+          val groupingReferences = a.groupingExpressions.flatMap(_.references)

Review Comment:
   Great point! TL;DR: I'll remove this check altogether; the explanation is below.
   
   Here we're only interested in `exists` variables. However, when a subquery is a grouping expression (or part of one), it is replaced via [this rule](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PullOutGroupingExpressions.scala#L48), so the new code does not kick in at all. E.g., a rather bizarre query like this:
   
   ```
   SELECT
       cast(t1a in (select t2a from t2) as int) + 1 as groupExpr,
       sum(cast(t1a in (select t2a from t2) as int) + 1) as aggExpr,
       cast(t1a in (select t2a from t2) as int) + 1 + cast(exists (select t2a from t2) as int)
           as complexExpr
   FROM t1
   GROUP BY
       cast(t1a in (select t2a from t2) as int) + 1;
   ```
   will have the grouping expression rewritten. I'm adding this query as a test.
   
   So I'm pretty sure that `groupingReferences` will _never_ contain the `exists` attributes of interest.
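
   To make the argument concrete, here is a minimal, self-contained sketch. It is not Spark code: `Expr`, `Attr`, `Add`, `AggFunc`, `ExistsWrapSketch` and `toWrap` are hypothetical toy stand-ins for the Catalyst classes, and the attribute names are only illustrative. It mirrors the reference-set computation from the diff and keeps the subtraction of grouping references as a separate step, so its effect is easy to see:

   ```scala
   // Toy stand-ins for Catalyst expressions; none of these types are Spark's.
   sealed trait Expr {
     def children: Seq[Expr]
     // Names of all attributes referenced anywhere under this node.
     def references: Set[String] = children.flatMap(_.references).toSet
   }
   final case class Attr(name: String) extends Expr {
     def children: Seq[Expr] = Nil
     override def references: Set[String] = Set(name)
   }
   final case class Add(left: Expr, right: Expr) extends Expr {
     def children: Seq[Expr] = Seq(left, right)
   }
   final case class AggFunc(fn: String, arg: Expr) extends Expr {
     def children: Seq[Expr] = Seq(arg)
   }

   object ExistsWrapSketch {
     // Aggregate-function nodes nested anywhere in an expression.
     def aggFuncs(e: Expr): Seq[AggFunc] = e match {
       case a: AggFunc => Seq(a)
       case other => other.children.flatMap(aggFuncs)
     }

     // Mirrors the diff: references of the output expressions, minus every
     // attribute that also appears inside some aggregate function.
     def nonAggFuncReferences(aggregateExprs: Seq[Expr]): Set[String] = {
       val insideAgg: Set[String] =
         aggregateExprs.flatMap(aggFuncs).flatMap(_.references).toSet
       val all: Set[String] = aggregateExprs.flatMap(_.references).toSet
       all -- insideAgg
     }

     // Introduced attributes that would be wrapped in first().
     // Subtracting the `grouping` references is the check under discussion.
     def toWrap(
         introduced: Set[String],
         aggregateExprs: Seq[Expr],
         grouping: Seq[Expr]): Set[String] = {
       val groupingReferences: Set[String] = grouping.flatMap(_.references).toSet
       (introduced & nonAggFuncReferences(aggregateExprs)) -- groupingReferences
     }
   }
   ```

   If the grouping expression has already been pulled out into a plain attribute (say `Attr("_groupingexpression")`) and an output expression is `Add(Attr("_groupingexpression"), Attr("exists#1"))`, then `toWrap` returns `Set("exists#1")` whether `grouping` is passed or left empty. In this toy model, at least, subtracting the grouping references is a no-op, which is the situation described above.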



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

