c21 commented on a change in pull request #32242:
URL: https://github.com/apache/spark/pull/32242#discussion_r616450151
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##########
@@ -128,6 +128,16 @@ case class HashAggregateExec(
// all the mode of aggregate expressions
private val modes = aggregateExpressions.map(_.mode).distinct
+ // This is for testing final aggregate with number-of-rows-based fall back as specified in
+ // `testFallbackStartsAt`. In this scenario, there might be same keys exist in both fast and
+ // regular hash map. So the aggregation buffers from both maps need to be merged together
+ // to avoid correctness issue.
+ //
+ // This scenario only happens in unit test with number-of-rows-based fall back.
+ // There should not be same keys in both maps with size-based fall back in production.
+ private val isTestFinalAggregateWithFallback: Boolean = testFallbackStartsAt.isDefined &&
Review comment:
@cloud-fan - I was thinking along the same lines, but I found the fallback
logic quite hard to fix. I tried adding a `find(key): Boolean` method to the
generated first-level map and checking first whether the key already exists
there. That does not cover every case, though: a key can be put into the
second-level map and later be added to the first-level map as well (the case
where the fallback row counter is reset to 0).
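
To make the overlap concrete, here is a toy sketch of a row-count-based fallback. It is not Spark code: `FallbackSketch`, `aggregate`, and `merge` are hypothetical names, plain Scala maps stand in for the generated first-level map and the regular hash map, and a running sum stands in for the aggregation buffer. It does not model the counter-reset case mentioned above.

```scala
import scala.collection.mutable

// Toy model only: two in-memory maps and a row counter, not Spark's generated code.
object FallbackSketch {

  // Sums values per key. The first `fallbackStartsAt` rows go to the fast map;
  // every later row goes to the regular map, even if its key is already in the fast map.
  def aggregate(
      rows: Seq[(String, Long)],
      fallbackStartsAt: Int): (mutable.Map[String, Long], mutable.Map[String, Long]) = {
    val fastMap = mutable.Map.empty[String, Long]
    val regularMap = mutable.Map.empty[String, Long]
    var counter = 0
    for ((key, value) <- rows) {
      val target = if (counter < fallbackStartsAt) fastMap else regularMap
      target(key) = target.getOrElse(key, 0L) + value
      counter += 1
    }
    (fastMap, regularMap)
  }

  // Combines the buffers of both maps so a key present in both yields a single result.
  def merge(
      fast: collection.Map[String, Long],
      regular: collection.Map[String, Long]): Map[String, Long] =
    (fast.keySet ++ regular.keySet).map { k =>
      k -> (fast.getOrElse(k, 0L) + regular.getOrElse(k, 0L))
    }.toMap

  def main(args: Array[String]): Unit = {
    val rows = Seq("a" -> 1L, "b" -> 2L, "a" -> 3L)
    val (fast, regular) = aggregate(rows, fallbackStartsAt = 2)
    println(s"fast = $fast, regular = $regular") // key "a" appears in both maps
    println(s"merged = ${merge(fast, regular)}")
  }
}
```

With these three rows and a fallback after two rows, key `a` ends up in both maps (1 in the fast map, 3 in the regular map), so emitting each map separately would produce two partial results for `a`. Merging gives the single expected sum of 4.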