dtenedor commented on code in PR #48627:
URL: https://github.com/apache/spark/pull/48627#discussion_r1851150355


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -245,6 +266,55 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] 
with PredicateHelper {
             condition = Some(newCondition)))
         }
       }
+    case a: Aggregate if 
exprsContainsAggregateInSubquery(a.aggregateExpressions) =>
+      // find expressions with an IN-subquery whose left-hand operand contains 
aggregates
+      val withInSubquery = 
a.aggregateExpressions.filter(exprContainsAggregateInSubquery(_))
+
+      // extract the aggregate expressions from withInSubquery
+      val inSubqueryMapping = withInSubquery.map { e =>
+        (e, extractAggregateExpressions(e))
+      }
+
+      val inSubqueryMap = inSubqueryMapping.toMap
+      // get all aggregate expressions found in left-hand operands of 
IN-subqueries
+      val aggregateExprs = inSubqueryMapping.flatMap(_._2)
+      // create aliases for each above aggregate expression
+      val aggregateExprAliases = aggregateExprs.map(a => Alias(a, 
toPrettySQL(a))())
+      // create a mapping from each aggregate expression to its alias
+      val aggregateExprAliasMap = 
aggregateExprs.zip(aggregateExprAliases).toMap
+      // create attributes from those aliases of aggregate expressions
+      val aggregateExprAttrs = aggregateExprAliases.map(_.toAttribute)
+      // create a mapping from aggregate expressions to attributes
+      val aggregateExprAttrMap = aggregateExprs.zip(aggregateExprAttrs).toMap
+
+      // create an Aggregate node without the offending IN-subqueries, just
+      // the aggregates themselves and all the other aggregate expressions.
+      val newAggregateExpressions = a.aggregateExpressions.flatMap {
+        // if this expression contains IN-subqueries with aggregates in the 
left-hand
+        // operand, replace with just the aggregates
+        case ae: Expression if inSubqueryMap.contains(ae) =>
+          // replace the expression with an aliased aggregate expression
+          inSubqueryMap(ae).map(aggregateExprAliasMap(_))
+        case ae @ _ => Seq(ae)

Review Comment:
   ```suggestion
           case ae => Seq(ae)
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -245,6 +266,55 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] 
with PredicateHelper {
             condition = Some(newCondition)))
         }
       }
+    case a: Aggregate if 
exprsContainsAggregateInSubquery(a.aggregateExpressions) =>
+      // find expressions with an IN-subquery whose left-hand operand contains 
aggregates
+      val withInSubquery = 
a.aggregateExpressions.filter(exprContainsAggregateInSubquery(_))
+
+      // extract the aggregate expressions from withInSubquery
+      val inSubqueryMapping = withInSubquery.map { e =>
+        (e, extractAggregateExpressions(e))
+      }
+
+      val inSubqueryMap = inSubqueryMapping.toMap
+      // get all aggregate expressions found in left-hand operands of 
IN-subqueries
+      val aggregateExprs = inSubqueryMapping.flatMap(_._2)
+      // create aliases for each above aggregate expression
+      val aggregateExprAliases = aggregateExprs.map(a => Alias(a, 
toPrettySQL(a))())
+      // create a mapping from each aggregate expression to its alias
+      val aggregateExprAliasMap = 
aggregateExprs.zip(aggregateExprAliases).toMap
+      // create attributes from those aliases of aggregate expressions
+      val aggregateExprAttrs = aggregateExprAliases.map(_.toAttribute)
+      // create a mapping from aggregate expressions to attributes
+      val aggregateExprAttrMap = aggregateExprs.zip(aggregateExprAttrs).toMap
+
+      // create an Aggregate node without the offending IN-subqueries, just
+      // the aggregates themselves and all the other aggregate expressions.
+      val newAggregateExpressions = a.aggregateExpressions.flatMap {
+        // if this expression contains IN-subqueries with aggregates in the 
left-hand
+        // operand, replace with just the aggregates
+        case ae: Expression if inSubqueryMap.contains(ae) =>
+          // replace the expression with an aliased aggregate expression
+          inSubqueryMap(ae).map(aggregateExprAliasMap(_))
+        case ae @ _ => Seq(ae)
+      }
+      val newAggregate = a.copy(aggregateExpressions = newAggregateExpressions)
+
+      // Create a projection with the IN-subquery expressions that contain 
aggregates, replacing
+      // the aggregate expressions with attribute references to the output of 
the Aggregate
+      // operator. Also include the other output of the Aggregate operator.
+      val projList = a.aggregateExpressions.map {
+        // if this expression contains an IN-subquery that uses an aggregate, 
we
+        // need to do something special
+        case ae: Expression if inSubqueryMap.contains(ae) =>
+          ae.transform {
+            // patch any aggregate expression with its corresponding attribute
+            case a: AggregateExpression => aggregateExprAttrMap(a)
+          }.asInstanceOf[NamedExpression]
+        case ae @ _ => ae.toAttribute

Review Comment:
   ```suggestion
           case ae => ae.toAttribute
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -245,6 +266,55 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] 
with PredicateHelper {
             condition = Some(newCondition)))
         }
       }
+    case a: Aggregate if 
exprsContainsAggregateInSubquery(a.aggregateExpressions) =>
+      // find expressions with an IN-subquery whose left-hand operand contains 
aggregates
+      val withInSubquery = 
a.aggregateExpressions.filter(exprContainsAggregateInSubquery(_))
+
+      // extract the aggregate expressions from withInSubquery
+      val inSubqueryMapping = withInSubquery.map { e =>
+        (e, extractAggregateExpressions(e))
+      }
+
+      val inSubqueryMap = inSubqueryMapping.toMap
+      // get all aggregate expressions found in left-hand operands of 
IN-subqueries

Review Comment:
   The logic here is a bit hard to follow. Can you add a comment with a brief 
example that shows the query plan and the steps performed here?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -245,6 +266,55 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] 
with PredicateHelper {
             condition = Some(newCondition)))
         }
       }
+    case a: Aggregate if 
exprsContainsAggregateInSubquery(a.aggregateExpressions) =>
+      // find expressions with an IN-subquery whose left-hand operand contains 
aggregates

Review Comment:
   Please express the comments as full sentences (imperative is OK), starting 
with capital letters and ending in punctuation.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -245,6 +266,55 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] 
with PredicateHelper {
             condition = Some(newCondition)))
         }
       }
+    case a: Aggregate if 
exprsContainsAggregateInSubquery(a.aggregateExpressions) =>

Review Comment:
   This file is already over 1000 lines long. Can we move this logic to a 
helper object in another file to improve the code health?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to