dtenedor commented on code in PR #48627:
URL: https://github.com/apache/spark/pull/48627#discussion_r1851150355
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -245,6 +266,55 @@ object RewritePredicateSubquery extends Rule[LogicalPlan]
with PredicateHelper {
condition = Some(newCondition)))
}
}
+ case a: Aggregate if
exprsContainsAggregateInSubquery(a.aggregateExpressions) =>
+ // find expressions with an IN-subquery whose left-hand operand contains
aggregates
+ val withInSubquery =
a.aggregateExpressions.filter(exprContainsAggregateInSubquery(_))
+
+ // extract the aggregate expressions from withInSubquery
+ val inSubqueryMapping = withInSubquery.map { e =>
+ (e, extractAggregateExpressions(e))
+ }
+
+ val inSubqueryMap = inSubqueryMapping.toMap
+ // get all aggregate expressions found in left-hand operands of
IN-subqueries
+ val aggregateExprs = inSubqueryMapping.flatMap(_._2)
+ // create aliases for each above aggregate expression
+ val aggregateExprAliases = aggregateExprs.map(a => Alias(a,
toPrettySQL(a))())
+ // create a mapping from each aggregate expression to its alias
+ val aggregateExprAliasMap =
aggregateExprs.zip(aggregateExprAliases).toMap
+ // create attributes from those aliases of aggregate expressions
+ val aggregateExprAttrs = aggregateExprAliases.map(_.toAttribute)
+ // create a mapping from aggregate expressions to attributes
+ val aggregateExprAttrMap = aggregateExprs.zip(aggregateExprAttrs).toMap
+
+ // create an Aggregate node without the offending IN-subqueries, just
+ // the aggregates themselves and all the other aggregate expressions.
+ val newAggregateExpressions = a.aggregateExpressions.flatMap {
+ // if this expression contains IN-subqueries with aggregates in the
left-hand
+ // operand, replace with just the aggregates
+ case ae: Expression if inSubqueryMap.contains(ae) =>
+ // replace the expression with an aliased aggregate expression
+ inSubqueryMap(ae).map(aggregateExprAliasMap(_))
+ case ae @ _ => Seq(ae)
Review Comment:
```suggestion
case ae => Seq(ae)
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -245,6 +266,55 @@ object RewritePredicateSubquery extends Rule[LogicalPlan]
with PredicateHelper {
condition = Some(newCondition)))
}
}
+ case a: Aggregate if
exprsContainsAggregateInSubquery(a.aggregateExpressions) =>
+ // find expressions with an IN-subquery whose left-hand operand contains
aggregates
+ val withInSubquery =
a.aggregateExpressions.filter(exprContainsAggregateInSubquery(_))
+
+ // extract the aggregate expressions from withInSubquery
+ val inSubqueryMapping = withInSubquery.map { e =>
+ (e, extractAggregateExpressions(e))
+ }
+
+ val inSubqueryMap = inSubqueryMapping.toMap
+ // get all aggregate expressions found in left-hand operands of
IN-subqueries
+ val aggregateExprs = inSubqueryMapping.flatMap(_._2)
+ // create aliases for each above aggregate expression
+ val aggregateExprAliases = aggregateExprs.map(a => Alias(a,
toPrettySQL(a))())
+ // create a mapping from each aggregate expression to its alias
+ val aggregateExprAliasMap =
aggregateExprs.zip(aggregateExprAliases).toMap
+ // create attributes from those aliases of aggregate expressions
+ val aggregateExprAttrs = aggregateExprAliases.map(_.toAttribute)
+ // create a mapping from aggregate expressions to attributes
+ val aggregateExprAttrMap = aggregateExprs.zip(aggregateExprAttrs).toMap
+
+ // create an Aggregate node without the offending IN-subqueries, just
+ // the aggregates themselves and all the other aggregate expressions.
+ val newAggregateExpressions = a.aggregateExpressions.flatMap {
+ // if this expression contains IN-subqueries with aggregates in the
left-hand
+ // operand, replace with just the aggregates
+ case ae: Expression if inSubqueryMap.contains(ae) =>
+ // replace the expression with an aliased aggregate expression
+ inSubqueryMap(ae).map(aggregateExprAliasMap(_))
+ case ae @ _ => Seq(ae)
+ }
+ val newAggregate = a.copy(aggregateExpressions = newAggregateExpressions)
+
+ // Create a projection with the IN-subquery expressions that contain
aggregates, replacing
+ // the aggregate expressions with attribute references to the output of
the Aggregate
+ // operator. Also include the other output of the Aggregate operator.
+ val projList = a.aggregateExpressions.map {
+ // if this expression contains an IN-subquery that uses an aggregate,
we
+ // need to do something special
+ case ae: Expression if inSubqueryMap.contains(ae) =>
+ ae.transform {
+ // patch any aggregate expression with its corresponding attribute
+ case a: AggregateExpression => aggregateExprAttrMap(a)
+ }.asInstanceOf[NamedExpression]
+ case ae @ _ => ae.toAttribute
Review Comment:
```suggestion
case ae => ae.toAttribute
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -245,6 +266,55 @@ object RewritePredicateSubquery extends Rule[LogicalPlan]
with PredicateHelper {
condition = Some(newCondition)))
}
}
+ case a: Aggregate if
exprsContainsAggregateInSubquery(a.aggregateExpressions) =>
+ // find expressions with an IN-subquery whose left-hand operand contains
aggregates
+ val withInSubquery =
a.aggregateExpressions.filter(exprContainsAggregateInSubquery(_))
+
+ // extract the aggregate expressions from withInSubquery
+ val inSubqueryMapping = withInSubquery.map { e =>
+ (e, extractAggregateExpressions(e))
+ }
+
+ val inSubqueryMap = inSubqueryMapping.toMap
+ // get all aggregate expressions found in left-hand operands of
IN-subqueries
Review Comment:
This logic is a bit hard to follow. Could you add a comment with a brief
example showing the query plan and the steps performed here?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -245,6 +266,55 @@ object RewritePredicateSubquery extends Rule[LogicalPlan]
with PredicateHelper {
condition = Some(newCondition)))
}
}
+ case a: Aggregate if
exprsContainsAggregateInSubquery(a.aggregateExpressions) =>
+ // find expressions with an IN-subquery whose left-hand operand contains
aggregates
Review Comment:
Please write the comments as full sentences (imperative is OK) that start
with a capital letter and end with punctuation.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -245,6 +266,55 @@ object RewritePredicateSubquery extends Rule[LogicalPlan]
with PredicateHelper {
condition = Some(newCondition)))
}
}
+ case a: Aggregate if
exprsContainsAggregateInSubquery(a.aggregateExpressions) =>
Review Comment:
This file is already over 1,000 lines long; can we move this logic to a
helper object in another file to improve code health?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]