AngersZhuuuu commented on a change in pull request #28496:
URL: https://github.com/apache/spark/pull/28496#discussion_r424845466



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##########
@@ -331,10 +331,30 @@ case class HashAggregateExec(
     }
   }
 
+  private def inputAttributes: Seq[Attribute] = {
+    if (modes.contains(Final) || modes.contains(PartialMerge)) {
+      // SPARK-31620: when planning aggregates, the partial aggregate uses aggregate function's
+      // `inputAggBufferAttributes` as its output. And Final and PartialMerge aggregate rely on the
+      // output to bind references for `DeclarativeAggregate.mergeExpressions`. But if we copy the
+      // aggregate function somehow after aggregate planning, like `PlanSubqueries`, the
+      // `DeclarativeAggregate` will be replaced by a new instance with new
+      // `inputAggBufferAttributes` and `mergeExpressions`. Then Final and PartialMerge aggregate
+      // can't bind the `mergeExpressions` with the output of the partial aggregate, as they use
+      // the `inputAggBufferAttributes` of the original `DeclarativeAggregate` before copy. Instead,
+      // we shall use `inputAggBufferAttributes` after copy to match the new `mergeExpressions`.
+      val aggAttrs = aggregateExpressions
+        .filter(a => a.mode == Final || !a.isDistinct).map(_.aggregateFunction)

Review comment:
       How about adding a UT with a distinct aggregate expression?
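The binding failure described in the diff's code comment can be illustrated with a toy model. This is a minimal sketch, not Spark code: `Attr`, `ToySum`, `freshCopy`, and `bind` are hypothetical stand-ins for a Catalyst attribute with its `ExprId`, a `DeclarativeAggregate`, copying the function after planning, and reference binding. It only shows that merge references resolved by id fail against the original instance's buffer attributes, while binding against the copy's own `inputAggBufferAttributes` succeeds.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified model of attribute binding; none of these are Spark classes.
object BindingSketch {
  private val nextId = new AtomicLong(0)

  // An attribute identified by a unique id (loosely mirroring an ExprId).
  final case class Attr(name: String, id: Long)
  def newAttr(name: String): Attr = Attr(name, nextId.getAndIncrement())

  // Toy "declarative aggregate": each new instance allocates fresh buffer attributes,
  // and its merge expression refers to exactly those attributes.
  final class ToySum {
    val inputAggBufferAttributes: Seq[Attr] = Seq(newAttr("sum"))
    val mergeExpression: Seq[Attr] = inputAggBufferAttributes
    // Stand-in for copying the function after aggregate planning: new ids are minted.
    def freshCopy(): ToySum = new ToySum
  }

  // Resolve each reference to its ordinal in the input schema by id; None if any is missing.
  def bind(refs: Seq[Attr], input: Seq[Attr]): Option[Seq[Int]] = {
    val ordinals = refs.map(r => input.indexWhere(_.id == r.id))
    if (ordinals.contains(-1)) None else Some(ordinals)
  }

  def main(args: Array[String]): Unit = {
    val original = new ToySum
    // The partial aggregate's output was planned from the ORIGINAL instance.
    val partialOutput = original.inputAggBufferAttributes
    val copied = original.freshCopy()
    println(bind(copied.mergeExpression, partialOutput))                   // None
    println(bind(copied.mergeExpression, copied.inputAggBufferAttributes)) // Some(List(0))
  }
}
```

Binding against the copy's attributes works in this model only because the toy partial output has the same column order as the copy's buffer attributes; whether that positional alignment holds for every mode combination (for example with distinct aggregates) is exactly what a dedicated test would need to confirm.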




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.