GitHub user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8973#discussion_r41348104
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala ---
    @@ -95,98 +92,192 @@ private[sql] case class AggregateExpression2(
       override def toString: String = s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)"
     }
     
    -abstract class AggregateFunction2
    -  extends Expression with ImplicitCastInputTypes {
    +/**
    + * AggregateFunction2 is the superclass of two aggregation function interfaces:
    + *
    + *  - [[ImperativeAggregate]] is for aggregation functions that are specified in terms of
    + *    initialize(), update(), and merge() functions that operate on Row-based aggregation buffers.
    + *  - [[ExpressionAggregate]] is for aggregation functions that are specified using
    + *    Catalyst expressions.
    + *
    + * In both interfaces, aggregates must define the schema ([[aggBufferSchema]]) and attributes
    + * ([[aggBufferAttributes]]) of an aggregation buffer which is used to hold partial aggregate
    + * results. At runtime, multiple aggregate functions are evaluated by the same operator using a
    + * combined aggregation buffer which concatenates the aggregation buffers of the individual
    + * aggregate functions.
    + *
    + * Code which accepts [[AggregateFunction2]] instances should be prepared to handle both types of
    + * aggregate functions.
    + */
    +sealed abstract class AggregateFunction2 extends Expression with ImplicitCastInputTypes {
     
       /** An aggregate function is not foldable. */
       final override def foldable: Boolean = false
     
    +  /** The schema of the aggregation buffer. */
    +  def aggBufferSchema: StructType
    +
    +  /** Attributes of fields in aggBufferSchema. */
    +  def aggBufferAttributes: Seq[AttributeReference]
    +
       /**
    -   * The offset of this function's start buffer value in the
    -   * underlying shared mutable aggregation buffer.
    -   * For example, we have two aggregate functions `avg(x)` and `avg(y)`, which share
    -   * the same aggregation buffer. In this shared buffer, the position of the first
    -   * buffer value of `avg(x)` will be 0 and the position of the first buffer value of `avg(y)`
    -   * will be 2.
    +   * Attributes of fields in input aggregation buffers (immutable aggregation buffers that are
    +   * merged with mutable aggregation buffers in the merge() function or merge expressions).
    +   * These attributes are created automatically by cloning the [[aggBufferAttributes]].
        */
    -  protected var mutableBufferOffset: Int = 0
    -
    -  def withNewMutableBufferOffset(newMutableBufferOffset: Int): Unit = {
    -    mutableBufferOffset = newMutableBufferOffset
    -  }
    +  def inputAggBufferAttributes: Seq[AttributeReference]
    --- End diff --
    
    @yhuai, why do we need to use different attributes to refer to the aggregation buffer columns when they are embedded inside a `[grouping columns][partial agg columns]` result? Why can't you just re-use the same attributes? Re-using them seems like it should significantly improve the code, but I might be overlooking a reason why we need to do this.
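
To make the buffer layout in the diff concrete, here is a minimal sketch in plain Scala. It does not use Spark: `Attr`, `Agg`, and `BufferLayout` are hypothetical stand-ins, not Catalyst classes. It shows how per-function offsets fall out of concatenating the individual buffers (the `avg(x)` at 0, `avg(y)` at 2 example from the removed comment), and what happens if attributes are looked up by id in a row that holds both a mutable buffer and an input buffer. That lookup-by-id model is an assumption made for illustration; it is only one possible reason for cloning the attributes and may not be the one that applies in this PR.

```scala
// Hypothetical stand-ins for illustration only; these are not Catalyst classes.
final case class Attr(name: String, id: Int)
final case class Agg(name: String, bufferAttrs: Seq[Attr])

object BufferLayout {
  // Offset of each function's first slot in the concatenated buffer:
  // a running sum of the sizes of the buffers that precede it.
  def offsets(aggs: Seq[Agg]): Seq[Int] =
    aggs.scanLeft(0)(_ + _.bufferAttrs.size).init

  // Clone attributes under fresh ids, keeping names and order.
  def cloneWithFreshIds(attrs: Seq[Attr], firstFreshId: Int): Seq[Attr] =
    attrs.zipWithIndex.map { case (a, i) => a.copy(id = firstFreshId + i) }

  // Assumed binding model: resolve each attribute id to its ordinal in a row.
  // If two columns share an id, the later one silently replaces the earlier one.
  def ordinals(rowSchema: Seq[Attr]): Map[Int, Int] =
    rowSchema.zipWithIndex.map { case (a, i) => a.id -> i }.toMap
}
```

With `avgX = Agg("avg(x)", Seq(Attr("sum", 0), Attr("count", 1)))` and an analogous `avgY`, `BufferLayout.offsets(Seq(avgX, avgY))` gives `Seq(0, 2)`. Under the assumed model, binding against `avgX.bufferAttrs ++ avgX.bufferAttrs` leaves only two resolvable ids for four columns, while binding against the buffer plus a clone with fresh ids keeps all four distinguishable.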

