pgandhi999 commented on issue #24149: [SPARK-27207] : Ensure aggregate buffers 
are initialized again for So…
URL: https://github.com/apache/spark/pull/24149#issuecomment-476386389
 
 
   @cloud-fan Regarding our discussion in PR #24144, I just found a case 
where Spark initializes a UDAF, runs `update`, and then runs `merge`. It happens 
in `SortBasedAggregator`, and the code blows up in this case. The relevant code 
from `ObjectAggregationIterator.scala` is pasted below:
   
   ```
   // Two-way merges initialAggBufferIterator and inputIterator
         private def findNextSortedGroup(): Boolean = {
           if (hasNextInput || hasNextAggBuffer) {
             // Find smaller key of the initialAggBufferIterator and initialAggBufferIterator
             groupingKey = findGroupingKey()
             result = new AggregationBufferEntry(groupingKey, makeEmptyAggregationBuffer)
   
             // Firstly, update the aggregation buffer with input rows.
             while (hasNextInput &&
               groupingKeyOrdering.compare(inputIterator.getKey, groupingKey) == 0) {
               processRow(result.aggregationBuffer, inputIterator.getValue)
               hasNextInput = inputIterator.next()
             }
   
             // Secondly, merge the aggregation buffer with existing aggregation buffers.
             // NOTE: the ordering of these two while-block matter, mergeAggregationBuffer() should
             // be called after calling processRow.
             while (hasNextAggBuffer &&
               groupingKeyOrdering.compare(initialAggBufferIterator.getKey, groupingKey) == 0) {
               mergeAggregationBuffers(result.aggregationBuffer, initialAggBufferIterator.getValue)
               hasNextAggBuffer = initialAggBufferIterator.next()
             }
   
             true
           } else {
             false
           }
         }
   ```
   
   So for a single group it calls `update` first and then calls `merge` on the 
same buffer. I found the issue while testing this PR today. 
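
To make the failure mode concrete, here is a rough toy model of the call order above. It is only a sketch under assumptions: `UpdateBuffer`, `MergeBuffer`, `ToyUdaf`, and `Demo` are hypothetical names and are not Spark or Hive APIs. The assumption is that the UDAF creates a differently shaped buffer depending on whether it expects `update` or `merge` calls, so a buffer initialized for `update` cannot be passed to `merge`.

```scala
import scala.util.Try

// Toy model only: none of these types exist in Spark.
sealed trait Buffer
final case class UpdateBuffer(var sum: Long) extends Buffer // shape expected by update
final case class MergeBuffer(var sum: Long) extends Buffer  // shape expected by merge

object ToyUdaf {
  def initForUpdate(): Buffer = UpdateBuffer(0L)

  // Accepts only a buffer that was initialized for update.
  def update(buf: Buffer, value: Long): Unit = buf match {
    case b: UpdateBuffer => b.sum += value
    case other => throw new IllegalStateException(s"update called on $other")
  }

  // Accepts only a buffer that was initialized for merge.
  def merge(buf: Buffer, partial: Long): Unit = buf match {
    case b: MergeBuffer => b.sum += partial
    case other => throw new IllegalStateException(s"merge called on $other")
  }
}

object Demo {
  // Same order as findNextSortedGroup: update with input rows first,
  // then merge existing partial results into the very same buffer.
  def updateThenMerge(inputs: Seq[Long], partials: Seq[Long]): Try[Unit] = Try {
    val buf = ToyUdaf.initForUpdate()
    inputs.foreach(v => ToyUdaf.update(buf, v))
    partials.foreach(p => ToyUdaf.merge(buf, p))
  }
}
```

In this model, `Demo.updateThenMerge(Seq(1L, 2L), Seq(10L))` fails as soon as `merge` sees the update-shaped buffer, while a group with no existing aggregation buffers (`Seq.empty` for `partials`) succeeds because `merge` is never reached.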
