Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19811#discussion_r157798448
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -583,42 +579,41 @@ case class HashAggregateExec(
         val thisPlan = ctx.addReferenceObj("plan", this)
     
         // Create a name for the iterator from the fast hash map.
    -    val iterTermForFastHashMap = ctx.freshName("fastHashMapIter")
    -    if (isFastHashMapEnabled) {
    +    val iterTermForFastHashMap = if (isFastHashMapEnabled) {
           // Generates the fast hash map class and creates the fash hash map term.
    -      fastHashMapTerm = ctx.freshName("fastHashMap")
           val fastHashMapClassName = ctx.freshName("FastHashMap")
           if (isVectorizedHashMapEnabled) {
             val generatedMap = new VectorizedHashMapGenerator(ctx, aggregateExpressions,
               fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
             ctx.addInnerClass(generatedMap)
     
    -        ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
    -          s"$fastHashMapTerm = new $fastHashMapClassName();")
    -        ctx.addMutableState(s"java.util.Iterator<InternalRow>", iterTermForFastHashMap)
    +        fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "vectorizedHastHashMap",
    +          v => s"$v = new $fastHashMapClassName();")
    +        ctx.addMutableState(s"java.util.Iterator<InternalRow>", "vectorizedFastHashMapIter")
           } else {
             val generatedMap = new RowBasedHashMapGenerator(ctx, aggregateExpressions,
               fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
             ctx.addInnerClass(generatedMap)
     
    -        ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
    -          s"$fastHashMapTerm = new $fastHashMapClassName(" +
    +        fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "fastHashMap",
    +          v => s"$v = new $fastHashMapClassName(" +
                 s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());")
             ctx.addMutableState(
               "org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow>",
    -          iterTermForFastHashMap)
    +          "fastHashMapIter")
           }
         }
     
         // Create a name for the iterator from the regular hash map.
    -    val iterTerm = ctx.freshName("mapIter")
    -    ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm)
    +    // inline mutable state since not many aggregation operations in a task
    +    val iterTerm = ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName,
    +      "mapIter", forceInline = true)
         // create hashMap
    -    hashMapTerm = ctx.freshName("hashMap")
         val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName
    -    ctx.addMutableState(hashMapClassName, hashMapTerm, s"$hashMapTerm = $thisPlan.createHashMap();")
    -    sorterTerm = ctx.freshName("sorter")
    -    ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, sorterTerm)
    +    hashMapTerm = ctx.addMutableState(hashMapClassName, "hashMap",
    +      v => s"$v = $thisPlan.createHashMap();")
    --- End diff --
    
    ditto


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to