Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9038#discussion_r42032759
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala ---
    @@ -481,10 +610,10 @@ class TungstenAggregationIterator(
           // When needsProcess is false, the format of input rows is groupingKey + aggregation buffer.
           // We need to project the aggregation buffer part from an input row.
           val buffer = createNewAggregationBuffer()
    -      // The originalInputAttributes are using cloneBufferAttributes. So, we need to use
    -      // allAggregateFunctions.flatMap(_.cloneBufferAttributes).
    +      // The originalInputAttributes are using inputAggBufferAttributes. So, we need to use
    +      // allAggregateFunctions.flatMap(_.inputAggBufferAttributes).
           val bufferExtractor = newMutableProjection(
    -        allAggregateFunctions.flatMap(_.inputAggBufferAttributes),
    +        originalInputAttributes.drop(initialInputBufferOffset),
    --- End diff --
    
    Had a discussion with @JoshRosen offline. Here is the explanation for this
    change.
    
    Our `ImperativeAggregate` and `DeclarativeAggregate` are quite different.
    The correctness of an `ImperativeAggregate` depends on
    `mutableAggBufferOffset` and `inputAggBufferOffset` being correct, and when
    there is no fallback, it does not rely on attribute ids for binding. To keep
    `mutableAggBufferOffset` and `inputAggBufferOffset` immutable, every call to
    `withNewMutableAggBufferOffset` or `withNewInputAggBufferOffset` creates a
    new copy of the `ImperativeAggregate`. A new copy contains a new set of
    attribute ids for `aggBufferAttributes` and `inputAggBufferAttributes`, but
    in most cases those new ids cause no problem because an
    `ImperativeAggregate` does not rely on them. However, once we fall back to
    sort-based aggregation and need to extract input aggregation buffers, the
    attribute ids must match for binding to work. Since
    `allAggregateFunctions.flatMap(_.inputAggBufferAttributes)` will have
    attributes with a new set of ids, we will see a binding error here.
    `originalInputAttributes.drop(initialInputBufferOffset)` is fine here.
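
To make the id mismatch concrete, here is a minimal, self-contained sketch. It does not use Spark: `Attr`, `ExprIds`, `ToyImperativeAggregate`, and `bind` are hypothetical stand-ins invented for illustration, and the only behavior assumed from the comment above is that copying an aggregate yields fresh attribute ids while binding looks attributes up by id.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-ins for illustration only; these are not Spark classes.
object ExprIds {
  private val counter = new AtomicLong(0L)
  def fresh(): Long = counter.getAndIncrement()
}

final case class Attr(name: String, id: Long)

// Toy aggregate: every instance allocates fresh ids for its buffer attributes,
// so a copy made by withNewInputAggBufferOffset no longer shares ids with the original.
final class ToyImperativeAggregate(val inputAggBufferOffset: Int) {
  val inputAggBufferAttributes: Seq[Attr] =
    Seq(Attr("sum", ExprIds.fresh()), Attr("count", ExprIds.fresh()))

  def withNewInputAggBufferOffset(offset: Int): ToyImperativeAggregate =
    new ToyImperativeAggregate(offset)
}

object BindingSketch {
  // Id-based binding: map each wanted attribute to its ordinal in the input schema.
  def bind(wanted: Seq[Attr], inputSchema: Seq[Attr]): Either[String, Seq[Int]] = {
    val ordinals = wanted.map(a => inputSchema.indexWhere(_.id == a.id))
    if (ordinals.contains(-1)) Left("binding error: attribute id not found in input")
    else Right(ordinals)
  }

  // Input rows are laid out as groupingKey + aggregation buffer.
  private def setup(): (Seq[Attr], Int, ToyImperativeAggregate) = {
    val original = new ToyImperativeAggregate(0)
    val groupingKey = Seq(Attr("key", ExprIds.fresh()))
    val originalInputAttributes = groupingKey ++ original.inputAggBufferAttributes
    val initialInputBufferOffset = groupingKey.length
    val copy = original.withNewInputAggBufferOffset(initialInputBufferOffset)
    (originalInputAttributes, initialInputBufferOffset, copy)
  }

  // Binding the copy's attributes fails: their ids are not in the input schema.
  def bindCopied(): Either[String, Seq[Int]] = {
    val (input, _, copy) = setup()
    bind(copy.inputAggBufferAttributes, input)
  }

  // Binding the buffer slice of the input schema succeeds: the ids match by construction.
  def bindOriginal(): Either[String, Seq[Int]] = {
    val (input, offset, _) = setup()
    bind(input.drop(offset), input)
  }

  def main(args: Array[String]): Unit = {
    println(bindCopied())
    println(bindOriginal())
  }
}
```

In this toy model, `bindCopied()` returns a `Left` because the copy carries new ids, while `bindOriginal()` resolves the buffer columns to the ordinals that follow the grouping key.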

