anton5798 commented on code in PR #45133:
URL: https://github.com/apache/spark/pull/45133#discussion_r1510902398
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -248,24 +248,76 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
case u: UnaryNode if u.expressions.exists(
SubqueryExpression.hasInOrCorrelatedExistsSubquery) =>
var newChild = u.child
- u.mapExpressions(expr => {
- val (newExpr, p) = rewriteExistentialExpr(Seq(expr), newChild)
+ var introducedAttrs = Seq.empty[Attribute]
+ val updatedNode = u.mapExpressions(expr => {
+ val (newExpr, p, newAttrs) = rewriteExistentialExprWithAttrs(Seq(expr), newChild)
newChild = p
+ introducedAttrs ++= newAttrs
// The newExpr can not be None
newExpr.get
}).withNewChildren(Seq(newChild))
+ updatedNode match {
+ case a: Aggregate =>
+ // If we have introduced new `exists`-attributes that:
+ // 1) are referenced by aggregateExpressions within a non-aggregateFunction expression
+ // 2) are not referenced by groupingExpressions
+ // we wrap them in first() aggregate function. first() is Spark's executable version of
+ // any_value() aggregate function.
+ // We do this to keep the aggregation valid, i.e avoid references outside of aggregate
+ // functions that are not in grouping expressions.
+ // Here, the value of `exists` is functionally determined by grouping expressions, so
+ // applying any aggregate function is semantically safe.
+ val aggFunctionReferences = a.aggregateExpressions.
+ flatMap(extractAggregateExpressions).
+ flatMap(_.references).toSet
+ val nonAggFuncReferences =
+ a.aggregateExpressions.flatMap(_.references).filterNot(aggFunctionReferences.contains)
+ val groupingReferences = a.groupingExpressions.flatMap(_.references)
Review Comment:
Great point! TL;DR: I'll remove this check altogether; the explanation is
below.
Here we're only interested in `exists` variables. However, when a subquery
is a grouping expression (or part of one), it is replaced via
[this
rule](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PullOutGroupingExpressions.scala#L48),
so the new code will not kick in at all. E.g., a rather bizarre query like
this:
```
SELECT
  cast(t1a in (select t2a from t2) as int) + 1 as groupExpr,
  sum(cast(t1a in (select t2a from t2) as int) + 1) as aggExpr,
  cast(t1a in (select t2a from t2) as int) + 1
    + cast(exists (select t2a from t2) as int) as complexExpr
FROM t1
GROUP BY
  cast(t1a in (select t2a from t2) as int) + 1;
```
will have its grouping expression rewritten. I'm adding this query as a test.
So I'm pretty sure that `groupingReferences` will _never_ contain an
`exists` attribute of interest.
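
To illustrate the wrapping step once the `groupingReferences` check is gone, here is a minimal toy sketch. It is not the PR's code and does not use Catalyst: `Expr`, `Attr`, `Add`, `AggCall`, and `ExistsWrappingSketch` are hypothetical stand-ins, and attribute names are plain strings. It only shows the idea that an introduced `exists` attribute referenced outside any aggregate function gets wrapped in `first()`, while ordinary grouping outputs are left alone.

```scala
// Toy model only: hypothetical stand-ins, NOT Spark's Catalyst classes.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
// Stands in for any aggregate function call, e.g. sum(...) or first(...).
final case class AggCall(fn: String, arg: Expr) extends Expr

object ExistsWrappingSketch {
  // Attribute names referenced outside of any aggregate function call.
  def nonAggReferences(e: Expr): Set[String] = e match {
    case Attr(n)       => Set(n)
    case Add(l, r)     => nonAggReferences(l) ++ nonAggReferences(r)
    case AggCall(_, _) => Set.empty
  }

  // Wrap the selected attributes in first(...) where they occur outside an aggregate call.
  def wrapInFirst(e: Expr, toWrap: Set[String]): Expr = e match {
    case Attr(n) if toWrap(n) => AggCall("first", Attr(n))
    case a: Attr              => a
    case Add(l, r)            => Add(wrapInFirst(l, toWrap), wrapInFirst(r, toWrap))
    case c: AggCall           => c
  }

  // Only attributes introduced by the subquery rewrite are candidates for wrapping;
  // no grouping-reference check is applied (assumption: they never appear there).
  def rewrite(aggExprs: Seq[Expr], introduced: Set[String]): Seq[Expr] = {
    val toWrap = aggExprs.flatMap(nonAggReferences).toSet.intersect(introduced)
    aggExprs.map(wrapInFirst(_, toWrap))
  }
}
```

For instance, with `introduced = Set("exists1")`, the toy expression `Add(Attr("g"), Attr("exists1"))` becomes `Add(Attr("g"), AggCall("first", Attr("exists1")))`, while `Attr("g")` and `AggCall("sum", Attr("g"))` are returned unchanged.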
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]