Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/9038#discussion_r41714857
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
---
@@ -134,19 +137,74 @@ class TungstenAggregationIterator(
completeAggregateExpressions.map(_.mode).distinct.headOption
}
- // All aggregate functions. TungstenAggregationIterator only handles expression-based aggregate.
- // If there is any functions that is an ImperativeAggregateFunction, we throw an
- // IllegalStateException.
- private[this] val allAggregateFunctions: Array[DeclarativeAggregate] = {
- if (!allAggregateExpressions.forall(
- _.aggregateFunction.isInstanceOf[DeclarativeAggregate])) {
- throw new IllegalStateException(
- "Only ExpressionAggregateFunctions should be passed in TungstenAggregationIterator.")
+ // Initialize all AggregateFunctions by binding references, if necessary,
+ // and setting inputBufferOffset and mutableBufferOffset.
+ private def initializeAllAggregateFunctions(
+ startingInputBufferOffset: Int): Array[AggregateFunction2] = {
+ var mutableBufferOffset = 0
+ var inputBufferOffset: Int = startingInputBufferOffset
+ val functions = new Array[AggregateFunction2](allAggregateExpressions.length)
+ var i = 0
+ while (i < allAggregateExpressions.length) {
+ val func = allAggregateExpressions(i).aggregateFunction
--- End diff --
Yeah, it would be good to return a new instance when we call `with*Offset`.
For UDAFs, I think it may make sense to add a `newInstance` interface so that,
if a user-defined function has internal state, we can create copies of it on
the executor side. One difficulty is that we need to preserve the exprIds of
`aggBufferAttributes` and `inputAggBufferAttributes` when we create new
instances, so that binding references keeps working correctly without
requiring too many changes. Is there an easy way to do that?
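
To make the idea concrete, here is a minimal sketch under heavy assumptions.
`ExprId`, `Attr`, and `CountingUdaf` are hypothetical, simplified stand-ins
and not Spark's actual classes; the only point is that `with*Offset` and
`newInstance` return fresh objects while reusing the same attributes, so the
exprIds stay stable:

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-ins for Catalyst's ExprId and AttributeReference.
final case class ExprId(id: Long)
final case class Attr(name: String, exprId: ExprId)

object Attr {
  private val nextId = new AtomicLong(0L)
  // Every freshly created attribute gets a unique exprId.
  def fresh(name: String): Attr = Attr(name, ExprId(nextId.getAndIncrement()))
}

// Hypothetical user-defined aggregate with internal mutable state.
final class CountingUdaf(
    val aggBufferAttributes: Seq[Attr],
    val mutableBufferOffset: Int = 0) {

  // Internal state that must not be shared between copies.
  private var seen: Long = 0L

  def update(): Unit = seen += 1
  def rowsSeen: Long = seen

  // Returns a new instance instead of mutating this one.
  def withNewMutableBufferOffset(offset: Int): CountingUdaf =
    new CountingUdaf(aggBufferAttributes, offset)

  // Fresh copy with clean state; the attributes (and thus their exprIds)
  // are passed through unchanged, so existing bound references stay valid.
  def newInstance(): CountingUdaf =
    new CountingUdaf(aggBufferAttributes, mutableBufferOffset)
}

object Demo {
  def main(args: Array[String]): Unit = {
    val f = new CountingUdaf(Seq(Attr.fresh("count")))
    f.update()
    val g = f.withNewMutableBufferOffset(3)
    val h = g.newInstance()
    println(s"original offset=${f.mutableBufferOffset}, shifted offset=${g.mutableBufferOffset}")
    println(s"copy has clean state: ${h.rowsSeen == 0L}")
    println(s"exprIds preserved: ${h.aggBufferAttributes == f.aggBufferAttributes}")
  }
}
```

In this sketch the exprIds survive only because the copy reuses the original
attribute objects; a UDAF that rebuilds its attributes in its constructor
would not get this for free, which is the difficulty raised above.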