Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19811#discussion_r157798448
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
---
@@ -583,42 +579,41 @@ case class HashAggregateExec(
val thisPlan = ctx.addReferenceObj("plan", this)
// Create a name for the iterator from the fast hash map.
- val iterTermForFastHashMap = ctx.freshName("fastHashMapIter")
- if (isFastHashMapEnabled) {
+ val iterTermForFastHashMap = if (isFastHashMapEnabled) {
// Generates the fast hash map class and creates the fast hash map
term.
- fastHashMapTerm = ctx.freshName("fastHashMap")
val fastHashMapClassName = ctx.freshName("FastHashMap")
if (isVectorizedHashMapEnabled) {
val generatedMap = new VectorizedHashMapGenerator(ctx,
aggregateExpressions,
fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
ctx.addInnerClass(generatedMap)
- ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
- s"$fastHashMapTerm = new $fastHashMapClassName();")
- ctx.addMutableState(s"java.util.Iterator<InternalRow>",
iterTermForFastHashMap)
+ fastHashMapTerm = ctx.addMutableState(fastHashMapClassName,
"vectorizedHastHashMap",
+ v => s"$v = new $fastHashMapClassName();")
+ ctx.addMutableState(s"java.util.Iterator<InternalRow>",
"vectorizedFastHashMapIter")
} else {
val generatedMap = new RowBasedHashMapGenerator(ctx,
aggregateExpressions,
fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
ctx.addInnerClass(generatedMap)
- ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
- s"$fastHashMapTerm = new $fastHashMapClassName(" +
+ fastHashMapTerm = ctx.addMutableState(fastHashMapClassName,
"fastHashMap",
+ v => s"$v = new $fastHashMapClassName(" +
s"$thisPlan.getTaskMemoryManager(),
$thisPlan.getEmptyAggregationBuffer());")
ctx.addMutableState(
"org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow>",
- iterTermForFastHashMap)
+ "fastHashMapIter")
}
}
// Create a name for the iterator from the regular hash map.
- val iterTerm = ctx.freshName("mapIter")
- ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName,
iterTerm)
+ // inline mutable state since not many aggregation operations in a task
+ val iterTerm = ctx.addMutableState(classOf[KVIterator[UnsafeRow,
UnsafeRow]].getName,
+ "mapIter", forceInline = true)
// create hashMap
- hashMapTerm = ctx.freshName("hashMap")
val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName
- ctx.addMutableState(hashMapClassName, hashMapTerm, s"$hashMapTerm =
$thisPlan.createHashMap();")
- sorterTerm = ctx.freshName("sorter")
- ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName,
sorterTerm)
+ hashMapTerm = ctx.addMutableState(hashMapClassName, "hashMap",
+ v => s"$v = $thisPlan.createHashMap();")
--- End diff --
ditto
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org