cloud-fan commented on code in PR #56575:
URL: https://github.com/apache/spark/pull/56575#discussion_r3455117913
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -155,6 +155,12 @@ case class AdaptiveSparkPlanExec(
private def postStageCreationRules(outputsColumnar: Boolean) = Seq(
ApplyColumnarRulesAndInsertTransitions(
context.session.sessionState.columnarRules, outputsColumnar),
+ // Mirror `QueryExecution.preparations`: materialize any surviving `RuntimeReplaceable` after
+ // columnar/native conversion and before codegen. AQE re-optimizes each stage (which can
+ // re-insert a surviving `RuntimeReplaceable`, e.g. via `OptimizeCsvJsonExprs`), and
+ // `AdaptiveSparkPlanExec` is a `LeafExecNode` that the outer `preparations` rule can't reach,
+ // so the materialization must also run here.
+ MaterializeRuntimeReplaceable,
Review Comment:
Follow-up: I generalized this beyond the cached-batch case.
The first principle we landed on: a surviving `RuntimeReplaceable`
(`eagerReplace = false`) must be unfolded to its `replacement` wherever a
predicate **leaves Spark's own expression-evaluation engine** and is handed to
a consumer that interprets its structure or sends it elsewhere, because
`RuntimeReplaceable` is a Spark-internal optimizer concept that the consumer
can't understand. Internal Spark evaluation (`FilterExec`/`Project`, both
codegen and interpreted) needs no unfold: it's handled by the
`eval`/`doGenCode` delegation plus `MaterializeRuntimeReplaceable`.
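
To make the principle concrete, here is a minimal, self-contained sketch of what "unfold before handing off" means. It does not use Spark's classes: `Expr`, `Survivor`, `unfold`, and `translate` are hypothetical stand-ins invented for illustration, with `Survivor` playing the role of a surviving `RuntimeReplaceable` and `translate` playing the role of an external consumer that only understands primitive nodes.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark's Catalyst classes.
object UnfoldSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr

  // Stand-in for a surviving RuntimeReplaceable: a high-level node that carries
  // the lower-level expression it is defined to be equivalent to.
  final case class Survivor(name: String, children: Seq[Expr], replacement: Expr) extends Expr

  // Recursively swap every Survivor for its replacement, so the resulting tree
  // contains only primitive nodes.
  def unfold(e: Expr): Expr = e match {
    case Survivor(_, _, replacement) => unfold(replacement)
    case Add(l, r)                   => Add(unfold(l), unfold(r))
    case GreaterThan(l, r)           => GreaterThan(unfold(l), unfold(r))
    case leaf                        => leaf
  }

  // Stand-in for an external consumer (for example a filter translator).
  // It knows nothing about Survivor, so it gives up when it sees one.
  def translate(e: Expr): Option[String] = e match {
    case Attr(n) => Some(n)
    case Lit(v)  => Some(v.toString)
    case Add(l, r) =>
      for (a <- translate(l); b <- translate(r)) yield s"($a + $b)"
    case GreaterThan(l, r) =>
      for (a <- translate(l); b <- translate(r)) yield s"($a > $b)"
    case _: Survivor => None
  }
}
```

In this toy, a predicate containing a `Survivor` is rejected by `translate` as-is, but translates once `unfold` has been applied first, which is the behaviour the boundary rule is meant to guarantee.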
That boundary is crossed in three places, all of which now unfold:
- `InMemoryTableScanExec.buildFilter` (cached-batch pruning — the case you
flagged; `CachedBatchSerializer` is a `@DeveloperApi`).
- `DataSourceStrategy.translateLeafNodeFilter` (V1 / file source filter
pushdown).
- `V2ExpressionBuilder.generateExpression` (V2) — and since all V2 pushdown
(filters, aggregate functions, aggregate arguments, group-by, sort) funnels
through `generateExpression` via `PushableExpression`/`translateAggregation`,
this single fallback covers V2 filter **and** aggregate/group-by pushdown. It's
a fallback (after the explicit cases) so native high-level pushdown like
`AES_ENCRYPT` still wins.
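
The "fallback after the explicit cases" ordering can be illustrated with a small toy builder. This is only a sketch under stated assumptions, not the actual `V2ExpressionBuilder` code: the `Expr` types, the `nativelyPushable` and `primitiveCalls` sets, and the `SHOUT`/`STATIC_INVOKE` names are all hypothetical, chosen to show why match order matters.

```scala
// Toy model only: hypothetical types and names, not Spark's V2ExpressionBuilder.
object FallbackOrderSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: String) extends Expr
  final case class Call(name: String, args: Seq[Expr]) extends Expr
  // Stand-in for a surviving RuntimeReplaceable.
  final case class Survivor(name: String, args: Seq[Expr], replacement: Expr) extends Expr

  // Assumption: high-level functions the target source can evaluate natively.
  private val nativelyPushable = Set("AES_ENCRYPT")
  // Assumption: primitive calls the target source understands.
  private val primitiveCalls = Set("UPPER", "CONCAT")

  // Render a call only if every argument could itself be generated.
  private def render(name: String, args: Seq[Expr]): Option[String] = {
    val parts = args.map(generate)
    if (parts.forall(_.isDefined)) {
      val joined = parts.flatten.mkString(", ")
      Some(s"$name($joined)")
    } else {
      None
    }
  }

  def generate(e: Expr): Option[String] = e match {
    case Attr(n) => Some(n)
    case Lit(v)  => Some(s"'$v'")
    case Call(n, args) if primitiveCalls(n) => render(n, args)
    // Explicit case first: push the high-level function as-is when supported.
    case Survivor(n, args, _) if nativelyPushable(n) => render(n, args)
    // Fallback last: otherwise try the unfolded replacement instead.
    case Survivor(_, _, replacement) => generate(replacement)
    case _ => None
  }
}
```

Because the explicit `Survivor` case is matched before the fallback, a natively supported function keeps its high-level form, while any other survivor is pushed down only if its replacement is itself translatable.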
Out of scope: nested-column/schema pruning and partition pruning. These are a
different kind of consumer: they perform column *selection*, not expression
translation, and are driven by `references`, which for a survivor equals its
children's references.
Tests added for all three boundaries (V1/V2 filter translation, V2
aggregate/group-by, and an AQE cached-batch pruning test); each was verified to
fail without the corresponding unfold.
Note: I squashed the branch into a single commit.