Github user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21860#discussion_r212854595
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
---
@@ -853,33 +853,47 @@ case class HashAggregateExec(
val updateRowInHashMap: String = {
if (isFastHashMapEnabled) {
- ctx.INPUT_ROW = fastRowBuffer
- val boundUpdateExpr =
updateExpr.map(BindReferences.bindReference(_, inputAttr))
- val subExprs =
ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
- val effectiveCodes = subExprs.codes.mkString("\n")
- val fastRowEvals =
ctx.withSubExprEliminationExprs(subExprs.states) {
- boundUpdateExpr.map(_.genCode(ctx))
- }
- val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
- val dt = updateExpr(i).dataType
- CodeGenerator.updateColumn(
- fastRowBuffer, dt, i, ev, updateExpr(i).nullable,
isVectorizedHashMapEnabled)
- }
+ if (isVectorizedHashMapEnabled) {
+ ctx.INPUT_ROW = fastRowBuffer
+ val boundUpdateExpr =
updateExpr.map(BindReferences.bindReference(_, inputAttr))
+ val subExprs =
ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+ val effectiveCodes = subExprs.codes.mkString("\n")
+ val fastRowEvals =
ctx.withSubExprEliminationExprs(subExprs.states) {
+ boundUpdateExpr.map(_.genCode(ctx))
+ }
+ val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i)
=>
+ val dt = updateExpr(i).dataType
+ CodeGenerator.updateColumn(
+ fastRowBuffer, dt, i, ev, updateExpr(i).nullable,
isVectorizedHashMapEnabled)
--- End diff --
`fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorized = true)`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]