Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/18931#discussion_r163746605
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
---
@@ -156,13 +162,96 @@ trait CodegenSupport extends SparkPlan {
ctx.INPUT_ROW = null
ctx.freshNamePrefix = parent.variablePrefix
val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+ // Under certain conditions, we can put the logic to consume the rows of this operator into
+ // another function. So we can prevent a generated function too long to be optimized by JIT.
+ // The conditions:
+ // 1. The config "spark.sql.codegen.splitConsumeFuncByOperator" is enabled.
+ // 2. `inputVars` are all materialized. That is guaranteed to be true if the parent plan uses
+ // all variables in output (see `requireAllOutput`).
+ // 3. The number of output variables must less than maximum number of parameters in Java method
+ // declaration.
+ val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+ val consumeFunc =
+ if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && requireAllOutput &&
+ ctx.isValidParamLength(output)) {
+ constructDoConsumeFunction(ctx, inputVars, row)
+ } else {
+ parent.doConsume(ctx, inputVars, rowVar)
+ }
s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
- |${parent.doConsume(ctx, inputVars, rowVar)}
+ |$consumeFunc
+ """.stripMargin
+ }
+
+ /**
+ * To prevent concatenated function growing too long to be optimized by JIT. We can separate the
+ * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call.
+ */
+ private def constructDoConsumeFunction(
+ ctx: CodegenContext,
+ inputVars: Seq[ExprCode],
+ row: String): String = {
+ val (args, params, inputVarsInFunc) = constructConsumeParameters(ctx, output, inputVars, row)
+ val rowVar = prepareRowVar(ctx, row, inputVarsInFunc)
+
+ val doConsume = ctx.freshName("doConsume")
+ ctx.currentVars = inputVarsInFunc
+ ctx.INPUT_ROW = null
+
+ val doConsumeFuncName = ctx.addNewFunction(doConsume,
+ s"""
+ | private void $doConsume(${params.mkString(", ")}) throws java.io.IOException {
+ | ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
+ | }
+ """.stripMargin)
+
+ s"""
+ | $doConsumeFuncName(${args.mkString(", ")});
""".stripMargin
}
+ /**
+ * Returns arguments for calling method and method definition parameters of the consume function.
+ * And also returns the list of `ExprCode` for the parameters.
+ */
+ private def constructConsumeParameters(
+ ctx: CodegenContext,
+ attributes: Seq[Attribute],
+ variables: Seq[ExprCode],
+ row: String): (Seq[String], Seq[String], Seq[ExprCode]) = {
+ val arguments = mutable.ArrayBuffer[String]()
+ val parameters = mutable.ArrayBuffer[String]()
+ val paramVars = mutable.ArrayBuffer[ExprCode]()
+
+ if (row != null) {
+ arguments += row
--- End diff --
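We need to update `ctx.isValidParamLength` to count this extra `row` argument as well. When `row != null`, it is added to the consume function's parameters. However, the check at the call site (`ctx.isValidParamLength(output)`) only counts the output attributes. As a result, a function right at the limit could end up exceeding the maximum number of parameters allowed in a Java method declaration.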
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]