HyukjinKwon commented on code in PR #56575:
URL: https://github.com/apache/spark/pull/56575#discussion_r3449221529


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala:
##########
@@ -78,8 +77,23 @@ object NormalizePlan extends PredicateHelper {
    * [[InheritAnalysisRules]] is the replacement expression, the original expression will be lost
    * and timezone will never be applied. This causes inconsistencies, because fixed-point semantic
    * is to ALWAYS apply timezone, regardless of whether the Cast actually needs it.
+   *
+   * Note: this unconditionally unfolds every [[RuntimeReplaceable]] into its `replacement`. It
+   * intentionally does NOT reuse the `ReplaceExpressions` optimizer rule, which now keeps
+   * deterministic, evaluable [[RuntimeReplaceable]] nodes in the plan (they are materialized later,
+   * before codegen). Normalization still needs the fully-unfolded form so that the non-child
+   * `parameters` of [[InheritAnalysisRules]] (e.g. the original, un-timezoned cast) are dropped.
    */
-  def normalizeRuntimeReplaceable(plan: LogicalPlan): LogicalPlan = ReplaceExpressions(plan)
+  def normalizeRuntimeReplaceable(plan: LogicalPlan): LogicalPlan = {
+    plan.transformWithPruning(_.containsAnyPattern(RUNTIME_REPLACEABLE)) {
+      case p => p.mapExpressions(replaceRuntimeReplaceable)
+    }
+  }
+
+  private def replaceRuntimeReplaceable(e: Expression): Expression = e match {
+    case r: RuntimeReplaceable => replaceRuntimeReplaceable(r.replacement)

Review Comment:
   Nit / non-blocking: this full-unfold recursion (`case r: RuntimeReplaceable 
=> replaceRuntimeReplaceable(r.replacement)` / `case _ => e.mapChildren(...)`) 
is identical to `MaterializeRuntimeReplaceable.replace`. Two copies of the same 
unfold can drift over time -- might be worth a small shared helper.
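
   A minimal sketch of what such a shared helper could look like, written against toy stand-ins rather than Catalyst's real classes so that it is self-contained. `Expr`, `Replaceable`, `Lit`, `Add`, `Twice`, `FourTimes`, and `UnfoldSketch` are all hypothetical names; only the shape of the recursion mirrors the two cases quoted above.

```scala
// Toy stand-ins for Expression / RuntimeReplaceable (hypothetical, not Spark's classes).
sealed trait Expr {
  def children: Seq[Expr]
  def withChildren(newChildren: Seq[Expr]): Expr
  def mapChildren(f: Expr => Expr): Expr = withChildren(children.map(f))
}

// An expression that knows an equivalent expression to be evaluated in its place.
trait Replaceable extends Expr {
  def replacement: Expr
}

case class Lit(value: Int) extends Expr {
  def children: Seq[Expr] = Nil
  def withChildren(newChildren: Seq[Expr]): Expr = this
}

case class Add(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
  def withChildren(newChildren: Seq[Expr]): Expr = Add(newChildren(0), newChildren(1))
}

// Replaced by child + child.
case class Twice(child: Expr) extends Replaceable {
  def children: Seq[Expr] = Seq(child)
  def withChildren(newChildren: Seq[Expr]): Expr = Twice(newChildren(0))
  def replacement: Expr = Add(child, child)
}

// Its replacement is itself replaceable, so a single unfold step is not enough.
case class FourTimes(child: Expr) extends Replaceable {
  def children: Seq[Expr] = Seq(child)
  def withChildren(newChildren: Seq[Expr]): Expr = FourTimes(newChildren(0))
  def replacement: Expr = Twice(Twice(child))
}

object UnfoldSketch {
  // The single shared copy of the full-unfold recursion: replace the node,
  // then keep unfolding whatever the replacement produced.
  def unfold(e: Expr): Expr = e match {
    case r: Replaceable => unfold(r.replacement)
    case other          => other.mapChildren(unfold)
  }
}
```

   Under this assumption, both call sites would delegate to the one `unfold` and differ only in how they traverse the plan.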



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -775,6 +775,10 @@ object QueryExecution {
       DisableUnnecessaryBucketedScan,
       ApplyColumnarRulesAndInsertTransitions(
         sparkSession.sessionState.columnarRules, outputsColumnar = false),
+      // Materialize any RuntimeReplaceable that survived the optimizer into its replacement for
+      // the Spark execution path. After columnar/native conversion (so a native engine sees the
+      // original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable).
+      MaterializeRuntimeReplaceable,

Review Comment:
   `MaterializeRuntimeReplaceable` is added here, to 
`QueryExecution.preparations`, but I don't see it in any of AQE's stage-prep 
lists (`queryStagePreparationRules`, `queryStageOptimizerRules`, 
`postStageCreationRules` in `AdaptiveSparkPlanExec`). Under AQE (default-on) 
that looks like a gap:
   
   - `InsertAdaptiveSparkPlan` runs first in `preparations`, so the plan is 
wrapped in `AdaptiveSparkPlanExec` (a `LeafExecNode`) before this rule runs -- 
`transformUpWithSubqueries` can't reach the AQE-managed subtree.
   - AQE's `reOptimize` re-runs the full optimizer (`optimizer.execute`), which 
includes `OptimizeCsvJsonExprs`, so the surviving `MultiGetJsonObject` 
(`eagerReplace = false`) is re-inserted on the re-optimized stage.
   - AQE's `postStageCreationRules` then runs 
`ApplyColumnarRulesAndInsertTransitions` -> `CollapseCodegenStages` with no 
materialization in between.
   
   So under AQE a `RuntimeReplaceable` reaches whole-stage codegen 
un-materialized. Results stay correct because `eval`/`doGenCode` now delegate 
to `replacement`, but the invariant this PR relies on ("Spark whole-stage 
codegen never sees a `RuntimeReplaceable`", and "only the physical plan shows 
its `Invoke` replacement") wouldn't hold under AQE -- which affects EXPLAIN and 
the per-operator-metrics rationale. The peer prep rules 
(`CollapseCodegenStages`, `ApplyColumnarRulesAndInsertTransitions`, 
`EnsureRequirements`, ...) are all present in both the non-AQE and AQE lists; 
this rule is the only one that isn't.
   
   Would adding `MaterializeRuntimeReplaceable` to AQE's 
`postStageCreationRules` (just before `collapseCodegenStagesRule`) be the right 
fix? The two new tests use no-shuffle queries that aren't AQE-wrapped, so an 
AQE-path (shuffle) test that inspects the AQE final plan would cover this.
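
   To make the ordering argument concrete, here is a toy model of the two rule lists. It is a sketch under loud assumptions: `Plan`, `Rule`, `execute`, and `insertBefore` are hypothetical and do not use Spark's APIs, a plan is reduced to the expression names in one stage, and the "codegen" rule simply records what it was handed.

```scala
object AqeRuleOrderSketch {
  // Toy plan: expression names in a stage, plus whatever "codegen" observed.
  final case class Plan(exprs: Vector[String], seenByCodegen: Vector[String] = Vector.empty)
  final case class Rule(name: String, run: Plan => Plan)

  // Stand-ins named after the real rules; their behavior here is purely illustrative.
  val applyColumnar: Rule =
    Rule("ApplyColumnarRulesAndInsertTransitions", p => p)
  val materialize: Rule =
    Rule("MaterializeRuntimeReplaceable",
      p => p.copy(exprs = p.exprs.map(e => if (e == "MultiGetJsonObject") "Invoke" else e)))
  val collapseCodegen: Rule =
    Rule("CollapseCodegenStages", p => p.copy(seenByCodegen = p.exprs))

  // Apply the rules in list order.
  def execute(rules: Seq[Rule], plan: Plan): Plan =
    rules.foldLeft(plan)((p, r) => r.run(p))

  // Insert `extra` immediately before the first rule named `target`
  // (appends at the end if `target` is absent).
  def insertBefore(rules: Seq[Rule], target: String, extra: Rule): Seq[Rule] = {
    val (before, after) = rules.span(_.name != target)
    before ++ (extra +: after)
  }

  // The AQE list as described above: no materialization before codegen.
  val aqeRules: Seq[Rule] = Seq(applyColumnar, collapseCodegen)
  // The proposed fix: materialize just before collapsing codegen stages.
  val fixedAqeRules: Seq[Rule] = insertBefore(aqeRules, "CollapseCodegenStages", materialize)

  def main(args: Array[String]): Unit = {
    val stage = Plan(Vector("MultiGetJsonObject"))
    println(execute(aqeRules, stage).seenByCodegen)
    println(execute(fixedAqeRules, stage).seenByCodegen)
  }
}
```

   In this model the unmodified list lets codegen observe `MultiGetJsonObject`, while the list with the rule inserted only ever shows it `Invoke`, which is the property an AQE-path test would need to assert on the final plan.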



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

