c21 commented on a change in pull request #32242:
URL: https://github.com/apache/spark/pull/32242#discussion_r617989928



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##########
@@ -128,6 +128,16 @@ case class HashAggregateExec(
   // all the mode of aggregate expressions
   private val modes = aggregateExpressions.map(_.mode).distinct
 
+  // This is for testing final aggregate with number-of-rows-based fall back 
as specified in
+  // `testFallbackStartsAt`. In this scenario, there might be same keys exist 
in both fast and
+  // regular hash map. So the aggregation buffers from both maps need to be 
merged together
+  // to avoid correctness issue.
+  //
+  // This scenario only happens in unit test with number-of-rows-based fall 
back.
+  // There should not be same keys in both maps with size-based fall back in 
production.
+  private val isTestFinalAggregateWithFallback: Boolean = 
testFallbackStartsAt.isDefined &&

Review comment:
       @cloud-fan - maybe I am missing something but not sure how these two 
solutions fix the problem.
   
   1. dedicated counters for two maps
   ```
   if (counter1 < 2) {
     // 1st level hash map
     agg_buffer = fastHashMap.findOrInsert(key);
   }
   if (agg_buffer == null) {
     // generated. code for key in unsafe row format
     ...
     if (counter2 < 3) {
       // 2nd level hash map
       agg_buffer = 
regularHashMap.getAggregationBufferFromUnsafeRow(key_in_unsafe_row, ...);
     }
     if (agg_buffer == null) {
       // sort-based fallback
       regularHashMap.destructAndCreateExternalSorter();
       ...
       counter2 = 0;
     }
   }
   counter1 += 1;
   counter2 += 1;
   ```
   
   Counter example:
   
   ```
   1. key_a is inserted into 1st level map (counter1 = 0)
   2. a couple of keys are inserted into 1st level map (count1 =2)
   3. key_a is inserted into 2nd level map (count1 = 2, count2 = 2)
   ```
   
   2. set 1st level map `bitMaxCapacity` to be log2(`testFallbackStartsAt._1`).
   ```
   if (counter < 2) {
     // 1st level hash map
     agg_buffer = fastHashMap.findOrInsert(key);
   }
   if (agg_buffer == null) {
     // generated. code for key in unsafe row format
     ...
     if (counter < 3) {
       // 2nd level hash map
       agg_buffer = 
regularHashMap.getAggregationBufferFromUnsafeRow(key_in_unsafe_row, ...);
     }
     if (agg_buffer == null) {
       // sort-based fallback
       regularHashMap.destructAndCreateExternalSorter();
       ...
       counter = 0;
     }
   }
   counter += 1;
   ```
   
   Counter example:
   
   ```
   1. key_a is inserted into 1st level map (counter = 0)
   2. a couple of NULL keys are inserted into 2nd level map (count = 2). Note: 
1st level map does not support NULL key.
   3. key_a is inserted into 2nd level map (count1 = 2)
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to