cloud-fan commented on code in PR #56575:
URL: https://github.com/apache/spark/pull/56575#discussion_r3449749454
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala:
##########
@@ -53,7 +53,24 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
   }
 
   private def replace(e: Expression): Expression = e match {
-    case r: RuntimeReplaceable => replace(r.replacement)
+    // Aggregates can never self-evaluate (no real aggregation buffer), so always rewrite early.
+    case r: RuntimeReplaceableAggregate => replace(r.replacement)
+
+    case r: RuntimeReplaceable =>
+      val replaced = replace(r.replacement)
+      // By default (`eagerReplace = true`) a `RuntimeReplaceable` is rewritten here, so it never
+      // reaches the physical plan. An expression with `eagerReplace = false` is instead allowed to
+      // survive into the physical plan (to be matched by a native engine or materialized just
+      // before codegen). Even then, a survivor must be rewritten early if its replacement cannot
+      // survive:
+      //  - A non-deterministic replacement (e.g. the `Rand` inside `uniform`) carries mutable
+      //    per-partition state that must be initialized before eval. That state cannot be reliably
+      //    initialized through the `lazy val replacement`, which tree transforms may re-create.
+      //  - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later
+      //    logical rule (such as `RewriteWithExpression`) that can only run in the logical phase.
+      val cannotSurvive = !replaced.deterministic || replaced.exists(_.isInstanceOf[Unevaluable])
Review Comment:
Good catch, you're exactly right. Fixed in b80438b: the survival gate now
reuses `ConvertToLocalRelation.hasUnevaluableExpr(replaced)`, which already
excludes `AttributeReference`, so a replacement that reads an input column
(e.g. `Add(attr, Literal(1))`) can survive instead of being forced down the
eager path. Added a direct `ReplaceExpressions` regression test in
`OptimizerSuite` with a column-based wrapper (`ColumnBasedRuntimeReplaceable`)
that fails without the fix and passes with it.
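
A minimal sketch of how the original gate differs from the fixed one. It uses hypothetical toy stand-ins (`Expr`, `AttributeRef`, `Rand`, `With`, and so on), not Spark's real Catalyst classes. The sketch assumes, as in Catalyst, that a column reference is itself marked unevaluable until it is bound. That is why the original `exists(_.isInstanceOf[Unevaluable])` check also caught a replacement like `Add(attr, Literal(1))`. The toy `hasUnevaluableExpr` only mirrors the behavior described above; it is not Spark's implementation.

```scala
// Hypothetical toy model of the survival gate; not Spark's Catalyst classes.
object SurvivalGate {
  sealed trait Expr {
    def children: Seq[Expr]
    // Deterministic only if every child is, mirroring Catalyst's default.
    def deterministic: Boolean = children.forall(_.deterministic)
    // True if `p` holds for this node or any descendant.
    def exists(p: Expr => Boolean): Boolean = p(this) || children.exists(_.exists(p))
  }

  // Marker for nodes that cannot be evaluated directly (like Catalyst's `Unevaluable`).
  trait Unevaluable extends Expr

  case class Literal(value: Int) extends Expr { def children: Seq[Expr] = Nil }
  // Assumption: like `AttributeReference`, a column reference is unevaluable until bound.
  case class AttributeRef(name: String) extends Unevaluable { def children: Seq[Expr] = Nil }
  case class Add(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }
  case class Rand(seed: Long) extends Expr {
    def children: Seq[Expr] = Nil
    override def deterministic: Boolean = false
  }
  // Stand-in for `With`, which needs a later logical rule to be rewritten away.
  case class With(child: Expr) extends Unevaluable { def children: Seq[Expr] = Seq(child) }

  // Original gate: any unevaluable node counts, including a plain column reference.
  def cannotSurviveOld(replaced: Expr): Boolean =
    !replaced.deterministic || replaced.exists(_.isInstanceOf[Unevaluable])

  // Same idea as the reused helper: unevaluable nodes other than column references.
  def hasUnevaluableExpr(e: Expr): Boolean =
    e.exists(n => n.isInstanceOf[Unevaluable] && !n.isInstanceOf[AttributeRef])

  // Fixed gate.
  def cannotSurvive(replaced: Expr): Boolean =
    !replaced.deterministic || hasUnevaluableExpr(replaced)
}
```

With these definitions, a column-based replacement such as `Add(AttributeRef("a"), Literal(1))` is forced down the eager path by `cannotSurviveOld` but allowed to survive by `cannotSurvive`. A replacement containing `Rand` or `With` is still rewritten early.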
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -775,6 +775,10 @@ object QueryExecution {
       DisableUnnecessaryBucketedScan,
       ApplyColumnarRulesAndInsertTransitions(
         sparkSession.sessionState.columnarRules, outputsColumnar = false),
+      // Materialize any RuntimeReplaceable that survived the optimizer into its replacement for
+      // the Spark execution path. After columnar/native conversion (so a native engine sees the
+      // original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable).
+      MaterializeRuntimeReplaceable,
Review Comment:
Agreed, this was a real gap. Fixed in b80438b by adding
`MaterializeRuntimeReplaceable` to AQE's `postStageCreationRules`, right before
`collapseCodegenStagesRule`, mirroring `QueryExecution.preparations`. I
confirmed both `newQueryStage` and `newResultQueryStage` apply
`postStageCreationRules`, so shuffle stages and the final result stage are both
covered. Added an AQE-path test (`repartition` shuffle) that asserts no
`RuntimeReplaceable` reaches the AQE `finalPhysicalPlan` and that results are
correct.
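
The ordering constraint (after columnar/native conversion, before codegen) can be illustrated with a toy rule pipeline. Every name here is hypothetical:
- `Replaceable` stands in for a surviving `RuntimeReplaceable`.
- `nativeConversion` stands in for the columnar rules.
- `materialize` stands in for `MaterializeRuntimeReplaceable`.
- `codegen` stands in for `collapseCodegenStagesRule`.

The real rules operate on `SparkPlan` trees, not on flat vectors of expressions.

```scala
// Hypothetical toy pipeline; not Spark's physical-plan rules.
object RuleOrdering {
  sealed trait Expr
  case class Col(name: String) extends Expr
  // A surviving RuntimeReplaceable-like wrapper and the replacement it carries.
  case class Replaceable(name: String, replacement: Expr) extends Expr
  // What a native engine produces after recognizing the original wrapper.
  case class Native(name: String) extends Expr

  type Rule = Vector[Expr] => Vector[Expr]

  // Stand-in for columnar/native conversion: it only recognizes the original "my_fn" wrapper.
  val nativeConversion: Rule = _.map {
    case Replaceable("my_fn", _) => Native("my_fn")
    case e                       => e
  }

  // Stand-in for MaterializeRuntimeReplaceable: swap any survivor for its replacement.
  val materialize: Rule = _.map {
    case Replaceable(_, r) => r
    case e                 => e
  }

  // Stand-in for codegen: it must never see a Replaceable.
  val codegen: Rule = exprs => {
    require(!exprs.exists(_.isInstanceOf[Replaceable]), "codegen saw a RuntimeReplaceable")
    exprs
  }

  // Apply the rules left to right, like a sequence of preparation rules.
  def run(rules: Seq[Rule], in: Vector[Expr]): Vector[Expr] =
    rules.foldLeft(in)((acc, rule) => rule(acc))
}
```

Running `nativeConversion, materialize, codegen` lets the native engine claim `my_fn` while still removing every other survivor before codegen. Materializing first would hide `my_fn` from the native engine, and skipping materialization makes the codegen stand-in fail.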
One minor correction to the mechanism: `AQEOptimizer` doesn't actually run
`OptimizeCsvJsonExprs` or `spark.experimental.extraOptimizations`, so the
survivor isn't re-inserted during `reOptimize`. Instead it originates in the
main optimizer and persists into the AQE-managed subtree, which the outer
`preparations` rule can't reach (since `AdaptiveSparkPlanExec` is a
`LeafExecNode`). So the conclusion stands -- AQE-side materialization is
required -- just via a slightly different path.
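
A sketch of why the outer `preparations` pass cannot reach the AQE-managed subtree. It uses a toy plan tree whose transform, like Catalyst's tree transforms, recurses only through `children`. The names are hypothetical:
- `AdaptiveLeaf` stands in for `AdaptiveSparkPlanExec`: it owns a subtree but reports no children.
- `applyInsideAdaptive` stands in for running the rule on each stage through `postStageCreationRules`.
- `"RR"` marks a surviving `RuntimeReplaceable`.

```scala
// Hypothetical toy plan tree; not Spark's SparkPlan / TreeNode API.
object LeafOpacity {
  sealed trait Plan {
    def children: Seq[Plan]
    def withChildren(newChildren: Seq[Plan]): Plan
    // Bottom-up transform that, like Catalyst's tree transforms, only follows `children`.
    def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
      val rebuilt = withChildren(children.map(_.transformUp(rule)))
      rule.applyOrElse(rebuilt, identity[Plan])
    }
  }

  case class Scan(exprs: Seq[String]) extends Plan {
    def children: Seq[Plan] = Nil
    def withChildren(newChildren: Seq[Plan]): Plan = this
  }
  case class Project(exprs: Seq[String], child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
  }
  // Like AdaptiveSparkPlanExec: owns a whole subtree but exposes no children.
  case class AdaptiveLeaf(inner: Plan) extends Plan {
    def children: Seq[Plan] = Nil
    def withChildren(newChildren: Seq[Plan]): Plan = this
  }

  // The materialization rule swaps "RR" for its replacement.
  private def swap(e: String): String = if (e == "RR") "replacement" else e
  val materialize: PartialFunction[Plan, Plan] = {
    case Scan(es)       => Scan(es.map(swap))
    case Project(es, c) => Project(es.map(swap), c)
  }

  // Inspects every node, including the subtree an AdaptiveLeaf hides.
  def containsRR(p: Plan): Boolean = p match {
    case Scan(es)            => es.contains("RR")
    case Project(es, c)      => es.contains("RR") || containsRR(c)
    case AdaptiveLeaf(inner) => containsRR(inner)
  }

  // Stand-in for AQE-side application: run the rule on the subtree the adaptive node owns.
  def applyInsideAdaptive(p: Plan, rule: PartialFunction[Plan, Plan]): Plan =
    p.transformUp { case AdaptiveLeaf(inner) => AdaptiveLeaf(inner.transformUp(rule)) }
}
```

For a plan like `Project(Seq("a"), AdaptiveLeaf(Project(Seq("RR"), Scan(Seq("b")))))`, `plan.transformUp(materialize)` leaves the inner `"RR"` in place. `applyInsideAdaptive(plan, materialize)` removes it.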
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.