Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10998#discussion_r51632943
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---
    @@ -426,14 +510,39 @@ case class TungstenAggregate(
           ctx.updateColumn(buffer, dt, i, ev, updateExpr(i).nullable)
         }
     
    +    val countTerm = ctx.freshName("count")
    +    ctx.addMutableState("int", countTerm, s"$countTerm = 0;")
    +    val checkFallback = if (testFallbackStartsAt.isDefined) {
    +      s"$countTerm < ${testFallbackStartsAt.get}"
    +    } else {
    +      "true"
    +    }
    +
    +    // We try to do hash-map-based in-memory aggregation first. If there is not enough memory
    +    // (the hash map will return null for a new key), we spill the hash map to disk to free
    +    // memory, then continue to do in-memory aggregation and spilling until all the rows have
    +    // been processed. Finally, sort the spilled aggregate buffers by key, and merge the
    +    // buffers that share the same key.
         s"""
          // generate grouping key
          ${keyCode.code}
    -     UnsafeRow $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key);
    +     UnsafeRow $buffer = null;
    +     if ($checkFallback) {
    --- End diff --
    
    I don't get this. You seem to do the lookup twice, here and at line 539. Is that intentional?
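    For context, the flow described in the quoted comment (hash-based aggregation, spill on memory pressure, then merge spills by key) can be sketched roughly as below. This is a minimal illustrative sketch, not Spark's actual implementation: the names (`aggregateWithSpill`, `maxEntries`, the in-memory `Map` standing in for the unsafe hash map, and the fold standing in for the sort-merge step) are all hypothetical.
    
    ```scala
    import scala.collection.mutable
    
    object FallbackAggSketch {
      // Count rows per key, "spilling" the map whenever inserting a new key
      // would exceed maxEntries (a stand-in for running out of memory).
      def aggregateWithSpill(rows: Iterator[String], maxEntries: Int): Map[String, Long] = {
        val spills = mutable.ArrayBuffer.empty[Map[String, Long]]
        var current = mutable.Map.empty[String, Long]
        for (key <- rows) {
          if (!current.contains(key) && current.size >= maxEntries) {
            // "Not enough memory": spill the current map and start a fresh one.
            spills += current.toMap
            current = mutable.Map.empty[String, Long]
          }
          current(key) = current.getOrElse(key, 0L) + 1L
        }
        spills += current.toMap
        // Merge all spilled maps by key (stands in for the sort-merge pass).
        spills.foldLeft(Map.empty[String, Long]) { (acc, m) =>
          m.foldLeft(acc) { case (a, (k, v)) => a.updated(k, a.getOrElse(k, 0L) + v) }
        }
      }
    
      def main(args: Array[String]): Unit = {
        val res = aggregateWithSpill(Iterator("a", "b", "a", "c", "a", "b"), maxEntries = 2)
        println(res) // a -> 3, b -> 2, c -> 1
      }
    }
    ```
    
    The `testFallbackStartsAt` check in the diff serves a similar purpose in tests: it forces the fallback path after a fixed number of rows instead of waiting for real memory pressure.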


