Github user kiszk commented on a diff in the pull request:
https://github.com/apache/spark/pull/21860#discussion_r215907441
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
---
@@ -853,33 +861,50 @@ case class HashAggregateExec(
val updateRowInHashMap: String = {
if (isFastHashMapEnabled) {
- ctx.INPUT_ROW = fastRowBuffer
- val boundUpdateExpr =
updateExpr.map(BindReferences.bindReference(_, inputAttr))
- val subExprs =
ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
- val effectiveCodes = subExprs.codes.mkString("\n")
- val fastRowEvals =
ctx.withSubExprEliminationExprs(subExprs.states) {
- boundUpdateExpr.map(_.genCode(ctx))
- }
- val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
- val dt = updateExpr(i).dataType
- CodeGenerator.updateColumn(
- fastRowBuffer, dt, i, ev, updateExpr(i).nullable,
isVectorizedHashMapEnabled)
- }
+ if (isVectorizedHashMapEnabled) {
+ ctx.INPUT_ROW = fastRowBuffer
+ val boundUpdateExpr =
updateExpr.map(BindReferences.bindReference(_, inputAttr))
+ val subExprs =
ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+ val effectiveCodes = subExprs.codes.mkString("\n")
+ val fastRowEvals =
ctx.withSubExprEliminationExprs(subExprs.states) {
+ boundUpdateExpr.map(_.genCode(ctx))
+ }
+ val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i)
=>
+ val dt = updateExpr(i).dataType
+ CodeGenerator.updateColumn(
+ fastRowBuffer, dt, i, ev, updateExpr(i).nullable,
isVectorized = true)
+ }
- // If fast hash map is on, we first generate code to update row in
fast hash map, if the
- // previous loop up hit fast hash map. Otherwise, update row in
regular hash map.
- s"""
- |if ($fastRowBuffer != null) {
- | // common sub-expressions
- | $effectiveCodes
- | // evaluate aggregate function
- | ${evaluateVariables(fastRowEvals)}
- | // update fast row
- | ${updateFastRow.mkString("\n").trim}
- |} else {
- | $updateRowInRegularHashMap
- |}
- """.stripMargin
+ // If vectorized fast hash map is on, we first generate code to
update row
+ // in vectorized fast hash map, if the previous loop up hit
vectorized fast hash map.
+ // Otherwise, update row in regular hash map.
+ s"""
+ |if ($fastRowBuffer != null) {
+ | // common sub-expressions
+ | $effectiveCodes
+ | // evaluate aggregate function
+ | ${evaluateVariables(fastRowEvals)}
+ | // update fast row
+ | ${updateFastRow.mkString("\n").trim}
+ |} else {
+ | $updateRowInRegularHashMap
+ |}
+ """.stripMargin
+ } else {
+ // If row-based hash map is on and the previous loop up hit fast
hash map,
+ // we reuse regular hash buffer to update row of fast hash map.
+ // Otherwise, update row in regular hash map.
+ s"""
+ |// Updates the proper row buffer
+ |UnsafeRow $aggBuffer = null;
--- End diff --
nit: Would it be better to use `updateAggBuffer` instead of `aggBuffer` at
L899, L901, and L903 for ease of understanding?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]