Github user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/20965#discussion_r182673750
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
---
@@ -266,28 +342,53 @@ case class HashAggregateExec(
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
}
}
- ctx.currentVars = bufVars ++ input
+
+ // We need to copy the aggregation buffer to local variables first because each aggregate
+ // function directly updates the buffer when it finishes.
+ val localBufVars = bufVars.zip(updateExpr).map { case (ev, e) =>
--- End diff --
Without these copies, some queries return wrong answers. For example:
```
import org.apache.spark.sql.execution.debug._
sql("CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES (1, 1), (1, 2), (2, 1) AS testData(a, b)")
val df = sql("SELECT SKEWNESS(a) FROM testData")
df.debugCodegen
df.show
scala> df.show  // with the local copies
+---------------------------+
|skewness(CAST(a AS DOUBLE))|
+---------------------------+
| 0.7071067811865475|
+---------------------------+
scala> df.show  // without the local copies
+---------------------------+
|skewness(CAST(a AS DOUBLE))|
+---------------------------+
| -1.368454115659954|
+---------------------------+
```
In the original generated code, the aggregation buffer updates happen at the
end of an `Aggregate` consume:
https://gist.github.com/maropu/ec5a322c40e9e7e0024c3074a261197b#file-codegen-d-skew-in-the-master-L807
But if we split the aggregation functions into pieces, these updates happen at
the end of each split function, e.g.:
https://gist.github.com/maropu/1e9e7cb2377622549163261fc321a108#file-codegen-d-skew-in-spark-21870-2-local-copy-L822
So, if we don't copy the previous state to local variables first, a later split
function reads state that an earlier one has already updated, and the
aggregation returns an incorrect answer.
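
To make the failure mode concrete, here is a minimal sketch in plain Scala (no Spark dependency). It is a toy model, not the generated code: the buffer layout `Array(n, avg, m2, m3)`, the names `BufferCopyDemo`, `updates`, `step`, and `copyFirst`, and the textbook one-pass central-moment update are all assumptions made for illustration. Each update expression plays the role of one split function that writes its buffer slot as soon as it finishes.

```scala
object BufferCopyDemo {
  // Hypothetical buffer layout for this sketch: Array(n, avg, m2, m3).
  type Update = (Array[Double], Double) => Double

  // One update expression per buffer slot. Every expression is written
  // against the PREVIOUS state `s` (textbook one-pass central moments).
  val updates: Seq[Update] = Seq(
    (s, x) => s(0) + 1.0,
    (s, x) => s(1) + (x - s(1)) / (s(0) + 1.0),
    (s, x) => { val d = x - s(1); s(2) + d * d * s(0) / (s(0) + 1.0) },
    (s, x) => {
      val n = s(0) + 1.0
      val d = x - s(1)
      val dn = d / n
      s(3) + d * dn * dn * s(0) * (n - 2.0) - 3.0 * dn * s(2)
    }
  )

  // Each "split function" stores its result into the buffer when it finishes.
  def step(buf: Array[Double], x: Double, copyFirst: Boolean): Unit = {
    // With copyFirst, every update reads a snapshot of the previous state;
    // without it, later updates see slots already overwritten by earlier ones.
    val src = if (copyFirst) buf.clone() else buf
    updates.zipWithIndex.foreach { case (f, i) => buf(i) = f(src, x) }
  }

  def skewness(data: Seq[Double], copyFirst: Boolean): Double = {
    val buf = Array(0.0, 0.0, 0.0, 0.0)
    data.foreach(x => step(buf, x, copyFirst))
    math.sqrt(buf(0)) * buf(3) / math.sqrt(buf(2) * buf(2) * buf(2))
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(1.0, 1.0, 2.0) // column `a` of testData above
    println(skewness(data, copyFirst = true))  // approximately 0.7071
    println(skewness(data, copyFirst = false)) // a different, wrong value
  }
}
```

With `copyFirst = true` the sketch agrees with the correct result above. With `copyFirst = false` it returns a different value, though not necessarily the same wrong number as the non-copy run above, since the update order and formulas here are only an approximation of what the generated code does.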