jchen5 commented on code in PR #45133:
URL: https://github.com/apache/spark/pull/45133#discussion_r1509535334


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -248,24 +248,72 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
     case u: UnaryNode if u.expressions.exists(
         SubqueryExpression.hasInOrCorrelatedExistsSubquery) =>
       var newChild = u.child
-      u.mapExpressions(expr => {
-        val (newExpr, p) = rewriteExistentialExpr(Seq(expr), newChild)
+      var introducedAttrs = Seq.empty[Attribute]
+      val updatedNode = u.mapExpressions(expr => {
+        val (newExpr, p, newAttrs) = rewriteExistentialExprWithAttrs(Seq(expr), newChild)
         newChild = p
+        introducedAttrs ++= newAttrs
         // The newExpr can not be None
         newExpr.get
       }).withNewChildren(Seq(newChild))
+      updatedNode match {
+        case a: Aggregate =>
+          // If we have introduced new `exists`-attributes that:
+          // 1) are referenced by aggregateExpressions within a non-aggregateFunction expression
+          // 2) are not referenced by groupingExpressions
+          // we wrap them in Max() aggregate function.
+          // Ideally we want to use any_value(), but Spark doesn't fully support any_value().

Review Comment:
   Add to the comment that this is simply to make the query valid. We know the 
value of the exists attribute is functionally determined by the grouping keys, 
so this is safe.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -248,24 +248,72 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
     case u: UnaryNode if u.expressions.exists(
         SubqueryExpression.hasInOrCorrelatedExistsSubquery) =>
       var newChild = u.child
-      u.mapExpressions(expr => {
-        val (newExpr, p) = rewriteExistentialExpr(Seq(expr), newChild)
+      var introducedAttrs = Seq.empty[Attribute]
+      val updatedNode = u.mapExpressions(expr => {
+        val (newExpr, p, newAttrs) = rewriteExistentialExprWithAttrs(Seq(expr), newChild)
         newChild = p
+        introducedAttrs ++= newAttrs
         // The newExpr can not be None
         newExpr.get
       }).withNewChildren(Seq(newChild))
+      updatedNode match {
+        case a: Aggregate =>
+          // If we have introduced new `exists`-attributes that:
+          // 1) are referenced by aggregateExpressions within a non-aggregateFunction expression
+          // 2) are not referenced by groupingExpressions
+          // we wrap them in Max() aggregate function.
+          // Ideally we want to use any_value(), but Spark doesn't fully support any_value().

Review Comment:
   Add to the comment that this is simply to make the aggregation valid. We 
know the value of the exists attribute is functionally determined by the 
grouping keys, so this is safe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to