Github user clockfly commented on a diff in the pull request:
https://github.com/apache/spark/pull/14562#discussion_r75038089
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
---
@@ -219,111 +219,125 @@ sealed abstract class AggregateFunction extends
Expression with ImplicitCastInpu
}
/**
- * API for aggregation functions that are expressed in terms of imperative
initialize(), update(),
- * and merge() functions which operate on Row-based aggregation buffers.
- *
- * Within these functions, code should access fields of the mutable
aggregation buffer by adding the
- * bufferSchema-relative field number to `mutableAggBufferOffset` then
using this new field number
- * to access the buffer Row. This is necessary because this aggregation
function's buffer is
- * embedded inside of a larger shared aggregation buffer when an
aggregation operator evaluates
- * multiple aggregate functions at the same time.
- *
- * We need to perform similar field number arithmetic when merging
multiple intermediate
- * aggregate buffers together in `merge()` (in this case, use
`inputAggBufferOffset` when accessing
- * the input buffer).
- *
- * Correct ImperativeAggregate evaluation depends on the correctness of
`mutableAggBufferOffset` and
- * `inputAggBufferOffset`, but not on the correctness of the attribute ids
in `aggBufferAttributes`
- * and `inputAggBufferAttributes`.
+ * API for aggregation functions that are expressed in terms of imperative
doInitialize(),
+ * doUpdate(), doMerge() and doComplete() functions which operate on
Row-based aggregation buffers.
*/
abstract class ImperativeAggregate extends AggregateFunction with
CodegenFallback {
+ // Although `mutableBufferRow` and `inputBufferRow` are 2 mutable fields
in `ImperativeAggregate`,
+ // they can only be set once, thus make `ImperativeAggregate` kind of
immutable and stateless.
+
+ /**
+ * The aggregation operator keeps a large shared mutable buffer row for
all aggregate functions,
+ * each aggregate function should only access a slice of this shared
buffer.
+ */
+ private var mutableBufferRow: SlicedMutableRow = _
+
+ /**
+ * During partial aggregation, the input buffer row to be merged is
shared among all aggregate
+ * functions, each aggregate function should only access a slice of this
input buffer.
+ */
+ private var inputBufferRow: SlicedInternalRow = _
+
/**
- * The offset of this function's first buffer value in the underlying
shared mutable aggregation
- * buffer.
+ * Set the offset of this function's start buffer value in the
underlying shared mutable
+ * aggregation buffer.
*
* For example, we have two aggregate functions `avg(x)` and `avg(y)`,
which share the same
- * aggregation buffer. In this shared buffer, the position of the first
buffer value of `avg(x)`
- * will be 0 and the position of the first buffer value of `avg(y)` will
be 2:
+ * aggregation buffer. In this shared buffer, the position of the start
buffer value of `avg(x)`
+ * will be 0 and the position of the start buffer value of `avg(y)` will
be 2:
* {{{
- * avg(x) mutableAggBufferOffset = 0
+ * avg(x) mutable buffer offset is 0
* |
* v
* +--------+--------+--------+--------+
* | sum1 | count1 | sum2 | count2 |
* +--------+--------+--------+--------+
* ^
* |
- * avg(y) mutableAggBufferOffset = 2
+ * avg(y) mutable buffer offset is 2
* }}}
*/
- protected val mutableAggBufferOffset: Int
-
- /**
- * Returns a copy of this ImperativeAggregate with an updated
mutableAggBufferOffset.
- * This new copy's attributes may have different ids than the original.
- */
- def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int):
ImperativeAggregate
+ final def setMutableBufferOffset(offset: Int): Unit = {
+ assert(mutableBufferRow == null)
--- End diff --
Do you want to do a runtime check? Then how about using `require`?
`assert` may be removed by compiler.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]