cloud-fan commented on code in PR #56575:
URL: https://github.com/apache/spark/pull/56575#discussion_r3449749454


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala:
##########
@@ -53,7 +53,24 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
   }
 
   private def replace(e: Expression): Expression = e match {
-    case r: RuntimeReplaceable => replace(r.replacement)
+    // Aggregates can never self-evaluate (no real aggregation buffer), so always rewrite early.
+    case r: RuntimeReplaceableAggregate => replace(r.replacement)
+
+    case r: RuntimeReplaceable =>
+      val replaced = replace(r.replacement)
+      // By default (`eagerReplace = true`) a `RuntimeReplaceable` is rewritten here, so it never
+      // reaches the physical plan. An expression with `eagerReplace = false` is instead allowed to
+      // survive into the physical plan (to be matched by a native engine or materialized just
+      // before codegen). Even then, a survivor must be rewritten early if its replacement cannot
+      // survive:
+      //  - A non-deterministic replacement (e.g. the `Rand` inside `uniform`) carries mutable
+      //    per-partition state that must be initialized before eval. That state cannot be reliably
+      //    initialized through the `lazy val replacement`, which tree transforms may re-create.
+      //  - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later
+      //    logical rule (such as `RewriteWithExpression`) that can only run in the logical phase.
+      val cannotSurvive = !replaced.deterministic || replaced.exists(_.isInstanceOf[Unevaluable])

Review Comment:
   Good catch, you're exactly right. Fixed in b80438b: the survival gate now 
reuses `ConvertToLocalRelation.hasUnevaluableExpr(replaced)`, which already 
excludes `AttributeReference`, so a replacement that reads an input column 
(e.g. `Add(attr, Literal(1))`) can survive instead of being forced down the 
eager path. Added a direct `ReplaceExpressions` regression test in 
`OptimizerSuite` with a column-based wrapper (`ColumnBasedRuntimeReplaceable`) 
that fails without the fix and passes with it.
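   A minimal, self-contained sketch of why the gate change matters, assuming a toy expression tree rather than Catalyst itself: every class below (`Expr`, `Literal`, `AttributeReference`, `Add`, `Rand`, `With`, `Unevaluable`) is a hypothetical stand-in named after its Catalyst counterpart, and `hasUnevaluableExpr` only mirrors the attribute exemption described above. As in Catalyst, the toy `AttributeReference` is marked unevaluable, which is exactly why the original gate rejected column-based replacements.

```scala
// Toy model (hypothetical, NOT Spark's Catalyst classes) of the survival gate.
object SurvivalGateSketch {
  sealed trait Expr {
    def children: Seq[Expr]
    // An expression is deterministic only if all of its children are.
    def deterministic: Boolean = children.forall(_.deterministic)
    // True if `p` holds for this node or any descendant.
    def exists(p: Expr => Boolean): Boolean = p(this) || children.exists(_.exists(p))
  }

  // Marker for nodes that cannot be evaluated directly.
  trait Unevaluable extends Expr

  case class Literal(value: Int) extends Expr { def children: Seq[Expr] = Nil }

  // An input column: unevaluable on its own, but bound to an input later.
  case class AttributeReference(name: String) extends Expr with Unevaluable {
    def children: Seq[Expr] = Nil
  }

  case class Add(left: Expr, right: Expr) extends Expr {
    def children: Seq[Expr] = Seq(left, right)
  }

  // Stand-in for a non-deterministic expression with per-partition state.
  case class Rand(seed: Long) extends Expr {
    def children: Seq[Expr] = Nil
    override def deterministic: Boolean = false
  }

  // Stand-in for `With`, which needs a later logical rule to rewrite it.
  case class With(child: Expr) extends Expr with Unevaluable {
    def children: Seq[Expr] = Seq(child)
  }

  // Gate as first written in the diff: any Unevaluable node, attributes included.
  def cannotSurviveBeforeFix(replaced: Expr): Boolean =
    !replaced.deterministic || replaced.exists(_.isInstanceOf[Unevaluable])

  // Mirrors the intent of the reused helper: unevaluable, but not an attribute.
  def hasUnevaluableExpr(e: Expr): Boolean =
    e.exists(x => x.isInstanceOf[Unevaluable] && !x.isInstanceOf[AttributeReference])

  // Gate after the fix.
  def cannotSurvive(replaced: Expr): Boolean =
    !replaced.deterministic || hasUnevaluableExpr(replaced)
}
```

With this model, `Add(AttributeReference("a"), Literal(1))` is forced down the eager path by `cannotSurviveBeforeFix` but may survive under `cannotSurvive`, while a `Rand`-based or `With`-based replacement is still rewritten early under both.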



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -775,6 +775,10 @@ object QueryExecution {
       DisableUnnecessaryBucketedScan,
       ApplyColumnarRulesAndInsertTransitions(
         sparkSession.sessionState.columnarRules, outputsColumnar = false),
+      // Materialize any RuntimeReplaceable that survived the optimizer into its replacement for
+      // the Spark execution path. After columnar/native conversion (so a native engine sees the
+      // original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable).
+      MaterializeRuntimeReplaceable,

Review Comment:
   Agreed, this was a real gap. Fixed in b80438b by adding 
`MaterializeRuntimeReplaceable` to AQE's `postStageCreationRules`, right before 
`collapseCodegenStagesRule`, mirroring `QueryExecution.preparations`. I 
confirmed both `newQueryStage` and `newResultQueryStage` apply 
`postStageCreationRules`, so shuffle stages and the final result stage are both 
covered. Added an AQE-path test (`repartition` shuffle) that asserts no 
`RuntimeReplaceable` reaches the AQE `finalPhysicalPlan` and that results are 
correct.
   
   One minor correction to the mechanism: `AQEOptimizer` doesn't actually run 
`OptimizeCsvJsonExprs` or `spark.experimental.extraOptimizations`, so the 
survivor isn't re-inserted during `reOptimize`. Instead it originates in the 
main optimizer and persists into the AQE-managed subtree, which the outer 
`preparations` rule can't reach (since `AdaptiveSparkPlanExec` is a 
`LeafExecNode`). So the conclusion stands (AQE-side materialization is 
required), just via a slightly different path.
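   The leaf-node point can be illustrated with a toy plan tree. This assumes hypothetical stand-ins: `Plan` for `SparkPlan`, a `Node` flagged `hasReplaceable` for an operator still holding a surviving `RuntimeReplaceable`, and `AdaptiveWrapper` for `AdaptiveSparkPlanExec`, whose `postStageRules` play the role of `postStageCreationRules`. Because the wrapper reports no children, a bottom-up transform applied from outside never visits the wrapped subplan; only a rule in the wrapper's own per-stage list does. It is a sketch of the reasoning, not of AQE's actual stage creation.

```scala
// Toy model (hypothetical, NOT Spark's SparkPlan/AQE classes) of why an outer
// rule cannot reach a subplan hidden behind a leaf node.
object LeafBoundarySketch {
  sealed trait Plan {
    def children: Seq[Plan]
    def withChildren(cs: Seq[Plan]): Plan
    // Bottom-up transform that, like a tree transform, only descends via `children`.
    def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
      val rebuilt = withChildren(children.map(_.transformUp(rule)))
      rule.applyOrElse(rebuilt, identity[Plan])
    }
    def exists(p: Plan => Boolean): Boolean = p(this) || children.exists(_.exists(p))
  }

  // An ordinary operator; `hasReplaceable` marks a surviving RuntimeReplaceable.
  case class Node(label: String, hasReplaceable: Boolean, children: Seq[Plan]) extends Plan {
    def withChildren(cs: Seq[Plan]): Plan = copy(children = cs)
  }

  // Stand-in for the adaptive wrapper: holds a subplan but exposes no children.
  case class AdaptiveWrapper(initialPlan: Plan, postStageRules: Seq[Plan => Plan]) extends Plan {
    def children: Seq[Plan] = Nil
    def withChildren(cs: Seq[Plan]): Plan = this
    // Per-stage rules run inside the wrapper, in order.
    def finalPhysicalPlan: Plan = postStageRules.foldLeft(initialPlan)((p, rule) => rule(p))
  }

  // Stand-in for the materialization rule: clears every surviving replaceable it can reach.
  val materialize: Plan => Plan = plan =>
    plan.transformUp { case n: Node if n.hasReplaceable => n.copy(hasReplaceable = false) }

  def hasSurvivor(plan: Plan): Boolean =
    plan.exists {
      case n: Node => n.hasReplaceable
      case _       => false
    }
}
```

Applying `materialize` to `AdaptiveWrapper(inner, Nil)` returns the wrapper unchanged, so the survivor is still present in its `finalPhysicalPlan`; putting `materialize` into `postStageRules` removes it, which matches the fix of adding the rule on the AQE side.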



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
