sunchao commented on code in PR #56575:
URL: https://github.com/apache/spark/pull/56575#discussion_r3477871214
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala:
##########
@@ -190,30 +191,47 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
Review Comment:
[P2] Split the unfolded condition for compound replacements
The condition is split while the outer `RuntimeReplaceable` is still opaque.
If a supported survivor has `replacement = And(EqualTo(left.k, right.k),
GreaterThan(left.v, 0))`, `splitConjunctivePredicates` produces the wrapper as
one predicate; `unfoldKeyPredicate` then yields an `And`, which none of the
top-level equality cases match. A focused regression still planned this as
`CartesianProductExec`, and the equivalent stream-stream join is still rejected
as lacking an equality predicate. This compound replacement shape is already
treated as supported by `TestCompoundPredicateRuntimeReplaceable` in the V2
tests.
Could we expose the replacement before splitting (while retaining the
mapping to the original survivor where needed), and add batch plus streaming
coverage for a compound replacement?
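Here is a minimal Scala sketch of the ordering problem, built on a toy expression model. `Expr`, `Survivor`, `split`, `unfoldAll`, `keys` and `exposeThenSplit` are hypothetical names and are not Catalyst's API. `Survivor` stands in for a surviving `RuntimeReplaceable`, and `keys` recognizes only top-level equalities, as the extractor does. If the condition is split first, the compound replacement remains one opaque conjunct. If the outer replacement is exposed before splitting, `left.k = right.k` is recovered as a key.

```scala
// Toy model only: these case classes are hypothetical stand-ins, not Catalyst's classes.
sealed trait Expr
final case class Attr(side: String, name: String) extends Expr
final case class Lit(v: Int) extends Expr
final case class Eq(l: Expr, r: Expr) extends Expr
final case class Gt(l: Expr, r: Expr) extends Expr
final case class And(l: Expr, r: Expr) extends Expr
// Plays the role of a surviving RuntimeReplaceable: structural matchers see only the wrapper.
final case class Survivor(replacement: Expr) extends Expr

object CompoundSplit {
  def refs(e: Expr): Set[Attr] = e match {
    case a: Attr        => Set(a)
    case Lit(_)         => Set.empty
    case Eq(l, r)       => refs(l) ++ refs(r)
    case Gt(l, r)       => refs(l) ++ refs(r)
    case And(l, r)      => refs(l) ++ refs(r)
    case Survivor(repl) => refs(repl)
  }

  // True if `e` references at least one attribute and all of them come from `side`.
  def onSide(e: Expr, side: String): Boolean = {
    val rs = refs(e)
    rs.nonEmpty && rs.forall(_.side == side)
  }

  // Like splitConjunctivePredicates: a Survivor is treated as an opaque leaf.
  def split(e: Expr): Seq[Expr] = e match {
    case And(l, r) => split(l) ++ split(r)
    case other     => Seq(other)
  }

  // Unfolds every Survivor at any depth (models unfolding each predicate after splitting).
  def unfoldAll(e: Expr): Expr = e match {
    case Survivor(repl) => unfoldAll(repl)
    case Eq(l, r)       => Eq(unfoldAll(l), unfoldAll(r))
    case Gt(l, r)       => Gt(unfoldAll(l), unfoldAll(r))
    case And(l, r)      => And(unfoldAll(l), unfoldAll(r))
    case leaf           => leaf
  }

  // Only top-level equalities between the two sides become (leftKey, rightKey) pairs.
  def keys(preds: Seq[Expr]): Seq[(Expr, Expr)] = preds.flatMap {
    case Eq(l, r) if onSide(l, "left") && onSide(r, "right") => Some((l, r))
    case Eq(l, r) if onSide(l, "right") && onSide(r, "left") => Some((r, l))
    case _                                                   => None
  }

  // Order described in the comment: split first, then unfold each conjunct.
  def splitThenUnfold(cond: Expr): Seq[(Expr, Expr)] = keys(split(cond).map(unfoldAll))

  // Suggested order: reveal an outer Survivor's replacement, then keep splitting on And.
  def exposeThenSplit(e: Expr): Seq[Expr] = e match {
    case Survivor(repl) => exposeThenSplit(repl)
    case And(l, r)      => exposeThenSplit(l) ++ exposeThenSplit(r)
    case other          => Seq(other)
  }
}
```

For `Survivor(And(Eq(left.k, right.k), Gt(left.v, Lit(0))))`, `splitThenUnfold` finds no keys. By contrast, `keys(exposeThenSplit(...))` returns the single pair `(left.k, right.k)`. The sketch does not model keeping the mapping back to the original survivor for non-key predicates, which a real change would still need.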
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala:
##########
@@ -190,30 +191,47 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
- val joinKeys = predicates.flatMap {
- case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
- case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
- case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
- // Replace null with default value for joining key, then those rows with null in it could
- // be joined together
- case EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, right) =>
- Seq((Coalesce(Seq(l, Literal.default(l.dataType))),
- Coalesce(Seq(r, Literal.default(r.dataType)))),
- (IsNull(l), IsNull(r))
- ) // (coalesce(l, default) = coalesce(r, default)) and (isnull(l) = isnull(r))
- case EqualNullSafe(l, r) if canEvaluate(l, right) && canEvaluate(r, left) =>
- Seq((Coalesce(Seq(r, Literal.default(r.dataType))),
- Coalesce(Seq(l, Literal.default(l.dataType)))),
- (IsNull(r), IsNull(l))
- ) // Same as above with left/right reversed.
- case _ => None
+ // A surviving `RuntimeReplaceable` (`eagerReplace = false`) hides its `replacement`
+ // from this structural matcher: an equi-join condition expressed as such a survivor's
+ // `replacement` (e.g. a wrapper whose `replacement` is `EqualTo(leftKey, rightKey)`)
+ // would not be recognized here, degrading a batch join to a `CartesianProductExec` and
+ // failing stream-stream join analysis with "without equality predicate". Unfold a
+ // predicate's `RuntimeReplaceable` before matching so key extraction sees the underlying
+ // equality. Only the extracted keys use the unfolded form; non-key predicates are kept
+ // in their original (survivor) form in `otherPredicates` below, so
+ // `MaterializeRuntimeReplaceable` still materializes them before codegen and EXPLAIN
+ // keeps showing the readable node. The unfold is gated on the tree pattern so the common
+ // survivor-free join is unaffected.
+ def unfoldKeyPredicate(p: Expression): Expression =
+ if (p.containsPattern(RUNTIME_REPLACEABLE)) RuntimeReplaceable.unfold(p) else p
Review Comment:
[P2] Preserve nested survivors when exposing the outer equality
This recursively unfolds every `RuntimeReplaceable` in the predicate,
including a survivor that is already a visible join-key operand. For
`EqualTo(partitionKey, Survivor(dimensionKey))`, the old extractor retained
`Survivor(dimensionKey)` as the physical key; this code emits only its
low-level replacement, so native planning can no longer match the high-level
key.
It also makes DPP invalid: `ExtractEquiJoinKeys` returns the unfolded build
key, but `PartitionPruning` inspects the original equality and calls
`joinKeys.indexOf(Survivor(dimensionKey))`, producing `-1`. The resulting
`DynamicPruningSubquery` is unresolved and fails default plan validation; with
validation disabled, the negative index later reaches `buildKeys(-1)`. In a
focused planning regression, the visible survivor key was already reduced to
`(k + 0)` before columnar/native conversion.
Could this unfold only the outer survivor needed to reveal an equality,
preserving already-visible key operands, with a DPP regression for this shape?
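The following minimal sketch contrasts unfolding every survivor with unfolding only the outer wrapper. It again uses a hypothetical toy model; `Survivor`, `unfoldAll`, `unfoldOuter` and `buildKeys` are illustrative names, not Catalyst code. The survivor's replacement is modeled as `k + 0`, which mirrors the reduced key the review observed. `List.indexOf` stands in for the `joinKeys.indexOf(...)` lookup that `PartitionPruning` performs.

```scala
// Toy model (hypothetical): not Catalyst's Expression or RuntimeReplaceable API.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Lit(v: Int) extends Expr
final case class Add(l: Expr, r: Expr) extends Expr
final case class Eq(l: Expr, r: Expr) extends Expr
// Stand-in for a surviving RuntimeReplaceable; `tag` keeps distinct survivors distinguishable.
final case class Survivor(tag: String, replacement: Expr) extends Expr

object OuterUnfold {
  // Models the diff's behavior: every Survivor is unfolded, at any depth.
  def unfoldAll(e: Expr): Expr = e match {
    case Survivor(_, repl) => unfoldAll(repl)
    case Eq(l, r)          => Eq(unfoldAll(l), unfoldAll(r))
    case Add(l, r)         => Add(unfoldAll(l), unfoldAll(r))
    case leaf              => leaf
  }

  // Suggested shape: peel Survivors only at the root and stop once the root is no longer
  // a Survivor, so operands that are already visible keep their high-level form.
  def unfoldOuter(e: Expr): Expr = e match {
    case Survivor(_, repl) => unfoldOuter(repl)
    case other             => other
  }

  // Build-side (right) key of a top-level equality, as a zero- or one-element list.
  def buildKeys(pred: Expr): List[Expr] = pred match {
    case Eq(_, r) => List(r)
    case _        => Nil
  }
}
```

Take `pred = Eq(p, Survivor("dim", Add(k, Lit(0))))`. With `unfoldAll`, the only key is `Add(k, Lit(0))`, so a lookup of the original survivor returns `-1`. With `unfoldOuter`, the visible operand stays intact (index `0`), and the equality is still exposed when the whole predicate is wrapped in a survivor.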
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala:
##########
@@ -53,7 +53,40 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
}
private def replace(e: Expression): Expression = e match {
- case r: RuntimeReplaceable => replace(r.replacement)
+ // Aggregates can never self-evaluate (no real aggregation buffer), so always rewrite early.
+ case r: RuntimeReplaceableAggregate => replace(r.replacement)
+
+ case r: RuntimeReplaceable =>
+ val replaced = replace(r.replacement)
+ // By default (`eagerReplace = true`) a `RuntimeReplaceable` is rewritten here, so it never
+ // reaches the physical plan. An expression with `eagerReplace = false` is instead allowed to
+ // survive into the physical plan (to be matched by a native engine or materialized just
+ // before codegen). Even then, a survivor must be rewritten early if its replacement cannot
+ // survive:
+ // - A non-deterministic replacement (e.g. the `Rand` inside `uniform`) carries mutable
+ // per-partition state that must be initialized before eval. That state cannot be reliably
+ // initialized through the `lazy val replacement`, which tree transforms may re-create.
+ // - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later
+ // logical rule (such as `RewriteWithExpression`) that can only run in the logical phase.
+ // `AttributeReference` is `Unevaluable` too but is bound at execution like any input
+ // column, so it does not block survival -- we reuse `ConvertToLocalRelation`'s check,
+ // which already excludes it.
+ // - A replacement that contains a `CURRENT_LIKE` expression (e.g. `CurrentTimestamp`,
+ // `CurrentDate`, `Now`, `CurrentDatabase`) depends on the finish-analysis rules
+ // `ComputeCurrentTime` / `ReplaceCurrentLike`, which run right after this rule and
+ // fold each such node into a single query-start literal so the query observes one
+ // consistent value. Those rules pattern-match the `CURRENT_LIKE` tree pattern, but a
+ // survivor exposes only the `RUNTIME_REPLACEABLE` pattern of its `children` (the
+ // original arguments), not the pattern bits of its hidden `replacement`. The node
+ // would therefore be missed and later evaluated per-row (`CurrentTimestamp.eval`
+ // calls `Instant.now()`), yielding multiple distinct values within one query. Force
+ // eager replacement so the rules can still see and fold it.
+ val cannotSurvive =
+ !replaced.deterministic ||
+ ConvertToLocalRelation.hasUnevaluableExpr(replaced) ||
+ replaced.containsPattern(CURRENT_LIKE)
Review Comment:
[P2] Cover every required finish-analysis rewrite
This only forces eager replacement for `CURRENT_LIKE`, but the same survivor
opacity still bypasses other mandatory rules that run after
`ReplaceExpressions`. For example, a deterministic, non-foldable wrapper with
`replacement = ArrayDistinct(arrayDoubleAttr)` hides `ARRAY_DISTINCT` from both
`NormalizeFloatingNumbers` passes; in a focused regression, `[-0.0d, 0.0d]`
remained two elements instead of one canonical positive zero. Likewise, a
wrapper whose replacement contains `Cast(Literal("epoch"), DateType, ...)`
hides `CAST` from `SpecialDatetimeValues`; it returned `NULL` in legacy mode
(and would throw in ANSI mode) instead of `1970-01-01`.
`MaterializeRuntimeReplaceable` exposes these expressions only after logical
optimization, so neither rewrite gets another chance. Could the survival gate
cover every replacement requiring a mandatory post-`ReplaceExpressions`
rewrite, or run those rewrites over the hidden replacement, with regressions
for these two cases? A `CURRENT_LIKE`-only guard still permits wrong results
and runtime errors.
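The following minimal sketch shows the pattern-visibility problem and a broader survival gate. It assumes a toy model in which each node reports tree-pattern bits. `Pat`, `Node`, `Survivor`, `SurvivalGate` and the contents of `mandatoryRewrites` are hypothetical, and the pattern names only echo the ones mentioned above. The real set of mandatory post-`ReplaceExpressions` rewrites would have to be derived from the optimizer's rule list. The sketch covers only the first option raised (widening the gate), not running those rewrites over the hidden replacement.

```scala
// Toy model (hypothetical): pattern names echo Catalyst TreePattern names from the review,
// but these types, the gate sets and `cannotSurvive` are illustrative only.
sealed trait Pat
case object CURRENT_LIKE extends Pat
case object ARRAY_DISTINCT extends Pat
case object CAST extends Pat
case object RUNTIME_REPLACEABLE extends Pat

sealed trait Expr { def patterns: Set[Pat] }

// Ordinary node: reports its own pattern bits plus its children's.
final case class Node(name: String, own: Set[Pat], children: Seq[Expr]) extends Expr {
  def patterns: Set[Pat] = own ++ children.flatMap(_.patterns)
}

// Survivor: reports RUNTIME_REPLACEABLE plus its children's bits, never its replacement's.
final case class Survivor(children: Seq[Expr], replacement: Expr) extends Expr {
  def patterns: Set[Pat] = Set[Pat](RUNTIME_REPLACEABLE) ++ children.flatMap(_.patterns)
}

object SurvivalGate {
  // The diff's guard: only CURRENT_LIKE forces eager replacement.
  val currentLikeOnly: Set[Pat] = Set(CURRENT_LIKE)

  // Hypothetical broader guard: every pattern a mandatory post-ReplaceExpressions rule
  // matches on. This list is an assumption, seeded with the review's two examples.
  val mandatoryRewrites: Set[Pat] = Set(CURRENT_LIKE, ARRAY_DISTINCT, CAST)

  // Force eager replacement if the hidden replacement carries any gated pattern.
  def cannotSurvive(s: Survivor, gate: Set[Pat]): Boolean =
    s.replacement.patterns.exists(gate.contains)

  // Whether a rule gated on `rulePattern` would even visit this expression.
  def visibleTo(e: Expr, rulePattern: Pat): Boolean = e.patterns.contains(rulePattern)
}
```

Consider a survivor whose replacement is an `ARRAY_DISTINCT` node. `visibleTo(_, ARRAY_DISTINCT)` is false, so a pattern-gated rule never visits it. The `CURRENT_LIKE`-only gate lets it survive, whereas the broader gate forces eager replacement.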
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]