peter-toth commented on a change in pull request #31913:
URL: https://github.com/apache/spark/pull/31913#discussion_r598506466



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -870,8 +870,19 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
       if (haveCommonNonDeterministicOutput(p.projectList, agg.aggregateExpressions)) {
         p
       } else {
-        agg.copy(aggregateExpressions = buildCleanedProjectList(
-          p.projectList, agg.aggregateExpressions))
+        val complexGroupingExpressions =
+          ExpressionSet(agg.groupingExpressions.filter(_.children.nonEmpty))
+
+        def wrapGroupingExpression(e: Expression): Expression = e match {
+          case _: AggregateExpression => e
+          case _ if complexGroupingExpressions.contains(e) => GroupingExpression(e)
+          case _ => e.mapChildren(wrapGroupingExpression)
+        }
+
+        val wrappedAggregateExpressions =
+          agg.aggregateExpressions.map(wrapGroupingExpression(_).asInstanceOf[NamedExpression])
+        agg.copy(aggregateExpressions =
+          buildCleanedProjectList(p.projectList, wrappedAggregateExpressions))

Review comment:
   Thanks for the test case @maropu.
   I moved the logic to a new rule, simplified the test case in commit https://github.com/apache/spark/pull/31913/commits/2293fd40449b9b540de9aa6614b6ca10698b7433, and updated the PR description.
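The wrapping step in the diff can be illustrated with a minimal, self-contained sketch on a toy expression tree. Everything in it is hypothetical: `Expr`, `Attr`, `Add`, `Sum` (standing in for `AggregateExpression`), `Grouping` (standing in for `GroupingExpression`) and `wrapGroupingExpressions` are illustrative names, not Catalyst APIs. A plain `Set` with structural (case-class) equality replaces `ExpressionSet`, which compares expressions semantically. The sketch mirrors only the `wrapGroupingExpression` logic shown in the diff, not the new rule mentioned in the comment.

```scala
// Hypothetical toy expression tree; these are NOT Spark Catalyst classes.
sealed trait Expr {
  def children: Seq[Expr]
  def mapChildren(f: Expr => Expr): Expr
}

final case class Attr(name: String) extends Expr {
  def children: Seq[Expr] = Nil
  def mapChildren(f: Expr => Expr): Expr = this
}

final case class Add(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
  def mapChildren(f: Expr => Expr): Expr = Add(f(left), f(right))
}

// Stands in for an aggregate function such as AggregateExpression.
final case class Sum(child: Expr) extends Expr {
  def children: Seq[Expr] = Seq(child)
  def mapChildren(f: Expr => Expr): Expr = Sum(f(child))
}

// Stands in for the GroupingExpression marker used in the diff.
final case class Grouping(child: Expr) extends Expr {
  def children: Seq[Expr] = Seq(child)
  def mapChildren(f: Expr => Expr): Expr = Grouping(f(child))
}

object WrapGroupingSketch {
  def wrapGroupingExpressions(groupingExprs: Seq[Expr], aggExprs: Seq[Expr]): Seq[Expr] = {
    // Only non-leaf grouping expressions are protected, mirroring `_.children.nonEmpty`.
    // Structural equality here is an assumption; ExpressionSet uses semantic equality.
    val complex: Set[Expr] = groupingExprs.filter(_.children.nonEmpty).toSet

    def wrap(e: Expr): Expr = e match {
      case _: Sum => e                              // do not descend into aggregate functions
      case _ if complex.contains(e) => Grouping(e)  // mark a whole grouping expression
      case _ => e.mapChildren(wrap)                 // otherwise recurse into children
    }

    aggExprs.map(wrap)
  }
}
```

For example, with the grouping expression `a + b`, the aggregate expression `(a + b) + c` becomes `Grouping(a + b) + c`, while `sum(a + b)` is left untouched because aggregate functions are skipped before the grouping check.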
