sunchao commented on code in PR #56575:
URL: https://github.com/apache/spark/pull/56575#discussion_r3449491009


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala:
##########
@@ -53,7 +53,24 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
   }
 
   private def replace(e: Expression): Expression = e match {
-    case r: RuntimeReplaceable => replace(r.replacement)
+    // Aggregates can never self-evaluate (no real aggregation buffer), so always rewrite early.
+    case r: RuntimeReplaceableAggregate => replace(r.replacement)
+
+    case r: RuntimeReplaceable =>
+      val replaced = replace(r.replacement)
+      // By default (`eagerReplace = true`) a `RuntimeReplaceable` is rewritten here, so it never
+      // reaches the physical plan. An expression with `eagerReplace = false` is instead allowed to
+      // survive into the physical plan (to be matched by a native engine or materialized just
+      // before codegen). Even then, a survivor must be rewritten early if its replacement cannot
+      // survive:
+      //  - A non-deterministic replacement (e.g. the `Rand` inside `uniform`) carries mutable
+      //    per-partition state that must be initialized before eval. That state cannot be reliably
+      //    initialized through the `lazy val replacement`, which tree transforms may re-create.
+      //  - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later
+      //    logical rule (such as `RewriteWithExpression`) that can only run in the logical phase.
+      val cannotSurvive = !replaced.deterministic || replaced.exists(_.isInstanceOf[Unevaluable])

Review Comment:
   [P1] `AttributeReference` extends `Unevaluable`, so this condition is true 
for every replacement that reads a resolved input column. For example, an 
`eagerReplace = false` wrapper whose replacement is `Add(attr, Literal(1L))` is 
still rewritten to `replaced` below, so the wrapper never reaches 
physical/native planning. In practice, the opt-out only works for constants or 
for nodes inserted after `ReplaceExpressions`, which is why both 
`MultiGetJsonObject` and the new test avoid this path.
   
   Could we exclude `AttributeReference` from this check (as 
`ConvertToLocalRelation.hasUnevaluableExpr` already does) and add a direct 
`ReplaceExpressions` regression test with a column-based wrapper?
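
   To illustrate the difference, here is a rough, self-contained sketch. It 
uses toy stand-ins (`Expr`, `AttrRef`, `WithExpr`, `Lit`, `currentCheck`, 
`proposedCheck` are all hypothetical names), not the real Catalyst classes, and 
it leaves out the `deterministic` half of the condition. It only models the 
`Unevaluable` half of the check, with and without the `AttributeReference` 
exclusion:

```scala
object UnevaluableCheckSketch {
  // Toy expression tree; NOT Spark's Expression/TreeNode API.
  sealed trait Expr {
    def children: Seq[Expr]
    // True if this node or any descendant satisfies the predicate.
    def exists(p: Expr => Boolean): Boolean =
      p(this) || children.exists(_.exists(p))
  }
  // Marker for nodes that cannot be evaluated directly.
  trait Unevaluable extends Expr

  case class Lit(value: Long) extends Expr {
    val children: Seq[Expr] = Nil
  }
  // Stand-in for a resolved column reference (assumed to be Unevaluable,
  // as the review comment states for AttributeReference).
  case class AttrRef(name: String) extends Unevaluable {
    val children: Seq[Expr] = Nil
  }
  // Stand-in for a node that needs a later logical rewrite, like `With`.
  case class WithExpr(child: Expr) extends Unevaluable {
    val children: Seq[Expr] = Seq(child)
  }
  case class Add(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
  }

  // Shape of the check in the diff: any Unevaluable node forces early rewrite.
  def currentCheck(e: Expr): Boolean =
    e.exists(_.isInstanceOf[Unevaluable])

  // Suggested shape: column references alone do not force early rewrite.
  def proposedCheck(e: Expr): Boolean =
    e.exists(x => x.isInstanceOf[Unevaluable] && !x.isInstanceOf[AttrRef])

  def main(args: Array[String]): Unit = {
    val columnBased = Add(AttrRef("a"), Lit(1L))
    val needsRewrite = Add(WithExpr(AttrRef("a")), Lit(1L))
    assert(currentCheck(columnBased))    // column read forces early rewrite
    assert(!proposedCheck(columnBased))  // column read may now survive
    assert(proposedCheck(needsRewrite))  // With-like node still rewritten early
  }
}
```

   A regression test along these lines, written against the real 
`ReplaceExpressions` rule with a column-based wrapper, would cover the case 
that the current test does not exercise.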



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

