sunchao commented on code in PR #56575:
URL: https://github.com/apache/spark/pull/56575#discussion_r3477871214


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala:
##########
@@ -190,30 +191,47 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
       // Find equi-join predicates that can be evaluated before the join, and thus can be used
       // as join keys.
       val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)

Review Comment:
   [P2] Split the unfolded condition for compound replacements
   
   The condition is split while the outer `RuntimeReplaceable` is still opaque. 
If a supported survivor has `replacement = And(EqualTo(left.k, right.k), 
GreaterThan(left.v, 0))`, `splitConjunctivePredicates` produces the wrapper as 
one predicate; `unfoldKeyPredicate` then yields an `And`, which none of the 
top-level equality cases match. A focused regression test still planned this 
join as `CartesianProductExec`, and the equivalent stream-stream join is still 
rejected as lacking an equality predicate. 
`TestCompoundPredicateRuntimeReplaceable` in the V2 tests already treats this 
compound replacement shape as supported.
   
   Could we expose the replacement before splitting (while retaining the 
mapping to the original survivor where needed), and add batch plus streaming 
coverage for a compound replacement?
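
   A minimal sketch of the ordering problem, using a toy expression model rather than Catalyst. Every name here (`Wrapper`, `split`, `unfoldOuter`, `splitUnfolded`) is a hypothetical stand-in for illustration, not a Spark API, and it exposes only outermost wrappers:

```scala
// Toy model only: these case classes stand in for Catalyst expressions.
object CompoundReplacementSketch {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Int) extends Expr
  case class EqualTo(l: Expr, r: Expr) extends Expr
  case class GreaterThan(l: Expr, r: Expr) extends Expr
  case class And(l: Expr, r: Expr) extends Expr
  // Hypothetical survivor: opaque to structural matching until unfolded.
  case class Wrapper(replacement: Expr) extends Expr

  // Same idea as splitConjunctivePredicates: flatten nested Ands.
  def split(e: Expr): Seq[Expr] = e match {
    case And(l, r) => split(l) ++ split(r)
    case other => Seq(other)
  }

  // Expose only the outermost wrapper(s) of a predicate.
  def unfoldOuter(e: Expr): Expr = e match {
    case Wrapper(r) => unfoldOuter(r)
    case other => other
  }

  // Split after unfolding, pairing each conjunct with the original
  // predicate it came from so the survivor form can still be recovered.
  def splitUnfolded(e: Expr): Seq[(Expr, Expr)] =
    split(e).flatMap { original =>
      split(unfoldOuter(original)).map(conjunct => (conjunct, original))
    }

  val condition: Expr = Wrapper(
    And(
      EqualTo(Attr("left.k"), Attr("right.k")),
      GreaterThan(Attr("left.v"), Lit(0))))

  // Splitting first sees a single opaque predicate.
  val beforeUnfold: Seq[Expr] = split(condition)
  // Unfolding first reveals the EqualTo conjunct as a key candidate.
  val afterUnfold: Seq[(Expr, Expr)] = splitUnfolded(condition)
}
```

   In this model `beforeUnfold` holds only the wrapper, while `afterUnfold` holds the `EqualTo` and `GreaterThan` conjuncts, each still paired with the original survivor.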



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala:
##########
@@ -190,30 +191,47 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
       // Find equi-join predicates that can be evaluated before the join, and thus can be used
       // as join keys.
       val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
-      val joinKeys = predicates.flatMap {
-        case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
-        case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
-        case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
-        // Replace null with default value for joining key, then those rows with null in it could
-        // be joined together
-        case EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, right) =>
-          Seq((Coalesce(Seq(l, Literal.default(l.dataType))),
-            Coalesce(Seq(r, Literal.default(r.dataType)))),
-            (IsNull(l), IsNull(r))
-          )  // (coalesce(l, default) = coalesce(r, default)) and (isnull(l) = isnull(r))
-        case EqualNullSafe(l, r) if canEvaluate(l, right) && canEvaluate(r, left) =>
-          Seq((Coalesce(Seq(r, Literal.default(r.dataType))),
-            Coalesce(Seq(l, Literal.default(l.dataType)))),
-            (IsNull(r), IsNull(l))
-          )  // Same as above with left/right reversed.
-        case _ => None
+      // A surviving `RuntimeReplaceable` (`eagerReplace = false`) hides its `replacement`
+      // from this structural matcher: an equi-join condition expressed as such a survivor's
+      // `replacement` (e.g. a wrapper whose `replacement` is `EqualTo(leftKey, rightKey)`)
+      // would not be recognized here, degrading a batch join to a `CartesianProductExec` and
+      // failing stream-stream join analysis with "without equality predicate". Unfold a
+      // predicate's `RuntimeReplaceable` before matching so key extraction sees the underlying
+      // equality. Only the extracted keys use the unfolded form; non-key predicates are kept
+      // in their original (survivor) form in `otherPredicates` below, so
+      // `MaterializeRuntimeReplaceable` still materializes them before codegen and EXPLAIN
+      // keeps showing the readable node. The unfold is gated on the tree pattern so the common
+      // survivor-free join is unaffected.
+      def unfoldKeyPredicate(p: Expression): Expression =
+        if (p.containsPattern(RUNTIME_REPLACEABLE)) RuntimeReplaceable.unfold(p) else p

Review Comment:
   [P2] Preserve nested survivors when exposing the outer equality
   
   This recursively unfolds every `RuntimeReplaceable` in the predicate, 
including a survivor that is already a visible join-key operand. For 
`EqualTo(partitionKey, Survivor(dimensionKey))`, the old extractor retained 
`Survivor(dimensionKey)` as the physical key; this code emits only its 
low-level replacement, so native planning can no longer match the high-level 
key.
   
   It also breaks dynamic partition pruning (DPP): `ExtractEquiJoinKeys` 
returns the unfolded build 
key, but `PartitionPruning` inspects the original equality and calls 
`joinKeys.indexOf(Survivor(dimensionKey))`, producing `-1`. The resulting 
`DynamicPruningSubquery` is unresolved and fails default plan validation; with 
validation disabled, the negative index later reaches `buildKeys(-1)`. In a 
focused planning regression, the visible survivor key was already reduced to 
`(k + 0)` before columnar/native conversion.
   
   Could this unfold only the outer survivor needed to reveal an equality, 
preserving already-visible key operands, with a DPP regression for this shape?
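
   To make the `indexOf` failure concrete, here is a small sketch over a toy expression model. `Survivor`, `PredicateWrapper`, `unfoldAll`, `unfoldOuter`, and `buildKeys` are hypothetical names for illustration and do not mirror the real Catalyst signatures:

```scala
// Toy model only: not Spark code.
object NestedSurvivorSketch {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Int) extends Expr
  case class Add(l: Expr, r: Expr) extends Expr
  case class EqualTo(l: Expr, r: Expr) extends Expr
  // Hypothetical key-level survivor whose replacement is `child + 0`.
  case class Survivor(child: Expr) extends Expr {
    def replacement: Expr = Add(child, Lit(0))
  }
  // Hypothetical survivor that wraps a whole predicate.
  case class PredicateWrapper(replacement: Expr) extends Expr

  // Recursive unfold: rewrites every survivor, including visible key operands.
  def unfoldAll(e: Expr): Expr = e match {
    case s: Survivor => unfoldAll(s.replacement)
    case PredicateWrapper(r) => unfoldAll(r)
    case Add(l, r) => Add(unfoldAll(l), unfoldAll(r))
    case EqualTo(l, r) => EqualTo(unfoldAll(l), unfoldAll(r))
    case leaf => leaf
  }

  // Outer-only unfold: reveals the equality, leaves its operands untouched.
  def unfoldOuter(e: Expr): Expr = e match {
    case PredicateWrapper(r) => unfoldOuter(r)
    case other => other
  }

  // Simplified key extraction: the right operand is the build-side key.
  def buildKeys(predicate: Expr): Seq[Expr] = predicate match {
    case EqualTo(_, r) => Seq(r)
    case _ => Nil
  }

  val predicate: Expr =
    PredicateWrapper(EqualTo(Attr("partitionKey"), Survivor(Attr("dimensionKey"))))

  // The key a pruning rule would look up, taken from the original equality.
  val lookedUpKey: Expr = Survivor(Attr("dimensionKey"))

  val recursiveIndex: Int = buildKeys(unfoldAll(predicate)).indexOf(lookedUpKey)
  val outerOnlyIndex: Int = buildKeys(unfoldOuter(predicate)).indexOf(lookedUpKey)
}
```

   In this model the recursive unfold leaves `(dimensionKey + 0)` as the only build key, so the lookup misses, while the outer-only unfold keeps `Survivor(dimensionKey)` at index 0.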



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala:
##########
@@ -53,7 +53,40 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
   }
 
   private def replace(e: Expression): Expression = e match {
-    case r: RuntimeReplaceable => replace(r.replacement)
+    // Aggregates can never self-evaluate (no real aggregation buffer), so always rewrite early.
+    case r: RuntimeReplaceableAggregate => replace(r.replacement)
+
+    case r: RuntimeReplaceable =>
+      val replaced = replace(r.replacement)
+      // By default (`eagerReplace = true`) a `RuntimeReplaceable` is rewritten here, so it never
+      // reaches the physical plan. An expression with `eagerReplace = false` is instead allowed to
+      // survive into the physical plan (to be matched by a native engine or materialized just
+      // before codegen). Even then, a survivor must be rewritten early if its replacement cannot
+      // survive:
+      //  - A non-deterministic replacement (e.g. the `Rand` inside `uniform`) carries mutable
+      //    per-partition state that must be initialized before eval. That state cannot be reliably
+      //    initialized through the `lazy val replacement`, which tree transforms may re-create.
+      //  - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later
+      //    logical rule (such as `RewriteWithExpression`) that can only run in the logical phase.
+      //    `AttributeReference` is `Unevaluable` too but is bound at execution like any input
+      //    column, so it does not block survival -- we reuse `ConvertToLocalRelation`'s check,
+      //    which already excludes it.
+      //  - A replacement that contains a `CURRENT_LIKE` expression (e.g. `CurrentTimestamp`,
+      //    `CurrentDate`, `Now`, `CurrentDatabase`) depends on the finish-analysis rules
+      //    `ComputeCurrentTime` / `ReplaceCurrentLike`, which run right after this rule and
+      //    fold each such node into a single query-start literal so the query observes one
+      //    consistent value. Those rules pattern-match the `CURRENT_LIKE` tree pattern, but a
+      //    survivor exposes only the `RUNTIME_REPLACEABLE` pattern of its `children` (the
+      //    original arguments), not the pattern bits of its hidden `replacement`. The node
+      //    would therefore be missed and later evaluated per-row (`CurrentTimestamp.eval`
+      //    calls `Instant.now()`), yielding multiple distinct values within one query. Force
+      //    eager replacement so the rules can still see and fold it.
+      val cannotSurvive =
+        !replaced.deterministic ||
+          ConvertToLocalRelation.hasUnevaluableExpr(replaced) ||
+          replaced.containsPattern(CURRENT_LIKE)

Review Comment:
   [P2] Cover every required finish-analysis rewrite
   
   This only forces eager replacement for `CURRENT_LIKE`, but the same survivor 
opacity still bypasses other mandatory rules that run after 
`ReplaceExpressions`. For example, a deterministic, non-foldable wrapper with 
`replacement = ArrayDistinct(arrayDoubleAttr)` hides `ARRAY_DISTINCT` from both 
`NormalizeFloatingNumbers` passes; in a focused regression, `[-0.0d, 0.0d]` 
remained two elements instead of one canonical positive zero. Likewise, a 
wrapper whose replacement contains `Cast(Literal("epoch"), DateType, ...)` 
hides `CAST` from `SpecialDatetimeValues`; it returned `NULL` in legacy mode 
(and would throw in ANSI mode) instead of `1970-01-01`.
   
   `MaterializeRuntimeReplaceable` exposes these expressions only after logical 
optimization, so neither rewrite gets another chance. Could the survival gate 
cover every replacement requiring a mandatory post-`ReplaceExpressions` 
rewrite, or run those rewrites over the hidden replacement, with regressions 
for these two cases? A `CURRENT_LIKE`-only guard still permits wrong results 
and runtime errors.
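
   A rough sketch of the opacity and of a broader gate, again over a toy model. Pattern bits are modeled as strings, and `mandatoryRewritePatterns` is an assumed, incomplete list chosen for illustration; none of these names are Spark APIs:

```scala
// Toy model only: string sets stand in for Catalyst tree-pattern bits.
object SurvivalGateSketch {
  sealed trait Expr {
    def nodePatterns: Set[String]
    def children: Seq[Expr]
    // Pattern bits propagate only through `children`.
    lazy val patterns: Set[String] = nodePatterns ++ children.flatMap(_.patterns)
    def containsPattern(p: String): Boolean = patterns.contains(p)
  }
  case class Attr(name: String) extends Expr {
    def nodePatterns: Set[String] = Set.empty
    def children: Seq[Expr] = Nil
  }
  case class ArrayDistinct(child: Expr) extends Expr {
    def nodePatterns: Set[String] = Set("ARRAY_DISTINCT")
    def children: Seq[Expr] = Seq(child)
  }
  // Hypothetical survivor: `replacement` is not one of its children, so its
  // pattern bits stay hidden from rules that prune on `containsPattern`.
  case class Wrapper(args: Seq[Expr], replacement: Expr) extends Expr {
    def nodePatterns: Set[String] = Set("RUNTIME_REPLACEABLE")
    def children: Seq[Expr] = args
  }

  // Gate as described in the hunk above: only CURRENT_LIKE blocks survival.
  val currentLikeOnly: Set[String] = Set("CURRENT_LIKE")
  // Assumed broader gate; the real list of mandatory rewrites may differ.
  val mandatoryRewritePatterns: Set[String] =
    Set("CURRENT_LIKE", "ARRAY_DISTINCT", "CAST")

  // True when the replacement must be exposed before later rules run.
  def cannotSurvive(replaced: Expr, blocking: Set[String]): Boolean =
    blocking.exists(p => replaced.containsPattern(p))

  val arr: Expr = Attr("arrayDoubleAttr")
  val wrapper: Wrapper = Wrapper(Seq(arr), ArrayDistinct(arr))

  val hiddenFromRules: Boolean = !wrapper.containsPattern("ARRAY_DISTINCT")
  val narrowGateForces: Boolean = cannotSurvive(wrapper.replacement, currentLikeOnly)
  val broadGateForces: Boolean = cannotSurvive(wrapper.replacement, mandatoryRewritePatterns)
}
```

   Here the wrapper hides `ARRAY_DISTINCT` from a pattern-gated rule, the `CURRENT_LIKE`-only gate lets it survive, and the broader gate forces eager replacement. The alternative raised above, running the rewrites over the hidden replacement, is not modeled.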



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

