[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-22 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316517404
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -267,29 +302,81 @@ case class HashAggregateExec(
   
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
   }
 }
-ctx.currentVars = bufVars ++ input
-val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-val effectiveCodes = subExprs.codes.mkString("\n")
-val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-  boundUpdateExpr.map(_.genCode(ctx))
-}
-// aggregate buffer should be updated atomic
-val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+if (!conf.codegenSplitAggregateFunc) {
+  ctx.currentVars = bufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+  // aggregate buffer should be updated atomic
+  val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+s"""
+   | ${bufVars(i).isNull} = ${ev.isNull};
+   | ${bufVars(i).value} = ${ev.value};
+   """.stripMargin
+  }
+  s"""
+ | // do aggregate
+ | // common sub-expressions
+ | $effectiveCodes
+ | // evaluate aggregate function
+ | ${evaluateVariables(aggVals)}
+ | // update aggregation buffer
+ | ${updates.mkString("\n").trim}
+ """.stripMargin
+} else {
+  // We need to copy the aggregation buffer to local variables first because each aggregate
 
 Review comment:
   I think I'm starting to understand ... I had a different basic assumption 
from your splitting logic.
   
   I started realizing that when reading your sample generated code in the PR 
description (BTW that sample code should be updated to match what it currently 
generates when you rebase this PR to the latest master)
   ```java
   // copy aggregation buffer to the local
   boolean agg_localBufIsNull = agg_bufIsNull;
   long agg_localBufValue = agg_bufValue;
   boolean agg_localBufIsNull1 = agg_bufIsNull1;
   double agg_localBufValue1 = agg_bufValue1;
   boolean agg_localBufIsNull2 = agg_bufIsNull2;
   long agg_localBufValue2 = agg_bufValue2;
   // common sub-expressions

   // process aggregate functions to update aggregation buffer
   agg_doAggregateVal_coalesce(agg_localBufIsNull, agg_localBufValue, inputadapter_value, inputadapter_isNull);
   agg_doAggregateVal_add(agg_localBufValue1, inputadapter_isNull1, inputadapter_value1, agg_localBufIsNull1);
   agg_doAggregateVal_add1(inputadapter_isNull2, inputadapter_value2, agg_localBufIsNull2, agg_localBufValue2);
   ```
   I thought for `SUM(a), AVG(a)` I was going to see `agg_doAggregateVal_sum` 
and `agg_doAggregateVal_avg`, but instead what I'm seeing here is a more 
fine-grained splitting for each update expression. So instead we get:
   - `agg_doAggregateVal_coalesce` for `sum(a)`
   - `agg_doAggregateVal_add` and `agg_doAggregateVal_add1` for `avg(a)`
   
   My previous comment in this thread only applies when the splitting boundary 
is on per-aggregate-expression granularity, instead of on 
per-update-expression-in-aggregate-function granularity.
   
   `kurtosis()` is pretty much the largest declarative aggregate function in 
Spark SQL right now. I don't think a single `kurtosis()` would go over 8000 
bytes worth of bytecode, so maybe a per-aggregate-expression granularity would 
make more sense?
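
For illustration only, a per-aggregate-expression split for `SUM(a), AVG(a)` might look roughly like the sketch below. Every name in it (`PerAggregateSplitSketch`, `doAggregateSum`, `doAggregateAvg`) is hypothetical, and this is hand-written Java rather than anything Spark's codegen emits. It only shows the shape: one function per aggregate, each owning all of its own buffer slots.

```java
// Hypothetical sketch, not Spark-generated code: one update function per
// aggregate expression, each touching only its own buffer slots.
final class PerAggregateSplitSketch {
    // Buffer slot for SUM(a): a nullable long.
    private boolean sumIsNull = true;
    private long sumValue = 0L;

    // Buffer slots for AVG(a): running sum and count.
    private double avgSum = 0.0;
    private long avgCount = 0L;

    // SUM(a): null inputs leave the buffer unchanged.
    private void doAggregateSum(boolean inputIsNull, long input) {
        if (!inputIsNull) {
            sumValue = (sumIsNull ? 0L : sumValue) + input;
            sumIsNull = false;
        }
    }

    // AVG(a): both slots are updated inside the same function.
    private void doAggregateAvg(boolean inputIsNull, long input) {
        if (!inputIsNull) {
            avgSum += input;
            avgCount += 1;
        }
    }

    // Per-row driver: one call per aggregate expression.
    void processRow(boolean inputIsNull, long input) {
        doAggregateSum(inputIsNull, input);
        doAggregateAvg(inputIsNull, input);
    }

    // Both accessors return null when no non-null input has been seen.
    Long sum() { return sumIsNull ? null : sumValue; }
    Double avg() { return avgCount == 0 ? null : avgSum / avgCount; }
}
```

With this shape, `AVG(a)` yields one function instead of the two `agg_doAggregateVal_add*` functions in the sample above, at the cost of a larger body per function.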


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316501527
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -267,29 +302,81 @@ case class HashAggregateExec(
   
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
   }
 }
-ctx.currentVars = bufVars ++ input
-val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-val effectiveCodes = subExprs.codes.mkString("\n")
-val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-  boundUpdateExpr.map(_.genCode(ctx))
-}
-// aggregate buffer should be updated atomic
-val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+if (!conf.codegenSplitAggregateFunc) {
+  ctx.currentVars = bufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+  // aggregate buffer should be updated atomic
+  val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+s"""
+   | ${bufVars(i).isNull} = ${ev.isNull};
+   | ${bufVars(i).value} = ${ev.value};
+   """.stripMargin
+  }
+  s"""
+ | // do aggregate
 
 Review comment:
   Thanks! I'm totally fine with either cleaning it up here or in a follow-up. 
This is purely cosmetic and wouldn't change the core feature.





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316417691
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -267,29 +302,81 @@ case class HashAggregateExec(
   
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
   }
 }
-ctx.currentVars = bufVars ++ input
-val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-val effectiveCodes = subExprs.codes.mkString("\n")
-val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-  boundUpdateExpr.map(_.genCode(ctx))
-}
-// aggregate buffer should be updated atomic
-val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+if (!conf.codegenSplitAggregateFunc) {
+  ctx.currentVars = bufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+  // aggregate buffer should be updated atomic
 
 Review comment:
   s/atomic/atomically/
   
   I know this is just the old code moved into a conditional path, but it might 
be nice to clean it up while we're at it.





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316428625
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -844,9 +931,51 @@ case class HashAggregateExec(
  |// update unsafe row buffer
  |${updateUnsafeRowBuffer.mkString("\n").trim}
""".stripMargin
+} else {
+  // We need to copy the aggregation row buffer to a local row first because each aggregate
 
 Review comment:
   Same question as for the non-grouping version: why is it required to load the 
aggregate buffers up front here?





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316431520
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ##
 @@ -1612,6 +1612,49 @@ object CodeGenerator extends Logging {
 }
   }
 
+  /**
+   * Extracts all the input variables from references and subexpression elimination states
+   * for a given `expr`. This result will be used to split the generated code of
+   * expressions into multiple functions.
+   */
+  def getLocalInputVariableValues(
+  context: CodegenContext,
+  expr: Expression,
+  subExprs: Map[Expression, SubExprEliminationState]): Seq[((String, DataType), Expression)] = {
+val argMap = mutable.Map[(String, DataType), Expression]()
+val stack = mutable.Stack[Expression](expr)
+while (stack.nonEmpty) {
+  stack.pop() match {
+case e if subExprs.contains(e) =>
+  val exprCode = subExprs(e)
+  val SubExprEliminationState(isNull, value) = exprCode
+  if (value.isInstanceOf[VariableValue]) {
 
 Review comment:
   When would the `value` / `isNull` be something other than a `VariableValue`? 
If we don't expect it to be anything else, can we change this to an assertion 
instead?





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316430359
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -255,6 +255,41 @@ case class HashAggregateExec(
  """.stripMargin
   }
 
+  // Splits aggregate code into small functions because the most of JVM implementations
+  // can not compile too long functions. Note that different from `CodeGenerator.splitExpressions`,
+  // we will extract input variables from references and subexpression elimination states
+  // for each aggregate expression, then pass them to it.
+  private def splitAggregateExpressions(
+  context: CodegenContext,
+  aggregateExpressions: Seq[Expression],
+  codes: Seq[String],
+  subExprs: Map[Expression, SubExprEliminationState],
+  bufferInput: Option[(String, DataType)] = None): Seq[String] = {
 
 Review comment:
   Same question about `(String, DataType)` being just a degenerate 
`VariableValue`.





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316429780
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -892,6 +1021,57 @@ case class HashAggregateExec(
   } else {
 updateRowInRegularHashMap
   }
+} else {
+  if (isFastHashMapEnabled) {
+// We need to copy the aggregation row buffer to a local row first because each aggregate
 
 Review comment:
   Same question about why we have to load aggregate buffers first





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316429874
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -892,6 +1021,57 @@ case class HashAggregateExec(
   } else {
 updateRowInRegularHashMap
   }
+} else {
+  if (isFastHashMapEnabled) {
+// We need to copy the aggregation row buffer to a local row first because each aggregate
+// function directly updates the buffer when it finishes.
+val localRowBuffer = ctx.freshName("localFastRowBuffer")
+val initLocalRowBuffer = s"InternalRow $localRowBuffer = $fastRowBuffer.copy();"
+
+ctx.INPUT_ROW = localRowBuffer
+val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+val effectiveCodes = subExprs.codes.mkString("\n")
+val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+  boundUpdateExpr.map(_.genCode(ctx))
+}
+
+val evalAndUpdateCodes = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+  val dt = updateExpr(i).dataType
+  val updateColumnCode = CodeGenerator.updateColumn(
+fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
+  s"""
+ | // evaluate aggregate function
+ | ${ev.code}
+ | // update fast row
+ | $updateColumnCode
+   """.stripMargin
+}
+
+val updateAggValCode = splitAggregateExpressions(
+  context = ctx,
+  aggregateExpressions = boundUpdateExpr,
+  codes = evalAndUpdateCodes,
+  subExprs = subExprs.states,
+  bufferInput = Some((fastRowBuffer, ObjectType(classOf[InternalRow]))))
+
+// If fast hash map is on, we first generate code to update row in fast hash map, if the
+// previous loop up hit fast hash map. Otherwise, update row in regular hash map.
+s"""
+   |if ($fastRowBuffer != null) {
 
 Review comment:
   Ditto on leading space





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316414820
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -255,6 +255,41 @@ case class HashAggregateExec(
  """.stripMargin
   }
 
+  // Splits aggregate code into small functions because the most of JVM implementations
+  // can not compile too long functions. Note that different from `CodeGenerator.splitExpressions`,
+  // we will extract input variables from references and subexpression elimination states
+  // for each aggregate expression, then pass them to it.
+  private def splitAggregateExpressions(
+  context: CodegenContext,
+  aggregateExpressions: Seq[Expression],
+  codes: Seq[String],
+  subExprs: Map[Expression, SubExprEliminationState],
+  bufferInput: Option[(String, DataType)] = None): Seq[String] = {
+aggregateExpressions.zipWithIndex.map { case (aggExpr, i) =>
 
 Review comment:
   Looks like we can just do `aggregateExpressions.zip(codes).map { case 
(aggExpr, aggEvalCode) =>` here? I don't see any other use of `i` below other 
than `codes(i)`.





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316414421
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -255,6 +255,41 @@ case class HashAggregateExec(
  """.stripMargin
   }
 
+  // Splits aggregate code into small functions because the most of JVM implementations
+  // can not compile too long functions. Note that different from `CodeGenerator.splitExpressions`,
+  // we will extract input variables from references and subexpression elimination states
+  // for each aggregate expression, then pass them to it.
+  private def splitAggregateExpressions(
+  context: CodegenContext,
+  aggregateExpressions: Seq[Expression],
+  codes: Seq[String],
 
 Review comment:
   Cosmetic: `codes` is a bit vague. Can we change the name to something like 
`aggEvalCodes` to make it clear that it's the code for the aggregate functions?





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316429963
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -844,9 +931,51 @@ case class HashAggregateExec(
  |// update unsafe row buffer
  |${updateUnsafeRowBuffer.mkString("\n").trim}
""".stripMargin
+} else {
+  // We need to copy the aggregation row buffer to a local row first because each aggregate
+  // function directly updates the buffer when it finishes.
+  val localRowBuffer = ctx.freshName("localUnsafeRowBuffer")
+  val initLocalRowBuffer = s"InternalRow $localRowBuffer = $unsafeRowBuffer.copy();"
+
+  ctx.INPUT_ROW = localRowBuffer
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val unsafeRowBufferEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+
+  val evalAndUpdateCodes = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) =>
+val dt = updateExpr(i).dataType
+val updateColumnCode =
+  CodeGenerator.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable)
+s"""
+   | // evaluate aggregate function
+   | ${ev.code}
+   | // update unsafe row buffer
+   | $updateColumnCode
+ """.stripMargin
+  }
+
+  val updateAggValCode = splitAggregateExpressions(
+context = ctx,
+aggregateExpressions = boundUpdateExpr,
+codes = evalAndUpdateCodes,
+subExprs = subExprs.states,
+bufferInput = Some((unsafeRowBuffer, ObjectType(classOf[InternalRow]))))
+
+  s"""
+ | // do aggregate
+ | // copy aggregation row buffer to the local
+ | $initLocalRowBuffer
+ | // common sub-expressions
+ | $effectiveCodes
+ | // process aggregate functions to update aggregation buffer
 
 Review comment:
   Ditto on the comment phrasing





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316417255
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -267,29 +302,81 @@ case class HashAggregateExec(
   
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
   }
 }
-ctx.currentVars = bufVars ++ input
-val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-val effectiveCodes = subExprs.codes.mkString("\n")
-val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-  boundUpdateExpr.map(_.genCode(ctx))
-}
-// aggregate buffer should be updated atomic
-val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+if (!conf.codegenSplitAggregateFunc) {
+  ctx.currentVars = bufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+  // aggregate buffer should be updated atomic
+  val updates = aggVals.zipWithIndex.map { case (ev, i) =>
 
 Review comment:
   Looks like we can do `aggVals.zip(bufVars).map { case (ev, bufVar) =>` here?
   
   Ditto about this being old code but still worth cleaning up.





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316427809
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -267,29 +302,81 @@ case class HashAggregateExec(
   
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
   }
 }
-ctx.currentVars = bufVars ++ input
-val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-val effectiveCodes = subExprs.codes.mkString("\n")
-val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-  boundUpdateExpr.map(_.genCode(ctx))
-}
-// aggregate buffer should be updated atomic
-val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+if (!conf.codegenSplitAggregateFunc) {
+  ctx.currentVars = bufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+  // aggregate buffer should be updated atomic
+  val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+s"""
+   | ${bufVars(i).isNull} = ${ev.isNull};
+   | ${bufVars(i).value} = ${ev.value};
+   """.stripMargin
+  }
+  s"""
+ | // do aggregate
+ | // common sub-expressions
+ | $effectiveCodes
+ | // evaluate aggregate function
+ | ${evaluateVariables(aggVals)}
+ | // update aggregation buffer
+ | ${updates.mkString("\n").trim}
+ """.stripMargin
+} else {
+  // We need to copy the aggregation buffer to local variables first because each aggregate
 
 Review comment:
   Could you please help elaborate why copying the aggregate buffer values to 
local variables is required here?
   
   In the current data layout design, `DeclarativeAggregate`s never share their 
buffer across aggregate expressions; so it never made any sense to me why in 
the old code we have to update all aggregate buffer slots at the end after all 
update expressions have been evaluated.
   
   The flow of:
   `Load aggregate buffer -> evaluate update expressions -> store back to 
aggregate buffer`
   should be strictly confined to each aggregate expression.
   
   The 3-step flow logically looks like this:
   ```
   // step 1: load from aggregate buffer
   val localAggBufSlot1_isNull = aggBufSlot1_isNull;
   val localAggBufSlot1_value = aggBufSlot1_value;
   
   // step 2: evaluate and materialize update/merge expressions
   val isNull2 = false;
   val value2 = (localAggBufSlot1_isNull ? 0 : localAggBufSlot1_value) + input1;
   
   // step 3: write back to aggregate buffer
   aggBufSlot1_isNull = isNull2;
   aggBufSlot1_value = value2;
   ```
   
   As you can see, evaluating the update/merge expressions won't have any side 
effects on the aggregate buffers -- they're evaluated to local variables first, 
and then stored back to aggregate buffers. So even if I simplify that to:
   ```
   // NO step 1: load from aggregate buffer
   // val localAggBufSlot1_isNull = aggBufSlot1_isNull;
   // val localAggBufSlot1_value = aggBufSlot1_value;
   
   // step 2: evaluate and materialize update/merge expressions, directly loading from aggregate buffers
   val isNull2 = false;
   val value2 = (aggBufSlot1_isNull ? 0 : aggBufSlot1_value) + input1;
   
   // step 3: write back to aggregate buffer
   aggBufSlot1_isNull = isNull2;
   aggBufSlot1_value = value2;
   ```
   It'll be just as safe.
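
A minimal runnable Java sketch of the two variants above, assuming a single SUM-like aggregate over `long` inputs. The class and method names (`AggBufferFlowSketch`, `updateWithLocalCopy`, `updateDirect`) are made up for illustration and do not come from Spark. Both variants compute the new value into locals before writing back, so they leave the buffer in the same state.

```java
// Hypothetical sketch, not Spark-generated code.
final class AggBufferFlowSketch {
    // Aggregate buffer slot for one SUM-like aggregate.
    private boolean aggBufSlot1IsNull = true;
    private long aggBufSlot1Value = 0L;

    // Variant A: step 1 copies the slot into locals before evaluating.
    void updateWithLocalCopy(long input1) {
        // step 1: load from aggregate buffer
        boolean localIsNull = aggBufSlot1IsNull;
        long localValue = aggBufSlot1Value;
        // step 2: evaluate and materialize the update expression
        boolean isNull2 = false;
        long value2 = (localIsNull ? 0L : localValue) + input1;
        // step 3: write back to aggregate buffer
        aggBufSlot1IsNull = isNull2;
        aggBufSlot1Value = value2;
    }

    // Variant B: no step 1; step 2 reads the buffer slot directly.
    void updateDirect(long input1) {
        // step 2: evaluate into locals, reading the buffer directly
        boolean isNull2 = false;
        long value2 = (aggBufSlot1IsNull ? 0L : aggBufSlot1Value) + input1;
        // step 3: write back to aggregate buffer
        aggBufSlot1IsNull = isNull2;
        aggBufSlot1Value = value2;
    }

    boolean isNull() { return aggBufSlot1IsNull; }
    long value() { return aggBufSlot1Value; }
}
```

The sketch only covers the case argued above, where each aggregate expression reads and writes its own slots; it says nothing about update expressions that would read another aggregate's slot.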





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316429335
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -844,9 +931,51 @@ case class HashAggregateExec(
  |// update unsafe row buffer
  |${updateUnsafeRowBuffer.mkString("\n").trim}
""".stripMargin
+} else {
+  // We need to copy the aggregation row buffer to a local row first because each aggregate
+  // function directly updates the buffer when it finishes.
+  val localRowBuffer = ctx.freshName("localUnsafeRowBuffer")
+  val initLocalRowBuffer = s"InternalRow $localRowBuffer = $unsafeRowBuffer.copy();"
+
+  ctx.INPUT_ROW = localRowBuffer
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val unsafeRowBufferEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+
+  val evalAndUpdateCodes = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) =>
+val dt = updateExpr(i).dataType
+val updateColumnCode =
+  CodeGenerator.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable)
+s"""
+   | // evaluate aggregate function
+   | ${ev.code}
+   | // update unsafe row buffer
+   | $updateColumnCode
+ """.stripMargin
+  }
+
+  val updateAggValCode = splitAggregateExpressions(
+context = ctx,
+aggregateExpressions = boundUpdateExpr,
+codes = evalAndUpdateCodes,
+subExprs = subExprs.states,
+bufferInput = Some((unsafeRowBuffer, ObjectType(classOf[InternalRow]))))
+
+  s"""
+ | // do aggregate
 
 Review comment:
   Ditto on leading space


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316428178
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -267,29 +302,81 @@ case class HashAggregateExec(
   
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
   }
 }
-ctx.currentVars = bufVars ++ input
-val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-val effectiveCodes = subExprs.codes.mkString("\n")
-val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-  boundUpdateExpr.map(_.genCode(ctx))
-}
-// aggregate buffer should be updated atomic
-val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+if (!conf.codegenSplitAggregateFunc) {
+  ctx.currentVars = bufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+  // aggregate buffer should be updated atomic
+  val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+s"""
+   | ${bufVars(i).isNull} = ${ev.isNull};
+   | ${bufVars(i).value} = ${ev.value};
+   """.stripMargin
+  }
+  s"""
+ | // do aggregate
+ | // common sub-expressions
+ | $effectiveCodes
+ | // evaluate aggregate function
+ | ${evaluateVariables(aggVals)}
+ | // update aggregation buffer
+ | ${updates.mkString("\n").trim}
+ """.stripMargin
+} else {
+  // We need to copy the aggregation buffer to local variables first because each aggregate
+  // function directly updates the buffer when it finishes.
+  val localBufVars = bufVars.zip(updateExpr).map { case (ev, e) =>
+val isNull = ctx.freshName("localBufIsNull")
+val value = ctx.freshName("localBufValue")
+val initLocalVars = code"""
+   | boolean $isNull = ${ev.isNull};
+   | ${CodeGenerator.javaType(e.dataType)} $value = ${ev.value};
+ """.stripMargin
+ExprCode(initLocalVars, JavaCode.isNullVariable(isNull),
+  JavaCode.variable(value, e.dataType))
+  }
+
+  val initLocalBufVar = evaluateVariables(localBufVars)
+
+  ctx.currentVars = localBufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+
+  val evalAndUpdateCodes = aggVals.zipWithIndex.map { case (ev, i) =>
+s"""
+   | // evaluate aggregate function
 
 Review comment:
   Cosmetic: remove leading space after the `|` margin marker?
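
   A small sketch of what the leading space does, using a made-up template rather than the PR's actual codegen strings: `stripMargin` removes everything up to and including the `|`, so a space written after the marker survives into every generated line. The object and value names here are illustrative only.

```scala
object MarginDemo {
  // Template written with a space after the `|` margin marker.
  val withSpace: String =
    s"""
       | // do aggregate
       | int x = 1;
     """.stripMargin

  // Same template with the content starting right after the marker.
  val withoutSpace: String =
    s"""
       |// do aggregate
       |int x = 1;
     """.stripMargin

  // Non-blank lines of a rendered template, with whitespace preserved.
  def lines(s: String): List[String] =
    s.split("\n").toList.filter(_.trim.nonEmpty)
}
```

   With the space, each emitted line starts one column in; without it, the generated Java starts at column zero and nests cleanly when the snippet is embedded in another template.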





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316428843
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -844,9 +931,51 @@ case class HashAggregateExec(
  |// update unsafe row buffer
  |${updateUnsafeRowBuffer.mkString("\n").trim}
""".stripMargin
+} else {
+  // We need to copy the aggregation row buffer to a local row first because each aggregate
+  // function directly updates the buffer when it finishes.
+  val localRowBuffer = ctx.freshName("localUnsafeRowBuffer")
+  val initLocalRowBuffer = s"InternalRow $localRowBuffer = $unsafeRowBuffer.copy();"
+
+  ctx.INPUT_ROW = localRowBuffer
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val unsafeRowBufferEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+
+  val evalAndUpdateCodes = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) =>
+val dt = updateExpr(i).dataType
+val updateColumnCode =
+  CodeGenerator.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable)
+s"""
+   | // evaluate aggregate function
 
 Review comment:
   Ditto on leading space





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316429935
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -892,6 +1021,57 @@ case class HashAggregateExec(
   } else {
 updateRowInRegularHashMap
   }
+} else {
+  if (isFastHashMapEnabled) {
+// We need to copy the aggregation row buffer to a local row first because each aggregate
+// function directly updates the buffer when it finishes.
+val localRowBuffer = ctx.freshName("localFastRowBuffer")
+val initLocalRowBuffer = s"InternalRow $localRowBuffer = $fastRowBuffer.copy();"
+
+ctx.INPUT_ROW = localRowBuffer
+val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+val effectiveCodes = subExprs.codes.mkString("\n")
+val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+  boundUpdateExpr.map(_.genCode(ctx))
+}
+
+val evalAndUpdateCodes = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+  val dt = updateExpr(i).dataType
+  val updateColumnCode = CodeGenerator.updateColumn(
+fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
+  s"""
+ | // evaluate aggregate function
+ | ${ev.code}
+ | // update fast row
+ | $updateColumnCode
+   """.stripMargin
+}
+
+val updateAggValCode = splitAggregateExpressions(
+  context = ctx,
+  aggregateExpressions = boundUpdateExpr,
+  codes = evalAndUpdateCodes,
+  subExprs = subExprs.states,
+  bufferInput = Some((fastRowBuffer, ObjectType(classOf[InternalRow]
+
+// If fast hash map is on, we first generate code to update row in fast hash map, if the
+// previous loop up hit fast hash map. Otherwise, update row in regular hash map.
+s"""
+   |if ($fastRowBuffer != null) {
+   |  // copy aggregation row buffer to the local
+   |  $initLocalRowBuffer
+   |  // common sub-expressions
+   |  $effectiveCodes
+   |  // process aggregate functions to update aggregation buffer
 
 Review comment:
   Ditto on the comment phrasing





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316429850
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -892,6 +1021,57 @@ case class HashAggregateExec(
   } else {
 updateRowInRegularHashMap
   }
+} else {
+  if (isFastHashMapEnabled) {
+// We need to copy the aggregation row buffer to a local row first because each aggregate
+// function directly updates the buffer when it finishes.
+val localRowBuffer = ctx.freshName("localFastRowBuffer")
+val initLocalRowBuffer = s"InternalRow $localRowBuffer = $fastRowBuffer.copy();"
+
+ctx.INPUT_ROW = localRowBuffer
+val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+val effectiveCodes = subExprs.codes.mkString("\n")
+val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+  boundUpdateExpr.map(_.genCode(ctx))
+}
+
+val evalAndUpdateCodes = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+  val dt = updateExpr(i).dataType
+  val updateColumnCode = CodeGenerator.updateColumn(
+fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
+  s"""
+ | // evaluate aggregate function
 
 Review comment:
   Ditto on leading space





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316413544
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -255,6 +255,41 @@ case class HashAggregateExec(
  """.stripMargin
   }
 
+  // Splits aggregate code into small functions because the most of JVM implementations
+  // can not compile too long functions. Note that different from `CodeGenerator.splitExpressions`,
+  // we will extract input variables from references and subexpression elimination states
+  // for each aggregate expression, then pass them to it.
+  private def splitAggregateExpressions(
+  context: CodegenContext,
 
 Review comment:
   Cosmetic: all other places that refer to a `CodegenContext` spell it as
   `ctx`. Maybe worth making the naming consistent here.





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316431020
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ##
 @@ -1612,6 +1612,49 @@ object CodeGenerator extends Logging {
 }
   }
 
+  /**
+   * Extracts all the input variables from references and subexpression elimination states
+   * for a given `expr`. This result will be used to split the generated code of
+   * expressions into multiple functions.
+   */
+  def getLocalInputVariableValues(
+  context: CodegenContext,
+  expr: Expression,
+  subExprs: Map[Expression, SubExprEliminationState]): Seq[((String, DataType), Expression)] = {
+val argMap = mutable.Map[(String, DataType), Expression]()
+val stack = mutable.Stack[Expression](expr)
+while (stack.nonEmpty) {
+  stack.pop() match {
+case e if subExprs.contains(e) =>
+  val exprCode = subExprs(e)
+  val SubExprEliminationState(isNull, value) = exprCode
 
 Review comment:
   Can we combine this line and the line above into
   `val SubExprEliminationState(isNull, value) = subExprs(e)`?
   The name `exprCode` can be a bit confusing (the type of this local variable is
   not `ExprCode`).
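
   To illustrate the suggestion, a self-contained sketch with a hypothetical `State` case class standing in for `SubExprEliminationState` and string keys standing in for expressions. Both forms bind the same values; the combined one just avoids the misleading intermediate name.

```scala
object ExtractDemo {
  // Hypothetical stand-in for Spark's SubExprEliminationState.
  final case class State(isNull: String, value: String)

  val subExprs: Map[String, State] =
    Map("a + b" -> State("isNull_0", "value_0"))

  // Two-step form: the intermediate name suggests a type it does not have.
  def twoStep(e: String): (String, String) = {
    val exprCode = subExprs(e)
    val State(isNull, value) = exprCode
    (isNull, value)
  }

  // Combined form: destructure the lookup result directly.
  def combined(e: String): (String, String) = {
    val State(isNull, value) = subExprs(e)
    (isNull, value)
  }
}
```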





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316428090
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -267,29 +302,81 @@ case class HashAggregateExec(
   
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
   }
 }
-ctx.currentVars = bufVars ++ input
-val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-val effectiveCodes = subExprs.codes.mkString("\n")
-val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-  boundUpdateExpr.map(_.genCode(ctx))
-}
-// aggregate buffer should be updated atomic
-val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+if (!conf.codegenSplitAggregateFunc) {
+  ctx.currentVars = bufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+  // aggregate buffer should be updated atomic
+  val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+s"""
+   | ${bufVars(i).isNull} = ${ev.isNull};
+   | ${bufVars(i).value} = ${ev.value};
+   """.stripMargin
+  }
+  s"""
+ | // do aggregate
+ | // common sub-expressions
+ | $effectiveCodes
+ | // evaluate aggregate function
+ | ${evaluateVariables(aggVals)}
+ | // update aggregation buffer
+ | ${updates.mkString("\n").trim}
+ """.stripMargin
+} else {
+  // We need to copy the aggregation buffer to local variables first because each aggregate
+  // function directly updates the buffer when it finishes.
+  val localBufVars = bufVars.zip(updateExpr).map { case (ev, e) =>
+val isNull = ctx.freshName("localBufIsNull")
+val value = ctx.freshName("localBufValue")
+val initLocalVars = code"""
+   | boolean $isNull = ${ev.isNull};
+   | ${CodeGenerator.javaType(e.dataType)} $value = ${ev.value};
+ """.stripMargin
+ExprCode(initLocalVars, JavaCode.isNullVariable(isNull),
+  JavaCode.variable(value, e.dataType))
+  }
+
+  val initLocalBufVar = evaluateVariables(localBufVars)
+
+  ctx.currentVars = localBufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+
+  val evalAndUpdateCodes = aggVals.zipWithIndex.map { case (ev, i) =>
 
 Review comment:
   Again, change to `aggVals.zip(bufVars).map { case (ev, bufVar) =>` ?
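
   A rough sketch of the two shapes, assuming a hypothetical `Var` case class in place of `ExprCode` and made-up variable names. The generated assignments come out identical; the zipped form just drops the index lookups.

```scala
object ZipDemo {
  // Hypothetical stand-in for ExprCode: only the generated variable names.
  final case class Var(isNull: String, value: String)

  val bufVars: Seq[Var] =
    Seq(Var("bufIsNull_0", "bufValue_0"), Var("bufIsNull_1", "bufValue_1"))
  val aggVals: Seq[Var] =
    Seq(Var("isNull_0", "value_0"), Var("isNull_1", "value_1"))

  // Index-based pairing, as in the hunk under review.
  def byIndex: Seq[String] = aggVals.zipWithIndex.map { case (ev, i) =>
    s"${bufVars(i).isNull} = ${ev.isNull}; ${bufVars(i).value} = ${ev.value};"
  }

  // Direct pairing, as suggested in the comment.
  def byZip: Seq[String] = aggVals.zip(bufVars).map { case (ev, bufVar) =>
    s"${bufVar.isNull} = ${ev.isNull}; ${bufVar.value} = ${ev.value};"
  }
}
```

   One difference to keep in mind: `zip` silently truncates to the shorter sequence, whereas the index-based form throws if `bufVars` is shorter than `aggVals`. That only matters if the two can differ in length.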





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316428382
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -267,29 +302,81 @@ case class HashAggregateExec(
   
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
   }
 }
-ctx.currentVars = bufVars ++ input
-val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-val effectiveCodes = subExprs.codes.mkString("\n")
-val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-  boundUpdateExpr.map(_.genCode(ctx))
-}
-// aggregate buffer should be updated atomic
-val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+if (!conf.codegenSplitAggregateFunc) {
+  ctx.currentVars = bufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+  // aggregate buffer should be updated atomic
+  val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+s"""
+   | ${bufVars(i).isNull} = ${ev.isNull};
+   | ${bufVars(i).value} = ${ev.value};
+   """.stripMargin
+  }
+  s"""
+ | // do aggregate
+ | // common sub-expressions
+ | $effectiveCodes
+ | // evaluate aggregate function
+ | ${evaluateVariables(aggVals)}
+ | // update aggregation buffer
+ | ${updates.mkString("\n").trim}
+ """.stripMargin
+} else {
+  // We need to copy the aggregation buffer to local variables first because each aggregate
+  // function directly updates the buffer when it finishes.
+  val localBufVars = bufVars.zip(updateExpr).map { case (ev, e) =>
+val isNull = ctx.freshName("localBufIsNull")
+val value = ctx.freshName("localBufValue")
+val initLocalVars = code"""
+   | boolean $isNull = ${ev.isNull};
+   | ${CodeGenerator.javaType(e.dataType)} $value = ${ev.value};
+ """.stripMargin
+ExprCode(initLocalVars, JavaCode.isNullVariable(isNull),
+  JavaCode.variable(value, e.dataType))
+  }
+
+  val initLocalBufVar = evaluateVariables(localBufVars)
+
+  ctx.currentVars = localBufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+
+  val evalAndUpdateCodes = aggVals.zipWithIndex.map { case (ev, i) =>
+s"""
+   | // evaluate aggregate function
+   | ${ev.code}
+   | // update aggregation buffer
+   | ${bufVars(i).isNull} = ${ev.isNull};
+   | ${bufVars(i).value} = ${ev.value};
+ """.stripMargin
+  }
+
+  val updateAggValCode = splitAggregateExpressions(
+context = ctx,
+aggregateExpressions = boundUpdateExpr,
+codes = evalAndUpdateCodes,
+subExprs = subExprs.states)
+
   s"""
- | ${bufVars(i).isNull} = ${ev.isNull};
- | ${bufVars(i).value} = ${ev.value};
+ | // do aggregate
+ | // copy aggregation buffer to the local
+ | $initLocalBufVar
+ | // common sub-expressions
+ | $effectiveCodes
+ | // process aggregate functions to update aggregation buffer
 
 Review comment:
   "evaluate aggregate functions and update aggregation buffers"?





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316418406
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -267,29 +302,81 @@ case class HashAggregateExec(
   
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
   }
 }
-ctx.currentVars = bufVars ++ input
-val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-val effectiveCodes = subExprs.codes.mkString("\n")
-val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-  boundUpdateExpr.map(_.genCode(ctx))
-}
-// aggregate buffer should be updated atomic
-val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+if (!conf.codegenSplitAggregateFunc) {
 
 Review comment:
   Cosmetic: if we're planning to turn this on by default, code style-wise it 
makes more sense to me to have `if (conf.codegenSplitAggregateFunc)` go first. 
The visual cue of the bang (`!`) is much less striking than the long conf 
name...





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316430732
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ##
 @@ -1612,6 +1612,49 @@ object CodeGenerator extends Logging {
 }
   }
 
+  /**
+   * Extracts all the input variables from references and subexpression elimination states
+   * for a given `expr`. This result will be used to split the generated code of
+   * expressions into multiple functions.
+   */
+  def getLocalInputVariableValues(
+  context: CodegenContext,
+  expr: Expression,
+  subExprs: Map[Expression, SubExprEliminationState]): Seq[((String, DataType), Expression)] = {
+val argMap = mutable.Map[(String, DataType), Expression]()
+val stack = mutable.Stack[Expression](expr)
 
 Review comment:
   Question: so the stack here is only used for implementing depth-first 
pre-order traversal of an expression tree in an iterative style instead of 
recursive style, right?
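
   For reference, this is the pattern the question describes, sketched on a hypothetical `Node` tree rather than Spark's `Expression`. It is not the PR's implementation (the rest of that loop is not shown in the hunk); it only shows how an explicit stack gives a depth-first pre-order walk without recursion.

```scala
object PreOrderDemo {
  // Hypothetical minimal expression tree; not Spark's Expression class.
  final case class Node(name: String, children: List[Node] = Nil)

  // Iterative depth-first pre-order traversal using an explicit stack.
  def preOrder(root: Node): List[String] = {
    var stack = List(root)
    val visited = List.newBuilder[String]
    while (stack.nonEmpty) {
      val node = stack.head
      stack = stack.tail
      visited += node.name
      // Prepend children so the leftmost child is visited next.
      stack = node.children ::: stack
    }
    visited.result()
  }

  // add(mul(a, b), c)
  val sample: Node =
    Node("add", List(Node("mul", List(Node("a"), Node("b"))), Node("c")))
}
```

   The explicit stack keeps the traversal off the JVM call stack, which can matter for very deep expression trees.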





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316430268
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ##
 @@ -1612,6 +1612,49 @@ object CodeGenerator extends Logging {
 }
   }
 
+  /**
+   * Extracts all the input variables from references and subexpression elimination states
+   * for a given `expr`. This result will be used to split the generated code of
+   * expressions into multiple functions.
+   */
+  def getLocalInputVariableValues(
+  context: CodegenContext,
+  expr: Expression,
+  subExprs: Map[Expression, SubExprEliminationState]): Seq[((String, DataType), Expression)] = {
 
 Review comment:
   `(String, DataType)` looks like a degenerate `VariableValue`. Can we just
   use the `VariableValue` type here?
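
   A sketch of the general idea, using a hypothetical `TypedVar` case class and plain strings for types. Spark's real `VariableValue` has its own definition, which is not reproduced here. The point is only that a named type reads better than a bare tuple as a map key.

```scala
object KeyDemo {
  // Hypothetical stand-in for a typed variable; Spark's VariableValue differs.
  final case class TypedVar(name: String, javaType: String)

  // Tuple keys: callers must remember which field is which.
  val byTuple: Map[(String, String), String] =
    Map(("value_0", "int") -> "a + b")

  // Case-class keys: same lookup, but with named fields.
  val byVar: Map[TypedVar, String] =
    Map(TypedVar("value_0", "int") -> "a + b")

  // Render a Java-style parameter list from the typed variables.
  def paramList(vars: Iterable[TypedVar]): String =
    vars.map(v => s"${v.javaType} ${v.name}").mkString(", ")
}
```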





[GitHub] [spark] rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] Split aggregation code into small functions

2019-08-21 Thread GitBox
rednaxelafx commented on a change in pull request #20965: [SPARK-21870][SQL] 
Split aggregation code into small functions
URL: https://github.com/apache/spark/pull/20965#discussion_r316418729
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##
 @@ -267,29 +302,81 @@ case class HashAggregateExec(
   
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
   }
 }
-ctx.currentVars = bufVars ++ input
-val boundUpdateExpr = bindReferences(updateExpr, inputAttrs)
-val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-val effectiveCodes = subExprs.codes.mkString("\n")
-val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
-  boundUpdateExpr.map(_.genCode(ctx))
-}
-// aggregate buffer should be updated atomic
-val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+
+if (!conf.codegenSplitAggregateFunc) {
+  ctx.currentVars = bufVars ++ input
+  val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttrs))
+  val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+  val effectiveCodes = subExprs.codes.mkString("\n")
+  val aggVals = ctx.withSubExprEliminationExprs(subExprs.states) {
+boundUpdateExpr.map(_.genCode(ctx))
+  }
+  // aggregate buffer should be updated atomic
+  val updates = aggVals.zipWithIndex.map { case (ev, i) =>
+s"""
+   | ${bufVars(i).isNull} = ${ev.isNull};
+   | ${bufVars(i).value} = ${ev.value};
+   """.stripMargin
+  }
+  s"""
+ | // do aggregate
 
 Review comment:
   I was always curious about the leading space after the `|` margin marker in 
these codegen templates...
   
   Ditto about this being old code but still worth cleaning up.

