Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19324#discussion_r141034217
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
---
@@ -654,18 +680,23 @@ case class HashAggregateExec(
val row = ctx.freshName("fastHashMapRow")
ctx.currentVars = null
ctx.INPUT_ROW = row
- var schema: StructType = groupingKeySchema
- bufferSchema.foreach(i => schema = schema.add(i))
- val generateRow = GenerateUnsafeProjection.createCode(ctx, schema.toAttributes.zipWithIndex
- .map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) })
+ val generateKeyRow = GenerateUnsafeProjection.createCode(ctx,
+ groupingKeySchema.toAttributes.zipWithIndex
+ .map { case (attr, i) => BoundReference(i, attr.dataType,
attr.nullable) }
+ )
+ val generateBufferRow = GenerateUnsafeProjection.createCode(ctx,
+ bufferSchema.toAttributes.zipWithIndex
+ .map { case (attr, i) =>
+ BoundReference(groupingKeySchema.length + i, attr.dataType,
attr.nullable) })
s"""
| while ($iterTermForFastHashMap.hasNext()) {
| $numOutput.add(1);
| org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row $row =
| (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row)
| $iterTermForFastHashMap.next();
- | ${generateRow.code}
- | ${consume(ctx, Seq.empty, {generateRow.value})}
+ | ${generateKeyRow.code}
+ | ${generateBufferRow.code}
+ | $outputFunc(${generateKeyRow.value}, ${generateBufferRow.value});
--- End diff --
We didn't call `outputCode` before; are you fixing a potential bug?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]