m44444 commented on a change in pull request #24149: [SPARK-27207] : Ensure aggregate buffers are initialized again for So…
URL: https://github.com/apache/spark/pull/24149#discussion_r277914333
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
##########
@@ -258,21 +275,29 @@ class SortBasedAggregator(
       if (hasNextInput || hasNextAggBuffer) {
         // Find smaller key of the inputIterator and initialAggBufferIterator
         groupingKey = findGroupingKey()
-        result = new AggregationBufferEntry(groupingKey, makeEmptyAggregationBuffer)
+        updateResult = new AggregationBufferEntry(
+          groupingKey, makeEmptyAggregationBufferForSortBasedUpdateAggFunctions)
+        finalResult = new AggregationBufferEntry(
+          groupingKey, makeEmptyAggregationBufferForSortBasedMergeAggFunctions)

         // Firstly, update the aggregation buffer with input rows.
         while (hasNextInput &&
           groupingKeyOrdering.compare(inputIterator.getKey, groupingKey) == 0) {
-          processRow(result.aggregationBuffer, inputIterator.getValue)
+          processRow(updateResult.aggregationBuffer, inputIterator.getValue)
           hasNextInput = inputIterator.next()
         }

+        // This step ensures that the contents of the updateResult aggregation buffer are
+        // merged into the finalResult aggregation buffer to maintain consistency.
+        serializeBuffer(updateResult.aggregationBuffer)
+        mergeAggregationBuffers(finalResult.aggregationBuffer, updateResult.aggregationBuffer)
+
         // Secondly, merge the aggregation buffer with existing aggregation buffers.
         // NOTE: the ordering of these two while-blocks matters; mergeAggregationBuffer() should
         // be called after calling processRow.
         while (hasNextAggBuffer &&
           groupingKeyOrdering.compare(initialAggBufferIterator.getKey, groupingKey) == 0) {
Review comment:
Hi @cloud-fan, what you said here really hits the mark! On Spark 2.4.1, turning this conf off solves the DataSketches HLL issue for me: `--conf spark.sql.execution.useObjectHashAggregateExec=false`, which basically disables the hash-aggregation attempt. But as you said, this degrades overall performance (not so bad when people have a sense of how big their data is). A programmatic equivalent of the workaround is sketched below.
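Not from the PR, just a hedged sketch of the same workaround set on the session instead of via `--conf` on spark-sql; the app name is made up, and `enableHiveSupport()` assumes a Hive-enabled build:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: disable the ObjectHashAggregateExec attempt globally so that all
// aggregates (including Hive UDAFs) take the sort-based path directly,
// avoiding the mid-stream fallback that trips up DataSketches HLL.
val spark = SparkSession.builder()
  .appName("hll-workaround")   // hypothetical app name
  .enableHiveSupport()         // needed so Hive UDAFs can be registered
  .config("spark.sql.execution.useObjectHashAggregateExec", "false")
  .getOrCreate()
```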
My question, however, is whether the framework could recognize Hive UDAFs automatically and apply this fallback behavior only to them. E.g. for a query like `select year, count(month), hive_udaf_count(month) from ... group by year` run via spark-sql, I want Spark's own count() to follow INIT -> UPDATE -> MERGE -> FINISH, while hive_udaf_count() follows INIT -> UPDATE -> FINISH and then INIT -> MERGE -> FINISH, **at the same time** (see the toy sketch after this paragraph).
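To make the contrast concrete, here is a minimal toy sketch (not Spark internals; all names invented) of the two buffer lifecycles:

```scala
// A toy aggregate with the classic four phases.
trait Agg[B, O] {
  def init(): B
  def update(buf: B, value: Int): B
  def merge(left: B, right: B): B
  def finish(buf: B): O
}

object ToyCount extends Agg[Long, Long] {
  def init(): Long = 0L
  def update(buf: Long, value: Int): Long = buf + 1
  def merge(left: Long, right: Long): Long = left + right
  def finish(buf: Long): Long = buf
}

// What I want for Spark's own count(): one buffer per key carried through
// INIT -> UPDATE* -> MERGE* -> FINISH.
def singleLifecycle(partitions: Seq[Seq[Int]]): Long = {
  val partials = partitions.map(_.foldLeft(ToyCount.init())(ToyCount.update))
  ToyCount.finish(partials.foldLeft(ToyCount.init())(ToyCount.merge))
}

// What I want for the Hive UDAF: a fresh buffer per phase, i.e.
// INIT -> UPDATE* -> FINISH on the partial side, then
// INIT -> MERGE* -> FINISH on the final side. (For a real sketch such as
// a DataSketches HLL, FINISH here would serialize the partial state.)
def twoPhaseLifecycle(partitions: Seq[Seq[Int]]): Long = {
  val partials = partitions.map { p =>
    ToyCount.finish(p.foldLeft(ToyCount.init())(ToyCount.update))
  }
  ToyCount.finish(partials.foldLeft(ToyCount.init())(ToyCount.merge))
}
```

For the toy count the two paths return the same number; the point is that the second path never reuses an update-phase buffer as a merge-phase buffer, which is what a Hive UDAF expects.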