[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18931


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163752999
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +162,96 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so that we can prevent the generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "spark.sql.codegen.splitConsumeFuncByOperator" is enabled.
+// 2. `inputVars` are all materialized. That is guaranteed to be true if the parent plan uses
+//    all variables in output (see `requireAllOutput`).
+// 3. The number of output variables must be less than the maximum number of parameters in a
+//    Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && requireAllOutput &&
+      ctx.isValidParamLength(output)) {
+    constructDoConsumeFunction(ctx, inputVars, row)
+  } else {
+    parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
 """.stripMargin
  }
+
+  /**
+   * To prevent the concatenated function from growing too long to be optimized by JIT, we can
+   * separate the parent's `doConsume` code of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode],
+      row: String): String = {
+    val (args, params, inputVarsInFunc) = constructConsumeParameters(ctx, output, inputVars, row)
+    val rowVar = prepareRowVar(ctx, row, inputVarsInFunc)
+
+    val doConsume = ctx.freshName("doConsume")
+    ctx.currentVars = inputVarsInFunc
+    ctx.INPUT_ROW = null
+
+    val doConsumeFuncName = ctx.addNewFunction(doConsume,
+      s"""
+         | private void $doConsume(${params.mkString(", ")}) throws java.io.IOException {
+         |   ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
+         | }
+       """.stripMargin)
+
+    s"""
+       | $doConsumeFuncName(${args.mkString(", ")});
+     """.stripMargin
+  }
+
+  /**
+   * Returns the arguments for calling the consume function and the parameters of its method
+   * definition, along with the list of `ExprCode` for the parameters.
+   */
+  private def constructConsumeParameters(
+      ctx: CodegenContext,
+      attributes: Seq[Attribute],
+      variables: Seq[ExprCode],
+      row: String): (Seq[String], Seq[String], Seq[ExprCode]) = {
+    val arguments = mutable.ArrayBuffer[String]()
+    val parameters = mutable.ArrayBuffer[String]()
+    val paramVars = mutable.ArrayBuffer[ExprCode]()
+
+    if (row != null) {
+      arguments += row
--- End diff --

Added an extra unit for `row` if needed.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163747141
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +162,96 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so that we can prevent the generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "spark.sql.codegen.splitConsumeFuncByOperator" is enabled.
+// 2. `inputVars` are all materialized. That is guaranteed to be true if the parent plan uses
+//    all variables in output (see `requireAllOutput`).
+// 3. The number of output variables must be less than the maximum number of parameters in a
+//    Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && requireAllOutput &&
--- End diff --

super nit:
```
val confEnabled = SQLConf.get.wholeStageSplitConsumeFuncByOperator
if (confEnabled && ...)
```


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163746698
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +162,96 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so that we can prevent the generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "spark.sql.codegen.splitConsumeFuncByOperator" is enabled.
+// 2. `inputVars` are all materialized. That is guaranteed to be true if the parent plan uses
+//    all variables in output (see `requireAllOutput`).
+// 3. The number of output variables must be less than the maximum number of parameters in a
+//    Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && requireAllOutput &&
+      ctx.isValidParamLength(output)) {
+    constructDoConsumeFunction(ctx, inputVars, row)
+  } else {
+    parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
 """.stripMargin
  }
+
+  /**
+   * To prevent the concatenated function from growing too long to be optimized by JIT, we can
+   * separate the parent's `doConsume` code of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode],
+      row: String): String = {
+    val (args, params, inputVarsInFunc) = constructConsumeParameters(ctx, output, inputVars, row)
+    val rowVar = prepareRowVar(ctx, row, inputVarsInFunc)
+
+    val doConsume = ctx.freshName("doConsume")
+    ctx.currentVars = inputVarsInFunc
+    ctx.INPUT_ROW = null
+
+    val doConsumeFuncName = ctx.addNewFunction(doConsume,
+      s"""
+         | private void $doConsume(${params.mkString(", ")}) throws java.io.IOException {
+         |   ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
+         | }
+       """.stripMargin)
+
+    s"""
+       | $doConsumeFuncName(${args.mkString(", ")});
+     """.stripMargin
+  }
+
+  /**
+   * Returns the arguments for calling the consume function and the parameters of its method
+   * definition, along with the list of `ExprCode` for the parameters.
+   */
+  private def constructConsumeParameters(
+      ctx: CodegenContext,
+      attributes: Seq[Attribute],
+      variables: Seq[ExprCode],
+      row: String): (Seq[String], Seq[String], Seq[ExprCode]) = {
+    val arguments = mutable.ArrayBuffer[String]()
+    val parameters = mutable.ArrayBuffer[String]()
+    val paramVars = mutable.ArrayBuffer[ExprCode]()
+
+    if (row != null) {
+      arguments += row
--- End diff --

we should probably have 2 methods for calculating param length and checking param length limitation.
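
The two-method split suggested here can be sketched in plain Java (the PR itself is Scala; the class and method names below are illustrative, and the slot-counting rule follows the JVM specification's 255-parameter-slot limit rather than Spark's actual implementation):

```java
// Hypothetical sketch: separate the slot-count calculation from the limit check.
public class ParamLengthCheck {
    // JVM methods accept at most 255 parameter slots, including `this`
    // for instance methods (JVM spec, section 4.3.3).
    static final int MAX_JVM_METHOD_PARAMS = 255;

    // Each parameter takes one slot; long and double take two.
    static int calculateParamLength(String[] javaTypes) {
        int length = 1; // reserve one slot for `this`
        for (String t : javaTypes) {
            length += (t.equals("long") || t.equals("double")) ? 2 : 1;
        }
        return length;
    }

    static boolean isValidParamLength(int paramLength) {
        return paramLength <= MAX_JVM_METHOD_PARAMS;
    }

    public static void main(String[] args) {
        int len = calculateParamLength(new String[]{"int", "long", "double"});
        System.out.println(len);                     // 6: this + 1 + 2 + 2
        System.out.println(isValidParamLength(len)); // true
    }
}
```

Splitting the calculation from the check lets callers reuse the computed length, e.g. when an extra row argument must be counted on top of the column variables.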


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163746605
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +162,96 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so that we can prevent the generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "spark.sql.codegen.splitConsumeFuncByOperator" is enabled.
+// 2. `inputVars` are all materialized. That is guaranteed to be true if the parent plan uses
+//    all variables in output (see `requireAllOutput`).
+// 3. The number of output variables must be less than the maximum number of parameters in a
+//    Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && requireAllOutput &&
+      ctx.isValidParamLength(output)) {
+    constructDoConsumeFunction(ctx, inputVars, row)
+  } else {
+    parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
 """.stripMargin
  }
+
+  /**
+   * To prevent the concatenated function from growing too long to be optimized by JIT, we can
+   * separate the parent's `doConsume` code of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode],
+      row: String): String = {
+    val (args, params, inputVarsInFunc) = constructConsumeParameters(ctx, output, inputVars, row)
+    val rowVar = prepareRowVar(ctx, row, inputVarsInFunc)
+
+    val doConsume = ctx.freshName("doConsume")
+    ctx.currentVars = inputVarsInFunc
+    ctx.INPUT_ROW = null
+
+    val doConsumeFuncName = ctx.addNewFunction(doConsume,
+      s"""
+         | private void $doConsume(${params.mkString(", ")}) throws java.io.IOException {
+         |   ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
+         | }
+       """.stripMargin)
+
+    s"""
+       | $doConsumeFuncName(${args.mkString(", ")});
+     """.stripMargin
+  }
+
+  /**
+   * Returns the arguments for calling the consume function and the parameters of its method
+   * definition, along with the list of `ExprCode` for the parameters.
+   */
+  private def constructConsumeParameters(
+      ctx: CodegenContext,
+      attributes: Seq[Attribute],
+      variables: Seq[ExprCode],
+      row: String): (Seq[String], Seq[String], Seq[ExprCode]) = {
+    val arguments = mutable.ArrayBuffer[String]()
+    val parameters = mutable.ArrayBuffer[String]()
+    val paramVars = mutable.ArrayBuffer[ExprCode]()
+
+    if (row != null) {
+      arguments += row
--- End diff --

we need to update `ctx.isValidParamLength` to count this
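
The point here is that the extra `row` argument must also count toward the JVM's parameter-slot limit. A minimal sketch in plain Java of what that accounting could look like (names are hypothetical, not Spark's actual API; the 255-slot limit is from the JVM specification):

```java
// Illustrative only: an optional row reference consumes one extra parameter slot,
// so the validity check must include it.
public class RowAwareParamLength {
    static final int MAX_JVM_METHOD_PARAMS = 255;

    // paramSlots: slots already counted for the column variables;
    // hasRowParam: whether a row reference is also passed (one more slot).
    static int totalParamLength(int paramSlots, boolean hasRowParam) {
        return paramSlots + (hasRowParam ? 1 : 0);
    }

    static boolean isValidParamLength(int paramSlots, boolean hasRowParam) {
        return totalParamLength(paramSlots, hasRowParam) <= MAX_JVM_METHOD_PARAMS;
    }

    public static void main(String[] args) {
        // 254 column slots plus the row reference still fits; 255 plus the row does not.
        System.out.println(isValidParamLength(254, true)); // true
        System.out.println(isValidParamLength(255, true)); // false
    }
}
```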


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163743504
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -661,6 +661,15 @@ object SQLConf {
 .intConf
 .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)
 
+  val WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR =
+    buildConf("spark.sql.codegen.splitConsumeFuncByOperator")
+      .internal()
+      .doc("When true, whole stage codegen would put the logic of consuming rows of each " +
+        "physical operator into individual methods, instead of a single big method. This can be " +
+        "used to avoid oversized function that can miss the opportunity of JIT optimization.")
+      .booleanConf
+      .createWithDefault(true)
--- End diff --

Set to true by default. If there is objection, I can change it to false.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163723759
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1263,6 +1271,8 @@ class SQLConf extends Serializable with Logging {
 
   def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)
 
+  def decoupleOperatorConsumeFuncs: Boolean = getConf(DECOUPLE_OPERATOR_CONSUME_FUNCTIONS)
--- End diff --

Sure. Done.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163723731
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so that we can prevent the generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+// 2. The parent uses all variables in output. We can't defer variable evaluation when we
+//    consume in another function.
+// 3. The output variables are not empty. If it's empty, we don't bother to do that.
+// 4. We don't use the row variable. The construction of the row uses deferred variable
+//    evaluation, so we can't do it.
+// 5. The number of output variables must be less than the maximum number of parameters in a
+//    Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty &&
+      requireAllOutput && ctx.isValidParamLength(output)) {
+    constructDoConsumeFunction(ctx, inputVars)
+  } else {
+    parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
 """.stripMargin
  }
+
+  /**
+   * To prevent the concatenated function from growing too long to be optimized by JIT, we can
+   * separate the parent's `doConsume` code of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode]): String = {
+    val (callingParams, arguList, inputVarsInFunc) =
--- End diff --

Sounds cleaner. I need to change it a little because the arguments and parameters are not the same. Some variables can't be parameterized, e.g., constants or statements.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163723249
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so that we can prevent the generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+// 2. The parent uses all variables in output. We can't defer variable evaluation when we
+//    consume in another function.
+// 3. The output variables are not empty. If it's empty, we don't bother to do that.
+// 4. We don't use the row variable. The construction of the row uses deferred variable
+//    evaluation, so we can't do it.
+// 5. The number of output variables must be less than the maximum number of parameters in a
+//    Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty &&
+      requireAllOutput && ctx.isValidParamLength(output)) {
+    constructDoConsumeFunction(ctx, inputVars)
--- End diff --

Good point.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163723212
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so that we can prevent the generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+// 2. The parent uses all variables in output. We can't defer variable evaluation when we
+//    consume in another function.
+// 3. The output variables are not empty. If it's empty, we don't bother to do that.
+// 4. We don't use the row variable. The construction of the row uses deferred variable
--- End diff --

Seems to me `outputVars != null` isn't necessary either. When it is null, `row` can't be null, and `inputVars` will bind to `row` columns and be evaluated before calling the created method.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163723254
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so that we can prevent the generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+// 2. The parent uses all variables in output. We can't defer variable evaluation when we
+//    consume in another function.
+// 3. The output variables are not empty. If it's empty, we don't bother to do that.
+// 4. We don't use the row variable. The construction of the row uses deferred variable
+//    evaluation, so we can't do it.
+// 5. The number of output variables must be less than the maximum number of parameters in a
+//    Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty &&
+      requireAllOutput && ctx.isValidParamLength(output)) {
+    constructDoConsumeFunction(ctx, inputVars)
+  } else {
+    parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
 """.stripMargin
  }
+
+  /**
+   * To prevent the concatenated function from growing too long to be optimized by JIT, we can
+   * separate the parent's `doConsume` code of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode]): String = {
+    val (callingParams, arguList, inputVarsInFunc) =
+      constructConsumeParameters(ctx, output, inputVars)
+
+    // Set up rowVar because the parent plan can possibly consume UnsafeRow instead of variables.
+    val colExprs = output.zipWithIndex.map { case (attr, i) =>
+      BoundReference(i, attr.dataType, attr.nullable)
+    }
+    // No need to copy the variables because they're already evaluated before entering the function.
+    ctx.INPUT_ROW = null
+    ctx.currentVars = inputVarsInFunc
+    val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
+    val rowVar = ExprCode(ev.code.trim, "false", ev.value)
+
+    val doConsume = ctx.freshName("doConsume")
--- End diff --

The `freshName` here will add `variablePrefix` before `doConsume`, so it already has the operator name.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163723195
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so that we can prevent the generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+// 2. The parent uses all variables in output. We can't defer variable evaluation when we
+//    consume in another function.
+// 3. The output variables are not empty. If it's empty, we don't bother to do that.
--- End diff --

Sounds correct to me, logically, although I have no clear idea of what the actual operator can be.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163723102
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -661,6 +661,14 @@ object SQLConf {
 .intConf
 .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)
 
+  val DECOUPLE_OPERATOR_CONSUME_FUNCTIONS = buildConf("spark.sql.codegen.decoupleOperatorConsume")
--- End diff --

Looks good to me.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163621072
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1263,6 +1271,8 @@ class SQLConf extends Serializable with Logging {
 
   def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)
 
+  def decoupleOperatorConsumeFuncs: Boolean = getConf(DECOUPLE_OPERATOR_CONSUME_FUNCTIONS)
--- End diff --

Add the `wholeStage` prefix for such flag names.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163620741
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -661,6 +661,14 @@ object SQLConf {
 .intConf
 .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)
 
+  val DECOUPLE_OPERATOR_CONSUME_FUNCTIONS = buildConf("spark.sql.codegen.decoupleOperatorConsume")
--- End diff --

`DECOUPLE_OPERATOR_CONSUME_FUNCTIONS` -> `WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR`


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163518761
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so that we can prevent the generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+// 2. The parent uses all variables in output. We can't defer variable evaluation when we
+//    consume in another function.
+// 3. The output variables are not empty. If it's empty, we don't bother to do that.
+// 4. We don't use the row variable. The construction of the row uses deferred variable
+//    evaluation, so we can't do it.
+// 5. The number of output variables must be less than the maximum number of parameters in a
+//    Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty &&
+      requireAllOutput && ctx.isValidParamLength(output)) {
+    constructDoConsumeFunction(ctx, inputVars)
+  } else {
+    parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
 """.stripMargin
  }
+
+  /**
+   * To prevent the concatenated function from growing too long to be optimized by JIT, we can
+   * separate the parent's `doConsume` code of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode]): String = {
+    val (callingParams, arguList, inputVarsInFunc) =
--- End diff --

I feel it's cleaner to return `paramNames, paramTypes, paramVars`, then we can simply do
```
void $doConsume(paramTypes.zip(paramNames).map(i => i._1 + " " + i._2).mkString(", "))
```
and
```
doConsumeFuncName(paramNames.mkString(", "))
```

inside `constructConsumeParameters` we can just create 3 mutable collections and go through `variables` to fill these collections.
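
The zip-and-join idea in this suggestion, transliterated into plain Java as a sketch (the PR itself is Scala; the class and method names `ParamZip`, `declaration`, and `callSite` are illustrative, not from the PR):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParamZip {
    // Zip parameter types with names to build a method declaration's parameter list.
    static String declaration(List<String> paramTypes, List<String> paramNames) {
        return IntStream.range(0, paramNames.size())
            .mapToObj(i -> paramTypes.get(i) + " " + paramNames.get(i))
            .collect(Collectors.joining(", "));
    }

    // The call site only needs the names.
    static String callSite(List<String> paramNames) {
        return String.join(", ", paramNames);
    }

    public static void main(String[] args) {
        List<String> types = List.of("int", "boolean");
        List<String> names = List.of("value", "isNull");
        // prints: private void doConsume(int value, boolean isNull)
        System.out.println("private void doConsume(" + declaration(types, names) + ")");
        // prints: doConsume(value, isNull);
        System.out.println("doConsume(" + callSite(names) + ");");
    }
}
```

Returning parallel name/type collections keeps the declaration string and the call-site string trivially consistent, which is the point of the reviewer's refactoring.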


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163516654
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so that we can prevent the generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+// 2. The parent uses all variables in output. We can't defer variable evaluation when we
+//    consume in another function.
+// 3. The output variables are not empty. If it's empty, we don't bother to do that.
+// 4. We don't use the row variable. The construction of the row uses deferred variable
+//    evaluation, so we can't do it.
+// 5. The number of output variables must be less than the maximum number of parameters in a
+//    Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty &&
+      requireAllOutput && ctx.isValidParamLength(output)) {
+    constructDoConsumeFunction(ctx, inputVars)
+  } else {
+    parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
 """.stripMargin
  }
+
+  /**
+   * To prevent the concatenated function from growing too long to be optimized by JIT, we can
+   * separate the parent's `doConsume` code of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode]): String = {
+    val (callingParams, arguList, inputVarsInFunc) =
+      constructConsumeParameters(ctx, output, inputVars)
+
+    // Set up rowVar because the parent plan can possibly consume UnsafeRow instead of variables.
+    val colExprs = output.zipWithIndex.map { case (attr, i) =>
+      BoundReference(i, attr.dataType, attr.nullable)
+    }
+    // No need to copy the variables because they're already evaluated before entering the function.
+    ctx.INPUT_ROW = null
+    ctx.currentVars = inputVarsInFunc
+    val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
+    val rowVar = ExprCode(ev.code.trim, "false", ev.value)
+
+    val doConsume = ctx.freshName("doConsume")
--- End diff --

shall we put the operator name in this function name?


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163514909
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+// 2. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 3. The output variables are not empty. If they are empty, we don't bother to do that.
+// 4. We don't use the row variable. The construction of the row uses deferred variable
+//evaluation, so we can't do it.
+// 5. The number of output variables must be less than the maximum number of parameters
+//in a Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && 
outputVars.nonEmpty &&
+  requireAllOutput && ctx.isValidParamLength(output)) {
+constructDoConsumeFunction(ctx, inputVars)
--- End diff --

maybe we should create a method for generating `rowVar`, so that we can use 
it in both `consume` and `constructDoConsumeFunction`


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163513927
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+// 2. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 3. The output variables are not empty. If they are empty, we don't bother to do that.
+// 4. We don't use the row variable. The construction of the row uses deferred variable
+//evaluation, so we can't do it.
+// 5. The number of output variables must be less than the maximum number of parameters
+//in a Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && 
outputVars.nonEmpty &&
+  requireAllOutput && ctx.isValidParamLength(output)) {
+constructDoConsumeFunction(ctx, inputVars)
--- End diff --

we should pass `row` to this function; if it's non-null, we can save a projection.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163512972
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+// 2. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 3. The output variables are not empty. If they are empty, we don't bother to do that.
+// 4. We don't use the row variable. The construction of the row uses deferred variable evaluation. We
--- End diff --

I think what we need is that `inputVars` are all materialized, which can be 
guaranteed by `requireAllOutput` and `outputVars != null`.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163512277
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
 ctx.INPUT_ROW = null
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+// 2. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 3. The output variables are not empty. If they are empty, we don't bother to do that.
--- End diff --

why this? I feel an operator can still have a complex consume method even if 
it doesn't have output.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163511790
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -661,6 +661,14 @@ object SQLConf {
 .intConf
 .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)
 
+  val DECOUPLE_OPERATOR_CONSUME_FUNCTIONS = 
buildConf("spark.sql.codegen.decoupleOperatorConsume")
--- End diff --

`decoupleOperatorConsume` looks weird, how about 
`splitConsumeMethodByOperator`?


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163477806
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -1313,6 +1331,9 @@ object CodeGenerator extends Logging {
   // This is the value of HugeMethodLimit in the OpenJDK JVM settings
   val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000
 
+  // The max valid length of method parameters in JVM.
+  val MAX_JVM_METHOD_PARAMS_LENGTH = 255
--- End diff --

Added `final` to all constants here.
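Background for the `final` suggestion: in Scala, a `final val` with a literal initializer (and no type annotation) on an object is a constant value definition, so the compiler can inline the literal at call sites. A minimal sketch, not Spark's actual code:

```scala
object Limits {
  // A `final val` with a literal initializer is a compile-time constant;
  // references to it may be inlined as the literal 255 in bytecode.
  final val MAX_JVM_METHOD_PARAMS_LENGTH = 255
}

// Hypothetical helper using the constant.
def fitsInDescriptor(slots: Int): Boolean =
  slots <= Limits.MAX_JVM_METHOD_PARAMS_LENGTH
```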


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163472676
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -1313,6 +1331,9 @@ object CodeGenerator extends Logging {
   // This is the value of HugeMethodLimit in the OpenJDK JVM settings
   val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000
 
+  // The max valid length of method parameters in JVM.
+  val MAX_JVM_METHOD_PARAMS_LENGTH = 255
--- End diff --

make it `final`? I think we can add final to all the constants here.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163462731
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do that.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//evaluation, so we can't do it.
+// 4. The number of output variables must be less than the maximum number of parameters
+//in a Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput && 
isValidParamLength(ctx)) {
+constructDoConsumeFunction(ctx, inputVars)
+  } else {
+parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
  """.stripMargin
   }
 
+  /**
+   * In Java, a method descriptor is valid only if it represents method 
parameters with a total
+   * length of 255 or less. `this` contributes one unit and a parameter of 
type long or double
+   * contributes two units. Besides, for nullable parameters, we also need 
to pass a boolean
+   * for the null status.
+   */
+  private def isValidParamLength(ctx: CodegenContext): Boolean = {
--- End diff --

Yes. Put it in `CodegenContext`.
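As a standalone illustration of the 255-slot rule described above: `this` takes one slot, `long`/`double` take two, everything else one, and each nullable parameter carries an extra boolean slot for its null status. `Param` below is a hypothetical stand-in for Spark's `Attribute` plus `ctx.javaType`, not Spark's API:

```scala
// Hypothetical model of one method parameter: its Java type name and
// whether an extra boolean isNull flag must accompany it.
case class Param(javaType: String, nullable: Boolean)

// Count JVM method-descriptor slots: start at 1 for `this`, add 2 for
// wide types (long/double), 1 otherwise, plus 1 for each isNull boolean.
def paramSlotLength(params: Seq[Param]): Int =
  params.foldLeft(1) { (len, p) =>
    val base = p.javaType match {
      case "long" | "double" => 2
      case _                 => 1
    }
    len + base + (if (p.nullable) 1 else 0)
  }

def isValidParamLength(params: Seq[Param]): Boolean =
  paramSlotLength(params) <= 255
```

For example, a nullable `long` costs three slots, matching the `curLength + 3` branch in the quoted diff.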


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163462688
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do that.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//evaluation, so we can't do it.
+// 4. The number of output variables must be less than the maximum number of parameters
+//in a Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput && 
isValidParamLength(ctx)) {
+constructDoConsumeFunction(ctx, inputVars)
+  } else {
+parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
  """.stripMargin
   }
 
+  /**
+   * In Java, a method descriptor is valid only if it represents method 
parameters with a total
+   * length of 255 or less. `this` contributes one unit and a parameter of 
type long or double
+   * contributes two units. Besides, for nullable parameters, we also need 
to pass a boolean
+   * for the null status.
+   */
+  private def isValidParamLength(ctx: CodegenContext): Boolean = {
+// Start value is 1 for `this`.
+output.foldLeft(1) { case (curLength, attr) =>
+  ctx.javaType(attr.dataType) match {
+case (ctx.JAVA_LONG | ctx.JAVA_DOUBLE) if !attr.nullable => 
curLength + 2
+case ctx.JAVA_LONG | ctx.JAVA_DOUBLE => curLength + 3
+case _ if !attr.nullable => curLength + 1
+case _ => curLength + 2
+  }
+} <= 255
+  }
+
+  /**
+   * To prevent the concatenated function from growing too long to be optimized by JIT, we can
+   * separate the parent's `doConsume` code of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+  ctx: CodegenContext,
+  inputVars: Seq[ExprCode]): String = {
+val (callingParams, arguList, inputVarsInFunc) =
+  constructConsumeParameters(ctx, output, inputVars)
+val rowVar = ExprCode("", "false", "unsafeRow")
+val doConsume = ctx.freshName("doConsume")
+val doConsumeFuncName = ctx.addNewFunction(doConsume,
+  s"""
+ | private void $doConsume($arguList) throws java.io.IOException {
+ |   ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
+ | }
+   """.stripMargin)
+
+s"""
+   | $doConsumeFuncName($callingParams);
+ """.stripMargin
+  }
+
+  /**
+   * Returns source code for calling consume function and the argument 
list of the consume function
+   * and also the `ExprCode` for the argument list.
+   */
+  private def constructConsumeParameters(
+  ctx: CodegenContext,
+  attributes: Seq[Attribute],
+  variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
+val params = variables.zipWithIndex.map { case (ev, i) =>
+  val arguName = ctx.freshName(s"expr_$i")
+  val arguType = ctx.javaType(attributes(i).dataType)
+
+  val (callingParam, funcParams, arguIsNull) = if 
(!attributes(i).nullable) {
+// When the argument is not nullable, we don't need to pass in 
`isNull` param for it and
+// simply give a `false`.
+val arguIsNull = "false"
+(ev.value, s"$arguType $arguName", arguIsNull)
+  } else {
+val arguIsNull = ctx.freshName(s"exprIsNull_$i")
+(ev.value + ", " + ev.isNull, s"$arguType $arguName, boolean 
$arguIsNull", arguIsNull)
+  }
+  (callingParam, funcParams, ExprCode("", arguIsNull, arguName))
+}.unzip3
+(params._1.mkString(", "),
+  params._2.mkString(", "),
+  params._3)
--- End diff --

Done.
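The parameter-building pattern under review — one pass producing both the calling arguments and the parameter declarations, then unzipping and joining them — can be sketched with plain strings. `In` and `buildParams` are illustrative names, not Spark's API:

```scala
// Hypothetical simplified input: a Java type name, the variable holding the
// value, the variable holding its null status, and whether it is nullable.
case class In(javaType: String, value: String, isNull: String, nullable: Boolean)

// Returns (calling-argument list, parameter-declaration list) as one pass
// over the inputs, mirroring the map + unzip + mkString shape in the diff.
def buildParams(inputs: Seq[In]): (String, String) = {
  val params = inputs.zipWithIndex.map { case (in, i) =>
    val name = s"expr_$i"
    if (!in.nullable) {
      // Non-nullable: no isNull flag needs to be passed through.
      (in.value, s"${in.javaType} $name")
    } else {
      // Nullable: pass an extra boolean carrying the null status.
      (s"${in.value}, ${in.isNull}", s"${in.javaType} $name, boolean ${name}IsNull")
    }
  }.unzip
  (params._1.mkString(", "), params._2.mkString(", "))
}
```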


---


[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163462698
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do that.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//evaluation, so we can't do it.
+// 4. The number of output variables must be less than the maximum number of parameters
+//in a Java method declaration.
--- End diff --

Added a config for it so we can turn it off.
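For reference, a flag of this kind follows SQLConf's usual builder pattern; the key name matches the one in the final revision quoted at the top of this thread, but the entry below is a sketch of the shape, not the exact merged code:

```scala
// Sketch of a boolean SQLConf entry, following the builder pattern visible
// in the quoted SQLConf.scala diff. The doc text and default are illustrative.
val WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR =
  buildConf("spark.sql.codegen.splitConsumeFuncByOperator")
    .internal()
    .doc("When true, whole-stage codegen puts the consume logic of each " +
      "physical operator into an individual method, keeping generated " +
      "methods small enough to be JIT-compiled.")
    .booleanConf
    .createWithDefault(true)
```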


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163317809
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do that.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//evaluation, so we can't do it.
+// 4. The number of output variables must be less than the maximum number of parameters
+//in a Java method declaration.
--- End diff --

or introduce a config so that users can turn it off.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163316730
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do that.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//evaluation, so we can't do it.
+// 4. The number of output variables must be less than the maximum number of parameters
+//in a Java method declaration.
--- End diff --

Maybe we can be super safe and only do this for certain operators, like 
`HashAggregateExec`.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163316509
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do that.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//evaluation, so we can't do it.
+// 4. The number of output variables must be less than the maximum number of parameters
+//in a Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput && 
isValidParamLength(ctx)) {
+constructDoConsumeFunction(ctx, inputVars)
+  } else {
+parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
  """.stripMargin
   }
 
+  /**
+   * In Java, a method descriptor is valid only if it represents method 
parameters with a total
+   * length of 255 or less. `this` contributes one unit and a parameter of 
type long or double
+   * contributes two units. Besides, for nullable parameters, we also need 
to pass a boolean
+   * for the null status.
+   */
+  private def isValidParamLength(ctx: CodegenContext): Boolean = {
+// Start value is 1 for `this`.
+output.foldLeft(1) { case (curLength, attr) =>
+  ctx.javaType(attr.dataType) match {
+case (ctx.JAVA_LONG | ctx.JAVA_DOUBLE) if !attr.nullable => 
curLength + 2
+case ctx.JAVA_LONG | ctx.JAVA_DOUBLE => curLength + 3
+case _ if !attr.nullable => curLength + 1
+case _ => curLength + 2
+  }
+} <= 255
+  }
+
+  /**
+   * To prevent the concatenated function from growing too long to be optimized by JIT, we can
+   * separate the parent's `doConsume` code of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+  ctx: CodegenContext,
+  inputVars: Seq[ExprCode]): String = {
+val (callingParams, arguList, inputVarsInFunc) =
+  constructConsumeParameters(ctx, output, inputVars)
+val rowVar = ExprCode("", "false", "unsafeRow")
+val doConsume = ctx.freshName("doConsume")
+val doConsumeFuncName = ctx.addNewFunction(doConsume,
+  s"""
+ | private void $doConsume($arguList) throws java.io.IOException {
+ |   ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
+ | }
+   """.stripMargin)
+
+s"""
+   | $doConsumeFuncName($callingParams);
+ """.stripMargin
+  }
+
+  /**
+   * Returns source code for calling consume function and the argument 
list of the consume function
+   * and also the `ExprCode` for the argument list.
+   */
+  private def constructConsumeParameters(
+  ctx: CodegenContext,
+  attributes: Seq[Attribute],
+  variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
+val params = variables.zipWithIndex.map { case (ev, i) =>
+  val arguName = ctx.freshName(s"expr_$i")
+  val arguType = ctx.javaType(attributes(i).dataType)
+
+  val (callingParam, funcParams, arguIsNull) = if 
(!attributes(i).nullable) {
+// When the argument is not nullable, we don't need to pass in 
`isNull` param for it and
+// simply give a `false`.
+val arguIsNull = "false"
+(ev.value, s"$arguType $arguName", arguIsNull)
+  } else {
+val arguIsNull = ctx.freshName(s"exprIsNull_$i")
+(ev.value + ", " + ev.isNull, s"$arguType $arguName, boolean 
$arguIsNull", arguIsNull)
+  }
+  (callingParam, funcParams, ExprCode("", arguIsNull, arguName))
+}.unzip3
+(params._1.mkString(", "),
+  params._2.mkString(", "),
+  params._3)
--- End diff --

the above 3 lines can be one line?


---


[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163315858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do that.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//evaluation, so we can't do it.
+// 4. The number of output variables must be less than the maximum number of parameters
+//in a Java method declaration.
--- End diff --

My only concern is that if we have a bunch of simple operators, we would create a 
lot of small methods here. Maybe it's fine, as the optimizer would prevent such 
cases.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2018-01-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r163315164
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows of this operator into
+// another function, so we can prevent a generated function from growing too long to be
+// optimized by JIT. The conditions:
+// 1. The parent uses all variables in output. We can't defer variable evaluation when
+//consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do that.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//evaluation, so we can't do it.
+// 4. The number of output variables must be less than the maximum number of parameters
+//in a Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput && 
isValidParamLength(ctx)) {
+constructDoConsumeFunction(ctx, inputVars)
+  } else {
+parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
  """.stripMargin
   }
 
+  /**
+   * In Java, a method descriptor is valid only if it represents method 
parameters with a total
+   * length of 255 or less. `this` contributes one unit and a parameter of 
type long or double
+   * contributes two units. Besides, for nullable parameters, we also need 
to pass a boolean
+   * for the null status.
+   */
+  private def isValidParamLength(ctx: CodegenContext): Boolean = {
--- End diff --

shall we put it into `CodegenContext` as a util function so that we can use 
it in other places in the future?


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-10-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r144279986
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -175,6 +175,25 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
+   * In Java, a method descriptor is valid only if it represents method 
parameters with a total
+   * length of 255 or less. `this` contributes one unit and a parameter of 
type long or double
+   * contributes two units. Besides, for nullable parameters, we also need 
to pass a boolean
+   * for the null status.
+   */
+  private def isValidParamLength(ctx: CodegenContext): Boolean = {
+var paramLength = 1 // for `this` parameter.
+output.foreach { attr =>
--- End diff --

Thanks. Will follow it.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-10-11 Thread a10y
Github user a10y commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r144025706
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -175,6 +175,25 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
+   * In Java, a method descriptor is valid only if it represents method 
parameters with a total
+   * length of 255 or less. `this` contributes one unit and a parameter of 
type long or double
+   * contributes two units. Besides, for nullable parameters, we also need 
to pass a boolean
+   * for the null status.
+   */
+  private def isValidParamLength(ctx: CodegenContext): Boolean = {
+var paramLength = 1 // for `this` parameter.
+output.foreach { attr =>
--- End diff --

(nit: This could be written as a `foldLeft` and then you can eliminate the 
`var`)
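The suggested refactor can be sketched in isolation; the integer weights below are an illustrative stand-in for the per-type slot counts in `isValidParamLength`, not Spark's code:

```scala
// Before: mutable accumulator updated inside foreach.
def slotLengthVar(weights: Seq[Int]): Int = {
  var paramLength = 1 // for `this` parameter
  weights.foreach { w => paramLength += w }
  paramLength
}

// After: the same computation expressed as a foldLeft, with no `var`.
def slotLengthFold(weights: Seq[Int]): Int =
  weights.foldLeft(1) { (acc, w) => acc + w }
```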


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-10-06 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r143321347
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 ---
@@ -181,7 +181,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
 val codeWithShortFunctions = genGroupByCode(3)
 val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
 assert(maxCodeSize1 < 
SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
-val codeWithLongFunctions = genGroupByCode(20)
+val codeWithLongFunctions = genGroupByCode(50)
--- End diff --

In my PR, I changed the code to just check that long functions have a larger 
max code size:

https://github.com/apache/spark/pull/19082/files#diff-0314224342bb8c30143ab784b3805d19R185
but just increasing the value seems better.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-10-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r143321131
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 ---
@@ -181,7 +181,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
 val codeWithShortFunctions = genGroupByCode(3)
 val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
 assert(maxCodeSize1 < 
SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
-val codeWithLongFunctions = genGroupByCode(20)
+val codeWithLongFunctions = genGroupByCode(50)
--- End diff --

We reduced the length of the generated code, so to make this test work, we 
increase the number of expressions.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r140931214
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala ---
@@ -89,6 +89,8 @@ case class ExpandExec(
 child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
+  override protected def doConsumeInChainOfFunc: Boolean = false
--- End diff --

The good news is, the just-merged #19324 simplifies the usage of `continue` 
in codegen. I'm now testing whether I can remove this tricky use of 
`continue` with it.


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-04 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136776281
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows 
of this operator into
+// another function. So we can prevent a generated function too long 
to be optimized by JIT.
+// The conditions:
+// 1. The parent uses all variables in output. we can't defer variable 
evaluation when consume
--- End diff --

thanks for the kind explanation!


---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136746833
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala ---
@@ -89,6 +89,8 @@ case class ExpandExec(
 child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
+  override protected def doConsumeInChainOfFunc: Boolean = false
--- End diff --

yea, probably we need to describe more about the exceptional cases where we 
can't use this optimization, like `HashAggregateExec` in 
https://github.com/apache/spark/pull/18931/files#diff-28cb12941b992ff680c277c651b59aa0R204


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136746468
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala ---
@@ -177,6 +177,8 @@ case class SortExec(
  """.stripMargin.trim
   }
 
+  override protected def doConsumeInChainOfFunc: Boolean = false
--- End diff --

yea, other guys might give good suggestions on the naming...


---



[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136743435
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala ---
@@ -177,6 +177,8 @@ case class SortExec(
  """.stripMargin.trim
   }
 
+  override protected def doConsumeInChainOfFunc: Boolean = false
--- End diff --

I revised this variable name several times, but didn't find a good name 
that conveys its meaning.


---



[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136742515
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala ---
@@ -89,6 +89,8 @@ case class ExpandExec(
 child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
+  override protected def doConsumeInChainOfFunc: Boolean = false
--- End diff --

The `doConsume` produces something like:

   |for (int $i = 0; $i < ${projections.length}; $i ++) {
   |  switch ($i) {
   |${cases.mkString("\n").trim}
   |  }
   |  $numOutput.add(1);
   |  ${consume(ctx, outputColumns)}
   |}

So the consume logic of its parent node is actually wrapped in a local 
for-loop. It has the same effect as not chaining the next consume.
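The loop-scoping behavior behind this can be sketched as a small stand-alone 
Java program (hypothetical names; a minimal stand-in for the generated code, 
not the actual codegen output):

```java
public class ContinueScopeDemo {
    // Counts how many (row, projection) pairs are consumed when odd
    // projections are skipped with `continue`.
    static int consumeAll(int rows, int projections) {
        int consumed = 0;
        for (int row = 0; row < rows; row++) {      // stand-in for the main while-loop
            for (int i = 0; i < projections; i++) { // stand-in for ExpandExec's local for-loop
                if (i % 2 == 1) {
                    // An unlabeled `continue` binds to the innermost enclosing
                    // loop: it moves to the next projection, not the next row.
                    continue;
                }
                consumed++;
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(3, 4)); // 3 rows x 2 even projections = 6
    }
}
```

This is the sense in which wrapping the parent's consume logic in a local 
for-loop "has the same effect as not chaining" it: a `continue` in that logic 
can never reach the outer while-loop.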


---



[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136742331
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows 
of this operator into
+// another function. So we can prevent a generated function too long 
to be optimized by JIT.
+// The conditions:
+// 1. The parent uses all variables in output. we can't defer variable 
evaluation when consume
+//in another function.
+// 2. The output variables are not empty. If it's empty, we don't 
bother to do that.
+// 3. We don't use row variable. The construction of row uses deferred 
variable evaluation. We
+//can't do it.
+// 4. The number of output variables must less than maximum number of 
parameters in Java method
+//declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput && 
outputVars.length < 255) {
+constructDoConsumeFunction(ctx, inputVars)
+  } else {
+parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
+ """.stripMargin
+  }
+
+  /**
+   * To prevent concatenated function growing too long to be optimized by 
JIT. Instead of inlining,
+   * we may put the consume logic of parent operator into a function and 
set this flag to `true`.
+   * The parent operator can know if its consume logic is inlined or in 
separated function.
+   */
+  private var doConsumeInFunc: Boolean = false
+
+  /**
+   * Returning true means we have at least one consume logic from child 
operator or this operator is
+   * separated in a function. If this is `true`, this operator shouldn't 
use `continue` statement to
+   * continue on next row, because its generated codes aren't enclosed in 
main while-loop.
+   *
+   * For example, we have generated codes for a query plan like:
+   *   Op1Exec
+   * Op2Exec
+   *   Op3Exec
+   *
+   * If we put the consume code of Op2Exec into a separated function, the 
generated codes are like:
+   *   while (...) {
+   * ... // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ... // logic of Op2Exec to consume rows.
+   *   }
+   * For now, `doConsumeInChainOfFunc` of Op2Exec will be `true`.
+   *
+   * Notice for some operators like `HashAggregateExec`, it doesn't chain 
previous consume functions
+   * but begins with its produce framework. We should override 
`doConsumeInChainOfFunc` to return
+   * `false`.
+   */
+  protected def doConsumeInChainOfFunc: Boolean = {
+val codegenChildren = children.map(_.asInstanceOf[CodegenSupport])
+doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc)
+  }
+
+  /**
+   * The actual java statement this operator should use if there is a need 
to continue on next row
+   * in its `doConsume` codes.
+   *
+   *   while (...) {
+   * ...   // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...   // logic of Op2Exec to consume rows.
+   * continue; // Wrong. We can't use continue with the while-loop.
+   *   }
+   * In above code, we can't use `continue` in `Op2Exec_doConsume`.
+   *
+   * Instead, we do something like:
+   *   while (...) {
+   * ...  // logic of Op3Exec.
+   * boolean continueForLoop = Op2Exec_doConsume(...);
+   * if (continueForLoop) continue;
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...  // logic of Op2Exec to consume rows.
+   * return true; // When we need to do continue, we return true.
+   *   }
+   */
+  protected def continueStatementInDoConsume: String = if 
(doConsumeInChainOfFunc) {
+"return true;";
--- End diff --

Thanks. I'll fix it.
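As a minimal stand-alone sketch of the return-true pattern described in the 
quoted doc comment (hypothetical names; not the actual generated code):

```java
public class SplitConsumeDemo {
    // Stand-in for a generated Op2Exec_doConsume function. A `continue`
    // statement here would not compile against the caller's loop, so
    // returning true signals the caller to `continue` instead.
    static boolean op2DoConsume(int value) {
        if (value % 2 == 0) {
            return true; // "continue": skip even-valued rows
        }
        // ... consume the row ...
        return false;
    }

    static int processedRows(int[] rows) {
        int processed = 0;
        for (int v : rows) {               // stand-in for the main while-loop
            boolean continueForLoop = op2DoConsume(v);
            if (continueForLoop) continue; // translated back into a real continue
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(processedRows(new int[]{1, 2, 3, 4, 5})); // prints 3
    }
}
```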


---

[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136742282
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows 
of this operator into
+// another function. So we can prevent a generated function too long 
to be optimized by JIT.
+// The conditions:
+// 1. The parent uses all variables in output. we can't defer variable 
evaluation when consume
+//in another function.
+// 2. The output variables are not empty. If it's empty, we don't 
bother to do that.
+// 3. We don't use row variable. The construction of row uses deferred 
variable evaluation. We
+//can't do it.
+// 4. The number of output variables must less than maximum number of 
parameters in Java method
+//declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput && 
outputVars.length < 255) {
+constructDoConsumeFunction(ctx, inputVars)
--- End diff --

I was thinking of checking it. But whole-stage codegen is a non-breaking 
process in which produce/consume calls are embedded together, so there is no 
break point at which to check the function length here.

Actually I think always splitting consume functions should have no negative 
effect. The benchmark numbers show no harm to normal queries.
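Condition 4 in the quoted list (`outputVars.length < 255`) comes from the 
JVM's limit of 255 parameter slots per method descriptor. A sketch of a 
stricter slot-based check (assumed semantics; the `ctx.isValidParamLength` in 
the merged version plays this role, but this is not its actual 
implementation):

```java
import java.util.Collections;
import java.util.List;

public class ParamLengthCheck {
    // JVMS 4.3.3: a method may declare at most 255 parameter slots, where
    // long and double occupy two slots each, and `this` occupies one slot
    // on instance methods.
    static final int MAX_JVM_METHOD_PARAM_SLOTS = 255;

    // `typeNames` is a hypothetical stand-in for the output attribute types.
    static boolean isValidParamLength(List<String> typeNames) {
        int slots = 1; // reserve one slot for `this`
        for (String t : typeNames) {
            slots += (t.equals("long") || t.equals("double")) ? 2 : 1;
        }
        return slots <= MAX_JVM_METHOD_PARAM_SLOTS;
    }

    public static void main(String[] args) {
        System.out.println(isValidParamLength(List.of("int", "long", "double"))); // true
        System.out.println(isValidParamLength(Collections.nCopies(130, "long"))); // false
    }
}
```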


---



[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136742019
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows 
of this operator into
+// another function. So we can prevent a generated function too long 
to be optimized by JIT.
+// The conditions:
+// 1. The parent uses all variables in output. we can't defer variable 
evaluation when consume
+//in another function.
+// 2. The output variables are not empty. If it's empty, we don't 
bother to do that.
+// 3. We don't use row variable. The construction of row uses deferred 
variable evaluation. We
--- End diff --

The same reason as above. The variables used to evaluate the row can be out 
of scope, because row construction is deferred until the row is actually used.


---



[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136741637
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows 
of this operator into
+// another function. So we can prevent a generated function too long 
to be optimized by JIT.
+// The conditions:
+// 1. The parent uses all variables in output. we can't defer variable 
evaluation when consume
--- End diff --

E.g., a `ProjectExec` node doesn't necessarily evaluate all its output 
variables before invoking `doConsume` of its parent node. It can defer the 
evaluation until the variables are needed in the parent node's consume logic.

Once a variable's evaluation is deferred, and we then create a consume 
function, the variable gets evaluated inside that function. But there, the 
inputs its evaluation code references are out of scope.
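A minimal compiling sketch of this scoping issue (hypothetical names; an 
assumed shape of the generated code, which likewise passes only materialized 
values across the function boundary):

```java
public class DeferredEvalDemo {
    // Stand-in for a parent's consume logic split into its own function.
    // Only already-materialized values can cross this boundary as parameters;
    // a deferred evaluation referencing the caller's `row` local would not
    // compile here, because `row` is out of scope.
    static int parentDoConsume(int value0, int value1) {
        return value0 + value1;
    }

    public static void main(String[] args) {
        int[] row = {3, 4};
        // Force ("materialize") the evaluations before the call, since the
        // function body cannot reach `row`.
        int value0 = row[0] * 10;
        int value1 = row[1];
        System.out.println(parentDoConsume(value0, value1)); // prints 34
    }
}
```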


---



[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136711099
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala ---
@@ -177,6 +177,8 @@ case class SortExec(
  """.stripMargin.trim
   }
 
+  override protected def doConsumeInChainOfFunc: Boolean = false
--- End diff --

I think it's better to put the reason why we set this to false explicitly in 
each plan, like 
https://github.com/apache/spark/pull/18931/files#diff-28cb12941b992ff680c277c651b59aa0R204

btw, can we find a better name for this? e.g., `canPipeline`, or something, 
because IIUC this optimisation can be applied to `pipelining` operators 
(`pipelining` is a piece of database terminology: 
https://link.springer.com/referenceworkentry/10.1007%2F978-0-387-39940-9_872).


---



[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136710928
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows 
of this operator into
+// another function. So we can prevent a generated function too long 
to be optimized by JIT.
+// The conditions:
+// 1. The parent uses all variables in output. we can't defer variable 
evaluation when consume
+//in another function.
+// 2. The output variables are not empty. If it's empty, we don't 
bother to do that.
+// 3. We don't use row variable. The construction of row uses deferred 
variable evaluation. We
+//can't do it.
+// 4. The number of output variables must less than maximum number of 
parameters in Java method
+//declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput && 
outputVars.length < 255) {
+constructDoConsumeFunction(ctx, inputVars)
+  } else {
+parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
+ """.stripMargin
+  }
+
+  /**
+   * To prevent concatenated function growing too long to be optimized by 
JIT. Instead of inlining,
+   * we may put the consume logic of parent operator into a function and 
set this flag to `true`.
+   * The parent operator can know if its consume logic is inlined or in 
separated function.
+   */
+  private var doConsumeInFunc: Boolean = false
+
+  /**
+   * Returning true means we have at least one consume logic from child 
operator or this operator is
+   * separated in a function. If this is `true`, this operator shouldn't 
use `continue` statement to
+   * continue on next row, because its generated codes aren't enclosed in 
main while-loop.
+   *
+   * For example, we have generated codes for a query plan like:
+   *   Op1Exec
+   * Op2Exec
+   *   Op3Exec
+   *
+   * If we put the consume code of Op2Exec into a separated function, the 
generated codes are like:
+   *   while (...) {
+   * ... // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ... // logic of Op2Exec to consume rows.
+   *   }
+   * For now, `doConsumeInChainOfFunc` of Op2Exec will be `true`.
+   *
+   * Notice for some operators like `HashAggregateExec`, it doesn't chain 
previous consume functions
+   * but begins with its produce framework. We should override 
`doConsumeInChainOfFunc` to return
+   * `false`.
+   */
+  protected def doConsumeInChainOfFunc: Boolean = {
+val codegenChildren = children.map(_.asInstanceOf[CodegenSupport])
+doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc)
+  }
+
+  /**
+   * The actual java statement this operator should use if there is a need 
to continue on next row
+   * in its `doConsume` codes.
+   *
+   *   while (...) {
+   * ...   // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...   // logic of Op2Exec to consume rows.
+   * continue; // Wrong. We can't use continue with the while-loop.
+   *   }
+   * In above code, we can't use `continue` in `Op2Exec_doConsume`.
+   *
+   * Instead, we do something like:
+   *   while (...) {
+   * ...  // logic of Op3Exec.
+   * boolean continueForLoop = Op2Exec_doConsume(...);
+   * if (continueForLoop) continue;
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...  // logic of Op2Exec to consume rows.
+   * return true; // When we need to do continue, we return true.
+   *   }
+   */
+  protected def continueStatementInDoConsume: String = if 
(doConsumeInChainOfFunc) {
+"return true;";
+  } else {
+"continue;"
+  }
+
+  /**
+   * To prevent concatenated function growing too long to be optimized by 
JIT. We can separate the
+   * parent's `doConsume` codes of a `CodegenSupport` operator into a 
function to call.
+   */
+  protected def constructDoConsumeFunction

[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136710824
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows 
of this operator into
+// another function. So we can prevent a generated function too long 
to be optimized by JIT.
+// The conditions:
+// 1. The parent uses all variables in output. we can't defer variable 
evaluation when consume
+//in another function.
+// 2. The output variables are not empty. If it's empty, we don't 
bother to do that.
+// 3. We don't use row variable. The construction of row uses deferred 
variable evaluation. We
+//can't do it.
+// 4. The number of output variables must less than maximum number of 
parameters in Java method
+//declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput && 
outputVars.length < 255) {
+constructDoConsumeFunction(ctx, inputVars)
--- End diff --

Do we always split consume functions? Don't we need to check whether this 
consume function is too long?


---



[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136710234
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows 
of this operator into
+// another function. So we can prevent a generated function too long 
to be optimized by JIT.
+// The conditions:
+// 1. The parent uses all variables in output. we can't defer variable 
evaluation when consume
+//in another function.
+// 2. The output variables are not empty. If it's empty, we don't 
bother to do that.
+// 3. We don't use row variable. The construction of row uses deferred 
variable evaluation. We
+//can't do it.
+// 4. The number of output variables must less than maximum number of 
parameters in Java method
+//declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput && 
outputVars.length < 255) {
+constructDoConsumeFunction(ctx, inputVars)
+  } else {
+parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
+ """.stripMargin
+  }
+
+  /**
+   * To prevent concatenated function growing too long to be optimized by 
JIT. Instead of inlining,
+   * we may put the consume logic of parent operator into a function and 
set this flag to `true`.
+   * The parent operator can know if its consume logic is inlined or in 
separated function.
+   */
+  private var doConsumeInFunc: Boolean = false
+
+  /**
+   * Returning true means we have at least one consume logic from child 
operator or this operator is
+   * separated in a function. If this is `true`, this operator shouldn't 
use `continue` statement to
+   * continue on next row, because its generated codes aren't enclosed in 
main while-loop.
+   *
+   * For example, we have generated codes for a query plan like:
+   *   Op1Exec
+   * Op2Exec
+   *   Op3Exec
+   *
+   * If we put the consume code of Op2Exec into a separated function, the 
generated codes are like:
+   *   while (...) {
+   * ... // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ... // logic of Op2Exec to consume rows.
+   *   }
+   * For now, `doConsumeInChainOfFunc` of Op2Exec will be `true`.
+   *
+   * Notice for some operators like `HashAggregateExec`, it doesn't chain 
previous consume functions
+   * but begins with its produce framework. We should override 
`doConsumeInChainOfFunc` to return
+   * `false`.
+   */
+  protected def doConsumeInChainOfFunc: Boolean = {
+val codegenChildren = children.map(_.asInstanceOf[CodegenSupport])
+doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc)
+  }
+
+  /**
+   * The actual java statement this operator should use if there is a need 
to continue on next row
+   * in its `doConsume` codes.
+   *
+   *   while (...) {
+   * ...   // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...   // logic of Op2Exec to consume rows.
+   * continue; // Wrong. We can't use continue with the while-loop.
+   *   }
+   * In above code, we can't use `continue` in `Op2Exec_doConsume`.
+   *
+   * Instead, we do something like:
+   *   while (...) {
+   * ...  // logic of Op3Exec.
+   * boolean continueForLoop = Op2Exec_doConsume(...);
+   * if (continueForLoop) continue;
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...  // logic of Op2Exec to consume rows.
+   * return true; // When we need to do continue, we return true.
+   *   }
+   */
+  protected def continueStatementInDoConsume: String = if 
(doConsumeInChainOfFunc) {
+"return true;";
--- End diff --

Remove `;`


---

[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136710741
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows 
of this operator into
+// another function. So we can prevent a generated function too long 
to be optimized by JIT.
+// The conditions:
+// 1. The parent uses all variables in output. we can't defer variable 
evaluation when consume
+//in another function.
+// 2. The output variables are not empty. If it's empty, we don't 
bother to do that.
+// 3. We don't use row variable. The construction of row uses deferred 
variable evaluation. We
--- End diff --

ditto; I want to see a concrete example, too.


---



[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136710667
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic to consume the rows 
of this operator into
+// another function. So we can prevent a generated function too long 
to be optimized by JIT.
+// The conditions:
+// 1. The parent uses all variables in output. we can't defer variable 
evaluation when consume
--- End diff --

What's a concrete example where this condition prevents consume functions 
from being split?


---



[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136710909
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic that consumes the rows of this operator
+// into another function, so we can prevent a generated function from growing too long to
+// be optimized by the JIT. The conditions:
+// 1. The parent uses all variables in output; we can't defer variable evaluation when
+//    consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do it.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//    evaluation, so we can't do it.
+// 4. The number of output variables must be less than the maximum number of parameters in
+//    a Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput && 
outputVars.length < 255) {
+constructDoConsumeFunction(ctx, inputVars)
+  } else {
+parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
+ """.stripMargin
+  }
+
+  /**
+   * To prevent a concatenated function from growing too long to be optimized by the JIT,
+   * instead of inlining we may put the consume logic of the parent operator into a
+   * function and set this flag to `true`. The parent operator can then know whether its
+   * consume logic is inlined or in a separate function.
+   */
+  private var doConsumeInFunc: Boolean = false
+
+  /**
+   * Returning true means that the consume logic of at least one operator, either a child
+   * operator or this one, has been separated into a function. If this is `true`, this
+   * operator shouldn't use a `continue` statement to move on to the next row, because its
+   * generated code isn't enclosed in the main while-loop.
+   *
+   * For example, we have generated codes for a query plan like:
+   *   Op1Exec
+   * Op2Exec
+   *   Op3Exec
+   *
+   * If we put the consume code of Op2Exec into a separate function, the generated code
+   * looks like:
+   *   while (...) {
+   * ... // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ... // logic of Op2Exec to consume rows.
+   *   }
+   * In this case, `doConsumeInChainOfFunc` of Op2Exec will be `true`.
+   *
+   * Note that some operators, like `HashAggregateExec`, don't chain previous consume
+   * functions but begin with their own produce framework; such operators should override
+   * `doConsumeInChainOfFunc` to return `false`.
+   */
+  protected def doConsumeInChainOfFunc: Boolean = {
+val codegenChildren = children.map(_.asInstanceOf[CodegenSupport])
+doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc)
+  }
+
+  /**
+   * The actual Java statement this operator should use when it needs to continue on to
+   * the next row in its `doConsume` code.
+   *
+   *   while (...) {
+   * ...   // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...   // logic of Op2Exec to consume rows.
+   * continue; // Wrong. We can't use continue with the while-loop.
+   *   }
+   * In the above code, we can't use `continue` in `Op2Exec_doConsume`.
+   *
+   * Instead, we do something like:
+   *   while (...) {
+   * ...  // logic of Op3Exec.
+   * boolean continueForLoop = Op2Exec_doConsume(...);
+   * if (continueForLoop) continue;
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...  // logic of Op2Exec to consume rows.
+   * return true; // When we need to do continue, we return true.
+   *   }
+   */
+  protected def continueStatementInDoConsume: String = if (doConsumeInChainOfFunc) {
+    "return true;"
+  } else {
+"continue;"
+  }
+
+  /**
+   * To prevent a concatenated function from growing too long to be optimized by the JIT,
+   * we can separate the parent's `doConsume` code of a `CodegenSupport` operator into a
+   * function to call.
+   */
+  protected def constructDoConsumeFunction
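The `continue`-to-`return` translation described in the doc comment above can be sketched as a small runnable Java example (operator names are hypothetical):

```java
// Hedged sketch of the pattern from the doc comment above: the extracted
// doConsume method cannot `continue` the caller's loop, so it returns a flag
// that the caller turns back into a `continue` statement.
public class ContinueAcrossMethods {
    static int processed = 0;

    // Extracted consume logic; returning true replaces the `continue` that
    // would no longer compile inside a separate method.
    static boolean op2DoConsume(int row) {
        if (row % 2 == 0) {
            return true; // skip even rows ("continue")
        }
        processed++;     // consume the row
        return false;
    }

    // The main produce loop translates the flag back into `continue`.
    static int run(int numRows) {
        for (int row = 0; row < numRows; row++) {
            if (op2DoConsume(row)) continue;
        }
        return processed;
    }
}
```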

[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136711183
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala ---
@@ -89,6 +89,8 @@ case class ExpandExec(
 child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
+  override protected def doConsumeInChainOfFunc: Boolean = false
--- End diff --

I might not be 100% sure about your intention, but I feel this is a little confusing because `ExpandExec` consume functions can be chained in the generated code, right?





[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-09-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136710872
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic that consumes the rows of this operator
+// into another function, so we can prevent a generated function from growing too long to
+// be optimized by the JIT. The conditions:
+// 1. The parent uses all variables in output; we can't defer variable evaluation when
+//    consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do it.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//    evaluation, so we can't do it.
+// 4. The number of output variables must be less than the maximum number of parameters in
+//    a Java method declaration.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput && 
outputVars.length < 255) {
+constructDoConsumeFunction(ctx, inputVars)
+  } else {
+parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
+ """.stripMargin
+  }
+
+  /**
+   * To prevent a concatenated function from growing too long to be optimized by the JIT,
+   * instead of inlining we may put the consume logic of the parent operator into a
+   * function and set this flag to `true`. The parent operator can then know whether its
+   * consume logic is inlined or in a separate function.
+   */
+  private var doConsumeInFunc: Boolean = false
+
+  /**
+   * Returning true means that the consume logic of at least one operator, either a child
+   * operator or this one, has been separated into a function. If this is `true`, this
+   * operator shouldn't use a `continue` statement to move on to the next row, because its
+   * generated code isn't enclosed in the main while-loop.
+   *
+   * For example, we have generated codes for a query plan like:
+   *   Op1Exec
+   * Op2Exec
+   *   Op3Exec
+   *
+   * If we put the consume code of Op2Exec into a separate function, the generated code
+   * looks like:
+   *   while (...) {
+   * ... // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ... // logic of Op2Exec to consume rows.
+   *   }
+   * In this case, `doConsumeInChainOfFunc` of Op2Exec will be `true`.
+   *
+   * Note that some operators, like `HashAggregateExec`, don't chain previous consume
+   * functions but begin with their own produce framework; such operators should override
+   * `doConsumeInChainOfFunc` to return `false`.
+   */
+  protected def doConsumeInChainOfFunc: Boolean = {
+val codegenChildren = children.map(_.asInstanceOf[CodegenSupport])
+doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc)
+  }
+
+  /**
+   * The actual Java statement this operator should use when it needs to continue on to
+   * the next row in its `doConsume` code.
+   *
+   *   while (...) {
+   * ...   // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...   // logic of Op2Exec to consume rows.
+   * continue; // Wrong. We can't use continue with the while-loop.
+   *   }
+   * In the above code, we can't use `continue` in `Op2Exec_doConsume`.
+   *
+   * Instead, we do something like:
+   *   while (...) {
+   * ...  // logic of Op3Exec.
+   * boolean continueForLoop = Op2Exec_doConsume(...);
+   * if (continueForLoop) continue;
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...  // logic of Op2Exec to consume rows.
+   * return true; // When we need to do continue, we return true.
+   *   }
+   */
+  protected def continueStatementInDoConsume: String = if (doConsumeInChainOfFunc) {
+    "return true;"
+  } else {
+"continue;"
+  }
+
+  /**
+   * To prevent a concatenated function from growing too long to be optimized by the JIT,
+   * we can separate the parent's `doConsume` code of a `CodegenSupport` operator into a
+   * function to call.
+   */
+  protected def constructDoConsumeFunction
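The `outputVars.length < 255` bound in the quoted diff comes from the JVM's method descriptor limit: at most 255 parameter slots, where `long` and `double` each occupy two slots and an instance method spends one on `this`. A hedged sketch of such a check (the helper name is hypothetical; the PR's actual logic may differ):

```java
// Hedged sketch of the JVM constraint behind the `outputVars.length < 255`
// check: a method descriptor allows at most 255 parameter slots, where long
// and double each occupy two slots, and an instance method spends one slot
// on `this` (JVM spec, section 4.3.3).
public class ParamLengthCheck {
    static int slotsFor(Class<?> t) {
        return (t == long.class || t == double.class) ? 2 : 1;
    }

    // Returns true if an instance method taking these parameter types fits
    // within the 255-slot limit.
    static boolean isValidParamLength(Class<?>... paramTypes) {
        int slots = 1; // reserved for `this`
        for (Class<?> t : paramTypes) {
            slots += slotsFor(t);
        }
        return slots <= 255;
    }
}
```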

[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-08-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136492055
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,144 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic that consumes the rows of this operator
+// into another function, so we can prevent a generated function from growing too long to
+// be optimized by the JIT. The conditions:
+// 1. The parent uses all variables in output; we can't defer variable evaluation when
+//    consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do it.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//    evaluation, so we can't do it.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput) {
+constructDoConsumeFunction(ctx, inputVars)
+  } else {
+parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
+ """.stripMargin
+  }
+
+  /**
+   * To prevent concatenated function growing too long to be optimized by 
JIT. Instead of inlining,
+   * we may put the consume logic of parent operator into a function and 
set this flag to `true`.
+   * The parent operator can know if its consume logic is inlined or in 
separated function.
+   */
+  private var doConsumeInFunc: Boolean = false
+
+  /**
+   * Returning true means we have at least one consume logic from child 
operator or this operator is
+   * separated in a function. If this is `true`, this operator shouldn't 
use `continue` statement to
+   * continue on next row, because its generated codes aren't enclosed in 
main while-loop.
+   *
+   * For example, we have generated codes for a query plan like:
+   *   Op1Exec
+   * Op2Exec
+   *   Op3Exec
+   *
+   * If we put the consume code of Op2Exec into a separate function, the generated code
+   * looks like:
+   *   while (...) {
+   * ... // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ... // logic of Op2Exec to consume rows.
+   *   }
+   * In this case, `doConsumeInChainOfFunc` of Op2Exec will be `true`.
+   *
+   * Note that some operators, like `HashAggregateExec`, don't chain previous consume
+   * functions but begin with their own produce framework; such operators should override
+   * `doConsumeInChainOfFunc` to return `false`.
+   */
+  protected def doConsumeInChainOfFunc: Boolean = {
+val codegenChildren = children.map(_.asInstanceOf[CodegenSupport])
+doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc)
+  }
+
+  /**
+   * The actual Java statement this operator should use when it needs to continue on to
+   * the next row in its `doConsume` code.
+   *
+   *   while (...) {
+   * ...   // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...   // logic of Op2Exec to consume rows.
+   * continue; // Wrong. We can't use continue with the while-loop.
+   *   }
+   * In the above code, we can't use `continue` in `Op2Exec_doConsume`.
+   *
+   * Instead, we do something like:
+   *   while (...) {
+   * ...  // logic of Op3Exec.
+   * boolean continueForLoop = Op2Exec_doConsume(...);
+   * if (continueForLoop) continue;
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...  // logic of Op2Exec to consume rows.
+   * return true; // When we need to do continue, we return true.
+   *   }
+   */
+  protected def continueStatementInDoConsume: String = if (doConsumeInChainOfFunc) {
+    "return true;"
+  } else {
+"continue;"
+  }
+
+  /**
+   * To prevent a concatenated function from growing too long to be optimized by the JIT,
+   * we can separate the parent's `doConsume` code of a `CodegenSupport` operator into a
+   * function to call.
+   */
+  protected def constructDoConsumeFunction(
+  ctx: CodegenContext,
+  inputVars: Seq[ExprCode]): String = {
+val (callingParams, arguList, inputVarsInFunc) =
--- End diff --
 

[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...

2017-08-31 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18931#discussion_r136491920
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -149,14 +149,144 @@ trait CodegenSupport extends SparkPlan {
 
 ctx.freshNamePrefix = parent.variablePrefix
 val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
+
+// Under certain conditions, we can put the logic that consumes the rows of this operator
+// into another function, so we can prevent a generated function from growing too long to
+// be optimized by the JIT. The conditions:
+// 1. The parent uses all variables in output; we can't defer variable evaluation when
+//    consuming in another function.
+// 2. The output variables are not empty. If they are empty, we don't bother to do it.
+// 3. We don't use the row variable. The construction of the row uses deferred variable
+//    evaluation, so we can't do it.
+val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+val consumeFunc =
+  if (row == null && outputVars.nonEmpty && requireAllOutput) {
+constructDoConsumeFunction(ctx, inputVars)
+  } else {
+parent.doConsume(ctx, inputVars, rowVar)
+  }
 s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
-   |${parent.doConsume(ctx, inputVars, rowVar)}
+   |$consumeFunc
+ """.stripMargin
+  }
+
+  /**
+   * To prevent a concatenated function from growing too long to be optimized by the JIT,
+   * instead of inlining we may put the consume logic of the parent operator into a
+   * function and set this flag to `true`. The parent operator can then know whether its
+   * consume logic is inlined or in a separate function.
+   */
+  private var doConsumeInFunc: Boolean = false
+
+  /**
+   * Returning true means that the consume logic of at least one operator, either a child
+   * operator or this one, has been separated into a function. If this is `true`, this
+   * operator shouldn't use a `continue` statement to move on to the next row, because its
+   * generated code isn't enclosed in the main while-loop.
+   *
+   * For example, we have generated codes for a query plan like:
+   *   Op1Exec
+   * Op2Exec
+   *   Op3Exec
+   *
+   * If we put the consume code of Op2Exec into a separate function, the generated code
+   * looks like:
+   *   while (...) {
+   * ... // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ... // logic of Op2Exec to consume rows.
+   *   }
+   * In this case, `doConsumeInChainOfFunc` of Op2Exec will be `true`.
+   *
+   * Note that some operators, like `HashAggregateExec`, don't chain previous consume
+   * functions but begin with their own produce framework; such operators should override
+   * `doConsumeInChainOfFunc` to return `false`.
+   */
+  protected def doConsumeInChainOfFunc: Boolean = {
+val codegenChildren = children.map(_.asInstanceOf[CodegenSupport])
+doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc)
+  }
+
+  /**
+   * The actual Java statement this operator should use when it needs to continue on to
+   * the next row in its `doConsume` code.
+   *
+   *   while (...) {
+   * ...   // logic of Op3Exec.
+   * Op2Exec_doConsume(...);
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...   // logic of Op2Exec to consume rows.
+   * continue; // Wrong. We can't use continue with the while-loop.
+   *   }
+   * In the above code, we can't use `continue` in `Op2Exec_doConsume`.
+   *
+   * Instead, we do something like:
+   *   while (...) {
+   * ...  // logic of Op3Exec.
+   * boolean continueForLoop = Op2Exec_doConsume(...);
+   * if (continueForLoop) continue;
+   *   }
+   *   private boolean Op2Exec_doConsume(...) {
+   * ...  // logic of Op2Exec to consume rows.
+   * return true; // When we need to do continue, we return true.
+   *   }
+   */
+  protected def continueStatementInDoConsume: String = if (doConsumeInChainOfFunc) {
+    "return true;"
+  } else {
+"continue;"
+  }
+
+  /**
+   * To prevent a concatenated function from growing too long to be optimized by the JIT,
+   * we can separate the parent's `doConsume` code of a `CodegenSupport` operator into a
+   * function to call.
+   */
+  protected def constructDoConsumeFunction(
+  ctx: CodegenContext,
+  inputVars: Seq[ExprCode]): String = {
+val (callingParams, arguList, inputVarsInFunc) =
--- End diff --