[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316517404

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -267,29 +302,81 @@ case class HashAggregateExec(
         e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
       }
     }
-    ctx.currentVars = bufVars ++ input
-    val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-    val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-    val effectiveCodes = subExprs.codes.mkString("\n")
-    val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-      boundUpdateExpr.map(_.genCode(ctx))
-    }
-    // aggregate buffer should be updated atomic
-    val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+    if (!conf.codegenSplitAggregateFunc) {
+      ctx.currentVars = bufVars ++ input
+      val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+      val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+      val effectiveCodes = subExprs.codes.mkString("\n")
+      val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+        boundUpdateExpr.map(_.genCode(ctx))
+      }
+      // aggregate buffer should be updated atomic
+      val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+        s"""
+           | ${bufVars(i).isNull} = ${ev.isNull};
+           | ${bufVars(i).value} = ${ev.value};
+         """.stripMargin
+      }
+      s"""
+         | // do aggregate
+         | // common sub-expressions
+         | $effectiveCodes
+         | // evaluate aggregate function
+         | ${evaluateVariables(aggVals)}
+         | // update aggregation buffer
+         | ${updates.mkString("\n").trim}
+       """.stripMargin
+    } else {
+      // We need to copy the aggregation buffer to local variables first because each aggregate

Review comment:
I think I'm starting to understand ... I had a different basic assumption from your splitting logic.

I started realizing that when reading your sample generated code in the PR description (BTW that sample code should be updated to match what it currently generates when you rebase this PR to the latest master)

```java
// copy aggregation buffer to the local
boolean agg_localBufIsNull = agg_bufIsNull;
long agg_localBufValue = agg_bufValue;
boolean agg_localBufIsNull1 = agg_bufIsNull1;
double agg_localBufValue1 = agg_bufValue1;
boolean agg_localBufIsNull2 = agg_bufIsNull2;
long agg_localBufValue2 = agg_bufValue2;
// common sub-expressions

// process aggregate functions to update aggregation buffer
agg_doAggregateVal_coalesce(agg_localBufIsNull, agg_localBufValue, inputadapter_value, inputadapter_isNull);
agg_doAggregateVal_add(agg_localBufValue1, inputadapter_isNull1, inputadapter_value1, agg_localBufIsNull1);
agg_doAggregateVal_add1(inputadapter_isNull2, inputadapter_value2, agg_localBufIsNull2, agg_localBufValue2);
```

I thought for `SUM(a), AVG(a)` I was going to see `agg_doAggregateVal_sum` and `agg_doAggregateVal_avg`, but instead what I'm seeing here is a more fine-grained splitting for each update expression.

So instead we get:
- `agg_doAggregateVal_coalesce` for `sum(a)`
- `agg_doAggregateVal_add` and `agg_doAggregateVal_add1` for `avg(a)`

My previous comment in this thread only applies when the splitting boundary is on per-aggregate-expression granularity, instead of on per-update-expression-in-aggregate-function granularity.

`kurtosis()` is pretty much the largest declarative aggregate function in Spark SQL right now. I don't think a single `kurtosis()` would go over 8000 bytes worth of bytecode, so maybe a per-aggregate-expression granularity would make more sense?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
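The two splitting granularities discussed in the comment above can be put side by side in a small sketch. This is only an illustration under stated assumptions, not Spark's code generator: `AggFunc`, `SplitGranularity` and both methods are hypothetical names, and the numeric-suffix naming merely imitates the function names seen in the sample generated code.

```scala
// Hypothetical model: an aggregate function and the root names of its update expressions.
final case class AggFunc(name: String, updateExprs: Seq[String])

object SplitGranularity {
  private val prefix = "agg_doAggregateVal_"

  // One generated function per update expression (what the sample output shows).
  // A repeated name gets a numeric suffix, imitating fresh-name generation.
  def perUpdateExpression(aggs: Seq[AggFunc]): Seq[String] = {
    val seen = scala.collection.mutable.Map.empty[String, Int]
    aggs.flatMap(_.updateExprs).map { e =>
      val n = seen.getOrElse(e, 0)
      seen(e) = n + 1
      if (n == 0) prefix + e else prefix + e + n
    }
  }

  // One generated function per aggregate function (the coarser alternative being suggested).
  def perAggregateFunction(aggs: Seq[AggFunc]): Seq[String] =
    aggs.map(a => prefix + a.name)
}
```

With `Seq(AggFunc("sum", Seq("coalesce")), AggFunc("avg", Seq("add", "add")))` as input, the first method yields the three names from the sample (`..._coalesce`, `..._add`, `..._add1`), while the second yields `..._sum` and `..._avg`.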
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316501527

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -267,29 +302,81 @@ case class HashAggregateExec(
         e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
       }
     }
-    ctx.currentVars = bufVars ++ input
-    val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-    val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-    val effectiveCodes = subExprs.codes.mkString("\n")
-    val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-      boundUpdateExpr.map(_.genCode(ctx))
-    }
-    // aggregate buffer should be updated atomic
-    val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+    if (!conf.codegenSplitAggregateFunc) {
+      ctx.currentVars = bufVars ++ input
+      val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+      val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+      val effectiveCodes = subExprs.codes.mkString("\n")
+      val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+        boundUpdateExpr.map(_.genCode(ctx))
+      }
+      // aggregate buffer should be updated atomic
+      val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+        s"""
+           | ${bufVars(i).isNull} = ${ev.isNull};
+           | ${bufVars(i).value} = ${ev.value};
+         """.stripMargin
+      }
+      s"""
+         | // do aggregate

Review comment:
Thanks! I'm totally fine with either cleaning it up here or in a follow-up. This is purely cosmetic and wouldn't change the core feature.
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316417691

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -267,29 +302,81 @@ case class HashAggregateExec(
         e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
       }
     }
-    ctx.currentVars = bufVars ++ input
-    val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-    val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-    val effectiveCodes = subExprs.codes.mkString("\n")
-    val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-      boundUpdateExpr.map(_.genCode(ctx))
-    }
-    // aggregate buffer should be updated atomic
-    val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+    if (!conf.codegenSplitAggregateFunc) {
+      ctx.currentVars = bufVars ++ input
+      val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+      val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+      val effectiveCodes = subExprs.codes.mkString("\n")
+      val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+        boundUpdateExpr.map(_.genCode(ctx))
+      }
+      // aggregate buffer should be updated atomic

Review comment:
s/atomic/atomically/

I know this is just the old code moved into a conditional path, but it might be nice to clean it up while we're at it.
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316428625

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -844,9 +931,51 @@ case class HashAggregateExec(
            |// update unsafe row buffer
            |${updateUnsafeRowBuffer.mkString("\n").trim}
          """.stripMargin
+      } else {
+        // We need to copy the aggregation row buffer to a local row first because each aggregate

Review comment:
Same question as the non-grouping version, why is it required to load the aggregate buffers upfront here?
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316431520

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ##
@@ -1612,6 +1612,49 @@ object CodeGenerator extends Logging {
     }
   }

+  /**
+   * Extracts all the input variables from references and subexpression elimination states
+   * for a given `expr`. This result will be used to split the generated code of
+   * expressions into multiple functions.
+   */
+  def getLocalInputVariableValues(
+      context: CodegenContext,
+      expr: Expression,
+      subExprs: Map[Expression, SubExprEliminationState]): Seq[((String, DataType), Expression)] = {
+    val argMap = mutable.Map[(String, DataType), Expression]()
+    val stack = mutable.Stack[Expression](expr)
+    while (stack.nonEmpty) {
+      stack.pop() match {
+        case e if subExprs.contains(e) =>
+          val exprCode = subExprs(e)
+          val SubExprEliminationState(isNull, value) = exprCode
+          if (value.isInstanceOf[VariableValue]) {

Review comment:
When would the `value` / `isNull` be something other than a `VariableValue`? If we don't expect it to be anything else, can we change this to an assertion instead?
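The difference between the conditional check and the assertion asked about in the `VariableValue` comment above can be sketched as follows. The types here are simplified, hypothetical stand-ins defined only for this example (the real Spark classes carry more information), and `LocalInputs` with its two methods is likewise invented for illustration.

```scala
// Simplified stand-ins for codegen value types; hypothetical, for illustration only.
sealed trait ExprValue { def code: String }
final case class VariableValue(code: String) extends ExprValue
final case class LiteralValue(code: String) extends ExprValue

object LocalInputs {
  // Conditional form: anything that is not a variable is silently skipped.
  def collectIfVariable(values: Seq[ExprValue]): Seq[String] =
    values.collect { case VariableValue(name) => name }

  // Assertion form: a non-variable value is treated as a bug and fails fast.
  def collectAsserting(values: Seq[ExprValue]): Seq[String] =
    values.map { v =>
      assert(v.isInstanceOf[VariableValue], s"expected a variable, got: ${v.code}")
      v.code
    }
}
```

The conditional form hides an unexpected value by dropping it, whereas the assertion form surfaces it immediately, which is presumably the motivation behind the question.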
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316430359

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -255,6 +255,41 @@ case class HashAggregateExec(
        """.stripMargin
   }
+  // Splits aggregate code into small functions because the most of JVM implementations
+  // can not compile too long functions. Note that different from `CodeGenerator.splitExpressions`,
+  // we will extract input variables from references and subexpression elimination states
+  // for each aggregate expression, then pass them to it.
+  private def splitAggregateExpressions(
+      context: CodegenContext,
+      aggregateExpressions: Seq[Expression],
+      codes: Seq[String],
+      subExprs: Map[Expression, SubExprEliminationState],
+      bufferInput: Option[(String, DataType)] = None): Seq[String] = {

Review comment:
Same question on `(String, DataType)` being just a degenerated `VariableValue`
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316429780

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -892,6 +1021,57 @@ case class HashAggregateExec(
       } else {
         updateRowInRegularHashMap
       }
+    } else {
+      if (isFastHashMapEnabled) {
+        // We need to copy the aggregation row buffer to a local row first because each aggregate

Review comment:
Same question about why we have to load aggregate buffers first
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316429874

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -892,6 +1021,57 @@ case class HashAggregateExec(
       } else {
         updateRowInRegularHashMap
       }
+    } else {
+      if (isFastHashMapEnabled) {
+        // We need to copy the aggregation row buffer to a local row first because each aggregate
+        // function directly updates the buffer when it finishes.
+        val localRowBuffer = ctx.freshName("localFastRowBuffer")
+        val initLocalRowBuffer = s"InternalRow $localRowBuffer = $fastRowBuffer.copy();"
+
+        ctx.INPUT_ROW = localRowBuffer
+        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+        val effectiveCodes = subExprs.codes.mkString("\n")
+        val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+          boundUpdateExpr.map(_.genCode(ctx))
+        }
+
+        val evalAndUpdateCodes = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+          val dt = updateExpr(i).dataType
+          val updateColumnCode = CodeGenerator.updateColumn(
+            fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
+          s"""
+             | // evaluate aggregate function
+             | ${ev.code}
+             | // update fast row
+             | $updateColumnCode
+           """.stripMargin
+        }
+
+        val updateAggValCode = splitAggregateExpressions(
+          context = ctx,
+          aggregateExpressions = boundUpdateExpr,
+          codes = evalAndUpdateCodes,
+          subExprs = subExprs.states,
+          bufferInput = Some((fastRowBuffer, ObjectType(classOf[InternalRow]))))
+
+        // If fast hash map is on, we first generate code to update row in fast hash map, if the
+        // previous loop up hit fast hash map. Otherwise, update row in regular hash map.
+        s"""
+           |if ($fastRowBuffer != null) {

Review comment:
Ditto on leading space
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316414820

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -255,6 +255,41 @@ case class HashAggregateExec(
        """.stripMargin
   }
+  // Splits aggregate code into small functions because the most of JVM implementations
+  // can not compile too long functions. Note that different from `CodeGenerator.splitExpressions`,
+  // we will extract input variables from references and subexpression elimination states
+  // for each aggregate expression, then pass them to it.
+  private def splitAggregateExpressions(
+      context: CodegenContext,
+      aggregateExpressions: Seq[Expression],
+      codes: Seq[String],
+      subExprs: Map[Expression, SubExprEliminationState],
+      bufferInput: Option[(String, DataType)] = None): Seq[String] = {
+    aggregateExpressions.zipWithIndex.map { case (aggExpr, i) =>

Review comment:
Looks like we can just do `aggregateExpressions.zip(codes).map { case (aggExpr, aggEvalCode) =>` here? I don't see any other use of `i` below other than `codes(i)`.
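The `zip` suggestion above can be checked on plain collections. The following is a minimal sketch with strings standing in for expressions and generated code; `ZipVsIndex` and its methods are hypothetical names, not part of the PR.

```scala
object ZipVsIndex {
  // Index-based form: `i` is only used to look up the matching code string.
  def withIndex(exprs: Seq[String], codes: Seq[String]): Seq[String] =
    exprs.zipWithIndex.map { case (expr, i) => s"// $expr\n${codes(i)}" }

  // Zipped form: each expression is paired with its code directly.
  def zipped(exprs: Seq[String], codes: Seq[String]): Seq[String] =
    exprs.zip(codes).map { case (expr, code) => s"// $expr\n$code" }
}
```

For sequences of equal length the two forms produce the same result. They differ only when the lengths disagree: `zip` truncates to the shorter sequence, while `codes(i)` throws once `i` runs past the end of `codes`.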
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316414421

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -255,6 +255,41 @@ case class HashAggregateExec(
        """.stripMargin
   }
+  // Splits aggregate code into small functions because the most of JVM implementations
+  // can not compile too long functions. Note that different from `CodeGenerator.splitExpressions`,
+  // we will extract input variables from references and subexpression elimination states
+  // for each aggregate expression, then pass them to it.
+  private def splitAggregateExpressions(
+      context: CodegenContext,
+      aggregateExpressions: Seq[Expression],
+      codes: Seq[String],

Review comment:
Cosmetic: `codes` is a bit vague. Can we change the name to something like `aggEvalCodes` to make it clear that it's the code for the aggregate functions?
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316429963

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -844,9 +931,51 @@ case class HashAggregateExec(
            |// update unsafe row buffer
            |${updateUnsafeRowBuffer.mkString("\n").trim}
          """.stripMargin
+      } else {
+        // We need to copy the aggregation row buffer to a local row first because each aggregate
+        // function directly updates the buffer when it finishes.
+        val localRowBuffer = ctx.freshName("localUnsafeRowBuffer")
+        val initLocalRowBuffer = s"InternalRow $localRowBuffer = $unsafeRowBuffer.copy();"
+
+        ctx.INPUT_ROW = localRowBuffer
+        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+        val effectiveCodes = subExprs.codes.mkString("\n")
+        val unsafeRowBufferEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+          boundUpdateExpr.map(_.genCode(ctx))
+        }
+
+        val evalAndUpdateCodes = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) =>
+          val dt = updateExpr(i).dataType
+          val updateColumnCode =
+            CodeGenerator.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable)
+          s"""
+             | // evaluate aggregate function
+             | ${ev.code}
+             | // update unsafe row buffer
+             | $updateColumnCode
+           """.stripMargin
+        }
+
+        val updateAggValCode = splitAggregateExpressions(
+          context = ctx,
+          aggregateExpressions = boundUpdateExpr,
+          codes = evalAndUpdateCodes,
+          subExprs = subExprs.states,
+          bufferInput = Some((unsafeRowBuffer, ObjectType(classOf[InternalRow]))))
+
+        s"""
+           | // do aggregate
+           | // copy aggregation row buffer to the local
+           | $initLocalRowBuffer
+           | // common sub-expressions
+           | $effectiveCodes
+           | // process aggregate functions to update aggregation buffer

Review comment:
Ditto on the comment phrasing
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316417255

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -267,29 +302,81 @@ case class HashAggregateExec(
         e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
       }
     }
-    ctx.currentVars = bufVars ++ input
-    val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-    val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-    val effectiveCodes = subExprs.codes.mkString("\n")
-    val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-      boundUpdateExpr.map(_.genCode(ctx))
-    }
-    // aggregate buffer should be updated atomic
-    val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+    if (!conf.codegenSplitAggregateFunc) {
+      ctx.currentVars = bufVars ++ input
+      val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+      val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+      val effectiveCodes = subExprs.codes.mkString("\n")
+      val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+        boundUpdateExpr.map(_.genCode(ctx))
+      }
+      // aggregate buffer should be updated atomic
+      val updates = aggVals.zipWithIndex.map { case (ev, i) =>

Review comment:
Looks like we can do `aggVals.zip(bufVars).map { case (ev, bufVar) =>` here? Ditto about this being old code but still worth cleaning up.
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316427809

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -267,29 +302,81 @@ case class HashAggregateExec(
         e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
       }
     }
-    ctx.currentVars = bufVars ++ input
-    val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-    val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-    val effectiveCodes = subExprs.codes.mkString("\n")
-    val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-      boundUpdateExpr.map(_.genCode(ctx))
-    }
-    // aggregate buffer should be updated atomic
-    val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+    if (!conf.codegenSplitAggregateFunc) {
+      ctx.currentVars = bufVars ++ input
+      val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+      val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+      val effectiveCodes = subExprs.codes.mkString("\n")
+      val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+        boundUpdateExpr.map(_.genCode(ctx))
+      }
+      // aggregate buffer should be updated atomic
+      val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+        s"""
+           | ${bufVars(i).isNull} = ${ev.isNull};
+           | ${bufVars(i).value} = ${ev.value};
+         """.stripMargin
+      }
+      s"""
+         | // do aggregate
+         | // common sub-expressions
+         | $effectiveCodes
+         | // evaluate aggregate function
+         | ${evaluateVariables(aggVals)}
+         | // update aggregation buffer
+         | ${updates.mkString("\n").trim}
+       """.stripMargin
+    } else {
+      // We need to copy the aggregation buffer to local variables first because each aggregate

Review comment:
Could you please help elaborate why copying the aggregate buffer values to local variables is required here?

In the current data layout design, `DeclarativeAggregate`s never share their buffer across aggregate expressions; so it never made any sense to me why in the old code we have to update all aggregate buffer slots at the end after all update expressions have been evaluated.

The flow of: `Load aggregate buffer -> evaluate update expressions -> store back to aggregate buffer` should be strictly confined to each aggregate expression.

The 3-step flow logically looks like this:
```
// step 1: load from aggregate buffer
val localAggBufSlot1_isNull = aggBufSlot1_isNull;
val localAggBufSlot1_value = aggBufSlot1_value;

// step 2: evaluate and materialize update/merge expressions
val isNull2 = false;
val value2 = (localAggBufSlot1_isNull ? 0 : localAggBufSlot1_value) + input1;

// step 3: write back to aggregate buffer
aggBufSlot1_isNull = isNull2;
aggBufSlot1_value = value2;
```
As you can see, evaluating the update/merge expressions won't have any side effects on the aggregate buffers -- they're evaluated to local variables first, and then stored back to aggregate buffers. So even if I simplify that to:
```
// NO step 1: load from aggregate buffer
// val localAggBufSlot1_isNull = aggBufSlot1_isNull;
// val localAggBufSlot1_value = aggBufSlot1_value;

// step 2: evaluate and materialize update/merge expressions, directly loading from aggregate buffers
val isNull2 = false;
val value2 = (aggBufSlot1_isNull ? 0 : aggBufSlot1_value) + input1;

// step 3: write back to aggregate buffer
aggBufSlot1_isNull = isNull2;
aggBufSlot1_value = value2;
```
It'll be just as safe.
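The equivalence argued in the comment above can be tried out in a runnable form. This is a minimal sketch assuming a single `Long` buffer slot with a null flag and a sum-like update; `BufferUpdate`, `Slot` and both methods are hypothetical names chosen for the example, not code from the PR.

```scala
object BufferUpdate {
  // Hypothetical aggregate buffer slot: a value plus its null flag.
  final class Slot(var isNull: Boolean, var value: Long)

  // Variant A: load the slot into locals (step 1), evaluate (step 2), write back (step 3).
  def updateWithLocalCopy(slot: Slot, input: Long): Unit = {
    val localIsNull = slot.isNull
    val localValue = slot.value
    val isNull2 = false
    val value2 = (if (localIsNull) 0L else localValue) + input
    slot.isNull = isNull2
    slot.value = value2
  }

  // Variant B: skip step 1 and read the slot directly while evaluating.
  def updateDirect(slot: Slot, input: Long): Unit = {
    val isNull2 = false
    val value2 = (if (slot.isNull) 0L else slot.value) + input
    slot.isNull = isNull2
    slot.value = value2
  }
}
```

Both variants materialize the new value in locals before any write to the slot, so for a slot that only this update expression touches they end in the same state. The sketch does not model buffers shared across expressions, which the comment says do not occur for `DeclarativeAggregate`s.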
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316429335

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -844,9 +931,51 @@ case class HashAggregateExec(
            |// update unsafe row buffer
            |${updateUnsafeRowBuffer.mkString("\n").trim}
          """.stripMargin
+      } else {
+        // We need to copy the aggregation row buffer to a local row first because each aggregate
+        // function directly updates the buffer when it finishes.
+        val localRowBuffer = ctx.freshName("localUnsafeRowBuffer")
+        val initLocalRowBuffer = s"InternalRow $localRowBuffer = $unsafeRowBuffer.copy();"
+
+        ctx.INPUT_ROW = localRowBuffer
+        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+        val effectiveCodes = subExprs.codes.mkString("\n")
+        val unsafeRowBufferEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+          boundUpdateExpr.map(_.genCode(ctx))
+        }
+
+        val evalAndUpdateCodes = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) =>
+          val dt = updateExpr(i).dataType
+          val updateColumnCode =
+            CodeGenerator.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable)
+          s"""
+             | // evaluate aggregate function
+             | ${ev.code}
+             | // update unsafe row buffer
+             | $updateColumnCode
+           """.stripMargin
+        }
+
+        val updateAggValCode = splitAggregateExpressions(
+          context = ctx,
+          aggregateExpressions = boundUpdateExpr,
+          codes = evalAndUpdateCodes,
+          subExprs = subExprs.states,
+          bufferInput = Some((unsafeRowBuffer, ObjectType(classOf[InternalRow]))))
+
+        s"""
+           | // do aggregate

Review comment:
Ditto on leading space
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316428178

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ##
@@ -267,29 +302,81 @@ case class HashAggregateExec(
         e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
       }
     }
-    ctx.currentVars = bufVars ++ input
-    val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-    val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-    val effectiveCodes = subExprs.codes.mkString("\n")
-    val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-      boundUpdateExpr.map(_.genCode(ctx))
-    }
-    // aggregate buffer should be updated atomic
-    val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+    if (!conf.codegenSplitAggregateFunc) {
+      ctx.currentVars = bufVars ++ input
+      val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+      val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+      val effectiveCodes = subExprs.codes.mkString("\n")
+      val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+        boundUpdateExpr.map(_.genCode(ctx))
+      }
+      // aggregate buffer should be updated atomic
+      val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+        s"""
+           | ${bufVars(i).isNull} = ${ev.isNull};
+           | ${bufVars(i).value} = ${ev.value};
+         """.stripMargin
+      }
+      s"""
+         | // do aggregate
+         | // common sub-expressions
+         | $effectiveCodes
+         | // evaluate aggregate function
+         | ${evaluateVariables(aggVals)}
+         | // update aggregation buffer
+         | ${updates.mkString("\n").trim}
+       """.stripMargin
+    } else {
+      // We need to copy the aggregation buffer to local variables first because each aggregate
+      // function directly updates the buffer when it finishes.
+      val localBufVars = bufVars.zip(updateExpr).map { case (ev, e) =>
+        val isNull = ctx.freshName("localBufIsNull")
+        val value = ctx.freshName("localBufValue")
+        val initLocalVars = code"""
+           | boolean $isNull = ${ev.isNull};
+           | ${CodeGenerator.javaType(e.dataType)} $value = ${ev.value};
+         """.stripMargin
+        ExprCode(initLocalVars, JavaCode.isNullVariable(isNull),
+          JavaCode.variable(value, e.dataType))
+      }
+
+      val initLocalBufVar = evaluateVariables(localBufVars)
+
+      ctx.currentVars = localBufVars ++ input
+      val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+      val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+      val effectiveCodes = subExprs.codes.mkString("\n")
+      val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+        boundUpdateExpr.map(_.genCode(ctx))
+      }
+
+      val evalAndUpdateCodes = aggVals.zipWithIndex.map { case (ev, i) =>
+        s"""
+           | // evaluate aggregate function

Review comment:
Cosmetic: remove leading space after the `|` margin marker?
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions URL: https://github.com/apache/spark/pull/20965#discussion_r316428843 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ## @@ -844,9 +931,51 @@ case class HashAggregateExec( |// update unsafe row buffer |${updateUnsafeRowBuffer.mkString("\n").trim} """.stripMargin +} else { + // We need to copy the aggregation row buffer to a local row first because each aggregate + // function directly updates the buffer when it finishes. + val localRowBuffer = ctx.freshName("localUnsafeRowBuffer") + val initLocalRowBuffer = s"InternalRow $localRowBuffer = $unsafeRowBuffer.copy();" + + ctx.INPUT_ROW = localRowBuffer + val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr)) + val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) + val effectiveCodes = subExprs.codes.mkString("\n") + val unsafeRowBufferEvals = ctx.withSubExprEliminationExprs(subExprs.states) { +boundUpdateExpr.map(_.genCode(ctx)) + } + + val evalAndUpdateCodes = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) => +val dt = updateExpr(i).dataType +val updateColumnCode = + CodeGenerator.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable) +s""" + | // evaluate aggregate function Review comment: Ditto on leading space
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions URL: https://github.com/apache/spark/pull/20965#discussion_r316429935 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ## @@ -892,6 +1021,57 @@ case class HashAggregateExec( } else { updateRowInRegularHashMap } +} else { + if (isFastHashMapEnabled) { +// We need to copy the aggregation row buffer to a local row first because each aggregate +// function directly updates the buffer when it finishes. +val localRowBuffer = ctx.freshName("localFastRowBuffer") +val initLocalRowBuffer = s"InternalRow $localRowBuffer = $fastRowBuffer.copy();" + +ctx.INPUT_ROW = localRowBuffer +val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr)) +val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) +val effectiveCodes = subExprs.codes.mkString("\n") +val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) { + boundUpdateExpr.map(_.genCode(ctx)) +} + +val evalAndUpdateCodes = fastRowEvals.zipWithIndex.map { case (ev, i) => + val dt = updateExpr(i).dataType + val updateColumnCode = CodeGenerator.updateColumn( +fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled) + s""" + | // evaluate aggregate function + | ${ev.code} + | // update fast row + | $updateColumnCode + """.stripMargin +} + +val updateAggValCode = splitAggregateExpressions( + context = ctx, + aggregateExpressions = boundUpdateExpr, + codes = evalAndUpdateCodes, + subExprs = subExprs.states, + bufferInput = Some((fastRowBuffer, ObjectType(classOf[InternalRow] + +// If fast hash map is on, we first generate code to update row in fast hash map, if the +// previous loop up hit fast hash map. Otherwise, update row in regular hash map. 
+s""" + |if ($fastRowBuffer != null) { + | // copy aggregation row buffer to the local + | $initLocalRowBuffer + | // common sub-expressions + | $effectiveCodes + | // process aggregate functions to update aggregation buffer Review comment: Ditto on the comment phrasing
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions URL: https://github.com/apache/spark/pull/20965#discussion_r316429850 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ## @@ -892,6 +1021,57 @@ case class HashAggregateExec( } else { updateRowInRegularHashMap } +} else { + if (isFastHashMapEnabled) { +// We need to copy the aggregation row buffer to a local row first because each aggregate +// function directly updates the buffer when it finishes. +val localRowBuffer = ctx.freshName("localFastRowBuffer") +val initLocalRowBuffer = s"InternalRow $localRowBuffer = $fastRowBuffer.copy();" + +ctx.INPUT_ROW = localRowBuffer +val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr)) +val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) +val effectiveCodes = subExprs.codes.mkString("\n") +val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) { + boundUpdateExpr.map(_.genCode(ctx)) +} + +val evalAndUpdateCodes = fastRowEvals.zipWithIndex.map { case (ev, i) => + val dt = updateExpr(i).dataType + val updateColumnCode = CodeGenerator.updateColumn( +fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled) + s""" + | // evaluate aggregate function Review comment: Ditto on leading space
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions URL: https://github.com/apache/spark/pull/20965#discussion_r316413544 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ## @@ -255,6 +255,41 @@ case class HashAggregateExec( """.stripMargin } + // Splits aggregate code into small functions because the most of JVM implementations + // can not compile too long functions. Note that different from `CodeGenerator.splitExpressions`, + // we will extract input variables from references and subexpression elimination states + // for each aggregate expression, then pass them to it. + private def splitAggregateExpressions( + context: CodegenContext, Review comment: Cosmetic: all other places that refer to a `CodegenContext` spell it as `ctx`. Maybe worth making the naming consistent here.
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions URL: https://github.com/apache/spark/pull/20965#discussion_r316431020 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ## @@ -1612,6 +1612,49 @@ object CodeGenerator extends Logging { } } + /** + * Extracts all the input variables from references and subexpression elimination states + * for a given `expr`. This result will be used to split the generated code of + * expressions into multiple functions. + */ + def getLocalInputVariableValues( + context: CodegenContext, + expr: Expression, + subExprs: Map[Expression, SubExprEliminationState]): Seq[((String, DataType), Expression)] = { +val argMap = mutable.Map[(String, DataType), Expression]() +val stack = mutable.Stack[Expression](expr) +while (stack.nonEmpty) { + stack.pop() match { +case e if subExprs.contains(e) => + val exprCode = subExprs(e) + val SubExprEliminationState(isNull, value) = exprCode Review comment: Can we combine this line and the line above into `val SubExprEliminationState(isNull, value) = subExprs(e)`? The name `exprCode` can be a bit confusing (the type of this local variable is not `ExprCode`).
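The suggestion above relies on Scala allowing a case-class pattern directly on the left-hand side of a `val`. The following is a minimal sketch of that idea, not the PR's code: `State` is a hypothetical two-field stand-in for `SubExprEliminationState`, and the map keys are plain strings rather than `Expression`s.

```scala
object ExtractorDemo {
  // Hypothetical simplified stand-in for SubExprEliminationState (assumption).
  final case class State(isNull: String, value: String)

  // Hypothetical lookup table; the real code keys this map by Expression.
  val subExprs: Map[String, State] =
    Map("a + b" -> State("subExprIsNull_0", "subExprValue_0"))

  // Two-step form: bind the looked-up state to a name, then destructure it.
  def twoStep(e: String): (String, String) = {
    val state = subExprs(e)
    val State(isNull, value) = state
    (isNull, value)
  }

  // One-step form: destructure the lookup result directly.
  def oneStep(e: String): (String, String) = {
    val State(isNull, value) = subExprs(e)
    (isNull, value)
  }
}
```

Both forms bind the same two names; the one-step form simply avoids introducing an intermediate local whose name might suggest a different type.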
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions URL: https://github.com/apache/spark/pull/20965#discussion_r316428090 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ## @@ -267,29 +302,81 @@ case class HashAggregateExec( e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions } } -ctx.currentVars = bufVars ++ input -val boundUpdateExpr = bindReferences(updateExpr, inputAttrs) -val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) -val effectiveCodes = subExprs.codes.mkString("\n") -val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) { - boundUpdateExpr.map(_.genCode(ctx)) -} -// aggregate buffer should be updated atomic -val updates = aggVals.zipWithIndex.map { case (ev, i) => + +if (!conf.codegenSplitAggregateFunc) { + ctx.currentVars = bufVars ++ input + val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs)) + val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) + val effectiveCodes = subExprs.codes.mkString("\n") + val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) { +boundUpdateExpr.map(_.genCode(ctx)) + } + // aggregate buffer should be updated atomic + val updates = aggVals.zipWithIndex.map { case (ev, i) => +s""" + | ${bufVars(i).isNull} = ${ev.isNull}; + | ${bufVars(i).value} = ${ev.value}; + """.stripMargin + } + s""" + | // do aggregate + | // common sub-expressions + | $effectiveCodes + | // evaluate aggregate function + | ${evaluateVariables(aggVals)} + | // update aggregation buffer + | ${updates.mkString("\n").trim} + """.stripMargin +} else { + // We need to copy the aggregation buffer to local variables first because each aggregate + // function directly updates the buffer when it finishes. 
+ val localBufVars = bufVars.zip(updateExpr).map { case (ev, e) => +val isNull = ctx.freshName("localBufIsNull") +val value = ctx.freshName("localBufValue") +val initLocalVars = code""" + | boolean $isNull = ${ev.isNull}; + | ${CodeGenerator.javaType(e.dataType)} $value = ${ev.value}; + """.stripMargin +ExprCode(initLocalVars, JavaCode.isNullVariable(isNull), + JavaCode.variable(value, e.dataType)) + } + + val initLocalBufVar = evaluateVariables(localBufVars) + + ctx.currentVars = localBufVars ++ input + val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs)) + val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) + val effectiveCodes = subExprs.codes.mkString("\n") + val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) { +boundUpdateExpr.map(_.genCode(ctx)) + } + + val evalAndUpdateCodes = aggVals.zipWithIndex.map { case (ev, i) => Review comment: Again, change to `aggVals.zip(bufVars).map { case (ev, bufVar) =>` ?
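To illustrate the `zip` suggestion above, here is a small sketch comparing the index-based and zip-based forms. `Var` is a hypothetical stand-in for Spark's `ExprCode` that keeps only the two fields used in the template, and the variable names are made up for the example.

```scala
object ZipDemo {
  // Hypothetical stand-in for ExprCode with only isNull/value (assumption).
  final case class Var(isNull: String, value: String)

  val bufVars: Seq[Var] =
    Seq(Var("bufIsNull_0", "bufValue_0"), Var("bufIsNull_1", "bufValue_1"))
  val aggVals: Seq[Var] =
    Seq(Var("isNull_0", "value_0"), Var("isNull_1", "value_1"))

  // Index-based form: looks up the matching buffer variable by position.
  val byIndex: Seq[String] = aggVals.zipWithIndex.map { case (ev, i) =>
    s"${bufVars(i).isNull} = ${ev.isNull}; ${bufVars(i).value} = ${ev.value};"
  }

  // Zip-based form: pairs each evaluated value with its buffer variable.
  val byZip: Seq[String] = aggVals.zip(bufVars).map { case (ev, bufVar) =>
    s"${bufVar.isNull} = ${ev.isNull}; ${bufVar.value} = ${ev.value};"
  }
}
```

When both sequences have the same length the two forms produce identical output. One difference worth keeping in mind: `zip` silently truncates to the shorter sequence, whereas indexing past the end of `bufVars` throws.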
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions URL: https://github.com/apache/spark/pull/20965#discussion_r316428382 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ## @@ -267,29 +302,81 @@ case class HashAggregateExec( e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions } } -ctx.currentVars = bufVars ++ input -val boundUpdateExpr = bindReferences(updateExpr, inputAttrs) -val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) -val effectiveCodes = subExprs.codes.mkString("\n") -val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) { - boundUpdateExpr.map(_.genCode(ctx)) -} -// aggregate buffer should be updated atomic -val updates = aggVals.zipWithIndex.map { case (ev, i) => + +if (!conf.codegenSplitAggregateFunc) { + ctx.currentVars = bufVars ++ input + val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs)) + val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) + val effectiveCodes = subExprs.codes.mkString("\n") + val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) { +boundUpdateExpr.map(_.genCode(ctx)) + } + // aggregate buffer should be updated atomic + val updates = aggVals.zipWithIndex.map { case (ev, i) => +s""" + | ${bufVars(i).isNull} = ${ev.isNull}; + | ${bufVars(i).value} = ${ev.value}; + """.stripMargin + } + s""" + | // do aggregate + | // common sub-expressions + | $effectiveCodes + | // evaluate aggregate function + | ${evaluateVariables(aggVals)} + | // update aggregation buffer + | ${updates.mkString("\n").trim} + """.stripMargin +} else { + // We need to copy the aggregation buffer to local variables first because each aggregate + // function directly updates the buffer when it finishes. 
+ val localBufVars = bufVars.zip(updateExpr).map { case (ev, e) => +val isNull = ctx.freshName("localBufIsNull") +val value = ctx.freshName("localBufValue") +val initLocalVars = code""" + | boolean $isNull = ${ev.isNull}; + | ${CodeGenerator.javaType(e.dataType)} $value = ${ev.value}; + """.stripMargin +ExprCode(initLocalVars, JavaCode.isNullVariable(isNull), + JavaCode.variable(value, e.dataType)) + } + + val initLocalBufVar = evaluateVariables(localBufVars) + + ctx.currentVars = localBufVars ++ input + val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs)) + val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) + val effectiveCodes = subExprs.codes.mkString("\n") + val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) { +boundUpdateExpr.map(_.genCode(ctx)) + } + + val evalAndUpdateCodes = aggVals.zipWithIndex.map { case (ev, i) => +s""" + | // evaluate aggregate function + | ${ev.code} + | // update aggregation buffer + | ${bufVars(i).isNull} = ${ev.isNull}; + | ${bufVars(i).value} = ${ev.value}; + """.stripMargin + } + + val updateAggValCode = splitAggregateExpressions( +context = ctx, +aggregateExpressions = boundUpdateExpr, +codes = evalAndUpdateCodes, +subExprs = subExprs.states) + s""" - | ${bufVars(i).isNull} = ${ev.isNull}; - | ${bufVars(i).value} = ${ev.value}; + | // do aggregate + | // copy aggregation buffer to the local + | $initLocalBufVar + | // common sub-expressions + | $effectiveCodes + | // process aggregate functions to update aggregation buffer Review comment: "evaluate aggregate functions and update aggregation buffers"?
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions URL: https://github.com/apache/spark/pull/20965#discussion_r316418406 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ## @@ -267,29 +302,81 @@ case class HashAggregateExec( e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions } } -ctx.currentVars = bufVars ++ input -val boundUpdateExpr = bindReferences(updateExpr, inputAttrs) -val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) -val effectiveCodes = subExprs.codes.mkString("\n") -val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) { - boundUpdateExpr.map(_.genCode(ctx)) -} -// aggregate buffer should be updated atomic -val updates = aggVals.zipWithIndex.map { case (ev, i) => + +if (!conf.codegenSplitAggregateFunc) { Review comment: Cosmetic: if we're planning to turn this on by default, code style-wise it makes more sense to me to have `if (conf.codegenSplitAggregateFunc)` go first. The visual cue of the bang (`!`) is much less striking than the long conf name...
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions URL: https://github.com/apache/spark/pull/20965#discussion_r316430732 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ## @@ -1612,6 +1612,49 @@ object CodeGenerator extends Logging { } } + /** + * Extracts all the input variables from references and subexpression elimination states + * for a given `expr`. This result will be used to split the generated code of + * expressions into multiple functions. + */ + def getLocalInputVariableValues( + context: CodegenContext, + expr: Expression, + subExprs: Map[Expression, SubExprEliminationState]): Seq[((String, DataType), Expression)] = { +val argMap = mutable.Map[(String, DataType), Expression]() +val stack = mutable.Stack[Expression](expr) Review comment: Question: so the stack here is only used for implementing depth-first pre-order traversal of an expression tree in an iterative style instead of recursive style, right?
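For readers unfamiliar with the pattern the question refers to, the following is a minimal sketch of depth-first pre-order traversal written iteratively with an explicit stack, next to the recursive equivalent. It is not the PR's implementation: `Node` is a hypothetical stand-in for `Expression`, and an immutable `List` is used as the stack in place of `mutable.Stack` to keep the sketch independent of the Scala collection version.

```scala
object PreOrderDemo {
  // Hypothetical stand-in for an expression tree node (assumption).
  final case class Node(name: String, children: Seq[Node] = Nil)

  // Iterative pre-order: visit a node, then its children left to right.
  def preOrderIterative(root: Node): Seq[String] = {
    val visited = scala.collection.mutable.ArrayBuffer.empty[String]
    var stack: List[Node] = List(root)
    while (stack.nonEmpty) {
      val node = stack.head
      stack = stack.tail
      visited += node.name
      // Prepend the children so the leftmost child is popped next.
      stack = node.children.toList ::: stack
    }
    visited.toList
  }

  // Recursive pre-order, for comparison.
  def preOrderRecursive(node: Node): Seq[String] =
    node.name +: node.children.flatMap(preOrderRecursive)

  // Small example tree: a has children b and c; b has child d.
  val tree: Node = Node("a", Seq(Node("b", Seq(Node("d"))), Node("c")))
}
```

The explicit stack avoids deep call-stack recursion on very deep trees. Note that pushing children onto a stack in their natural order, instead of prepending them as a block, would visit the rightmost child first.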
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions URL: https://github.com/apache/spark/pull/20965#discussion_r316430268 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ## @@ -1612,6 +1612,49 @@ object CodeGenerator extends Logging { } } + /** + * Extracts all the input variables from references and subexpression elimination states + * for a given `expr`. This result will be used to split the generated code of + * expressions into multiple functions. + */ + def getLocalInputVariableValues( + context: CodegenContext, + expr: Expression, + subExprs: Map[Expression, SubExprEliminationState]): Seq[((String, DataType), Expression)] = { Review comment: `(String, DataType)` looks like a degenerate `VariableValue`. Can we just use the `VariableValue` type here?
[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions URL: https://github.com/apache/spark/pull/20965#discussion_r316418729 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ## @@ -267,29 +302,81 @@ case class HashAggregateExec( e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions } } -ctx.currentVars = bufVars ++ input -val boundUpdateExpr = bindReferences(updateExpr, inputAttrs) -val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) -val effectiveCodes = subExprs.codes.mkString("\n") -val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) { - boundUpdateExpr.map(_.genCode(ctx)) -} -// aggregate buffer should be updated atomic -val updates = aggVals.zipWithIndex.map { case (ev, i) => + +if (!conf.codegenSplitAggregateFunc) { + ctx.currentVars = bufVars ++ input + val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs)) + val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) + val effectiveCodes = subExprs.codes.mkString("\n") + val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) { +boundUpdateExpr.map(_.genCode(ctx)) + } + // aggregate buffer should be updated atomic + val updates = aggVals.zipWithIndex.map { case (ev, i) => +s""" + | ${bufVars(i).isNull} = ${ev.isNull}; + | ${bufVars(i).value} = ${ev.value}; + """.stripMargin + } + s""" + | // do aggregate Review comment: I was always curious about the leading space after the `|` margin marker in these codegen templates... Ditto about this being existing code but still worth cleaning up.
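The leading-space remark can be illustrated with a small standalone sketch of how `stripMargin` treats a space written after the `|` margin marker. The template contents here are made up for the example and are not taken from Spark's generated code.

```scala
object StripMarginDemo {
  // Template written with one space after the `|` margin marker.
  val withSpace: String =
    s"""
       | // do aggregate
       | int x = 1;
     """.stripMargin

  // Same template with content starting right after the `|`.
  val withoutSpace: String =
    s"""
       |// do aggregate
       |int x = 1;
     """.stripMargin

  // Content lines only (index 0 is the empty line after the opening quotes).
  val spacedLines: Array[String] = withSpace.split("\n")
  val plainLines: Array[String] = withoutSpace.split("\n")
}
```

`stripMargin` removes leading whitespace up to and including the `|`, but nothing after it, so every content line of `withSpace` keeps one extra leading space in the resulting string. The generated code still compiles either way, which is presumably why the reviewer calls the point cosmetic.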