GitHub user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/18931#discussion_r163516654
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
ctx.INPUT_ROW = null
ctx.freshNamePrefix = parent.variablePrefix
val evaluated = evaluateRequiredVariables(output, inputVars,
parent.usedInputs)
+
+ // Under certain conditions, we can put the logic to consume the rows
of this operator into
+ // another function. So we can prevent a generated function too long
to be optimized by JIT.
+ // The conditions:
+ // 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is
enabled.
+ // 2. The parent uses all variables in output. we can't defer variable
evaluation when consume
+ // in another function.
+ // 3. The output variables are not empty. If it's empty, we don't
bother to do that.
+ // 4. We don't use row variable. The construction of row uses deferred
variable evaluation. We
+ // can't do it.
+ // 5. The number of output variables must less than maximum number of
parameters in Java method
+ // declaration.
+ val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+ val consumeFunc =
+ if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null &&
outputVars.nonEmpty &&
+ requireAllOutput && ctx.isValidParamLength(output)) {
+ constructDoConsumeFunction(ctx, inputVars)
+ } else {
+ parent.doConsume(ctx, inputVars, rowVar)
+ }
s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
- |${parent.doConsume(ctx, inputVars, rowVar)}
+ |$consumeFunc
+ """.stripMargin
+ }
+
+ /**
+ * To prevent concatenated function growing too long to be optimized by
JIT. We can separate the
+ * parent's `doConsume` codes of a `CodegenSupport` operator into a
function to call.
+ */
+ private def constructDoConsumeFunction(
+ ctx: CodegenContext,
+ inputVars: Seq[ExprCode]): String = {
+ val (callingParams, arguList, inputVarsInFunc) =
+ constructConsumeParameters(ctx, output, inputVars)
+
+ // Set up rowVar because parent plan can possibly consume UnsafeRow
instead of variables.
+ val colExprs = output.zipWithIndex.map { case (attr, i) =>
+ BoundReference(i, attr.dataType, attr.nullable)
+ }
+ // Don't need to copy the variables because they're already evaluated
before entering function.
+ ctx.INPUT_ROW = null
+ ctx.currentVars = inputVarsInFunc
+ val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
+ val rowVar = ExprCode(ev.code.trim, "false", ev.value)
+
+ val doConsume = ctx.freshName("doConsume")
--- End diff --
Shall we include the operator name in this generated function's name?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]