sunchao commented on code in PR #56575:
URL: https://github.com/apache/spark/pull/56575#discussion_r3464944332
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -775,6 +775,10 @@ object QueryExecution {
DisableUnnecessaryBucketedScan,
ApplyColumnarRulesAndInsertTransitions(
sparkSession.sessionState.columnarRules, outputsColumnar = false),
+ // Materialize any RuntimeReplaceable that survived the optimizer into its replacement for
+ // the Spark execution path. After columnar/native conversion (so a native engine sees the
+ // original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable).
+ MaterializeRuntimeReplaceable,
Review Comment:
[P2] Expose structural equality before join-key extraction
Materializing at this point is too late for planner consumers. A supported
`eagerReplace = false` binary predicate with `replacement = EqualTo(leftKey,
rightKey)` survives the logical optimizer, but `ExtractEquiJoinKeys` only
pattern-matches visible `EqualTo` / `EqualNullSafe` conjuncts and therefore
extracts no keys from the wrapper. In a focused regression with broadcasting
disabled, the batch plan contained `CartesianProductExec` and no
`SortMergeJoinExec` (while still returning the correct rows), turning a normal
equi-join into O(N*M) work that can OOM. A second control showed that an
ordinary stream-stream `EqualTo` is accepted by `StreamingJoinStrategy`, while
the identical equality hidden in this wrapper throws the 'stream-stream join
without equality predicate' `AnalysisException`.
`MaterializeRuntimeReplaceable` unfolds the condition only after Spark
strategies have selected the operator, so it cannot recover a
hash/sort-merge/streaming equi-join. Before this PR, unconditional replacement
exposed the equality before planning. Could join-key extraction inspect an
unfolded condition (while preserving the original expression where needed), or
should such structural predicates be forced to replace eagerly? Batch
physical-plan and stream-stream planning regressions would cover both
consequences.
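To make the concern concrete, here is a minimal sketch in plain Scala. Every type in it (`Expr`, `Attr`, `Wrapper`, and the helpers `extractKeys` and `unfold`) is a hypothetical stand-in rather than a Catalyst class, and the extraction logic is deliberately simplified. It only illustrates why a matcher that looks for visible `EqualTo` conjuncts finds no keys inside a wrapper, and how unfolding first would recover them.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes.
object JoinKeySketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  // Stand-in for a RuntimeReplaceable with eagerReplace = false:
  // the equality lives in `replacement`, not in a visible node.
  final case class Wrapper(replacement: Expr) extends Expr

  // Split a condition into its top-level conjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Simplified extraction: only visible EqualTo conjuncts yield keys.
  def extractKeys(cond: Expr): Seq[(Expr, Expr)] =
    splitConjuncts(cond).collect { case EqualTo(l, r) => (l, r) }

  // Recursively replace every wrapper by its replacement.
  def unfold(e: Expr): Expr = e match {
    case Wrapper(r)    => unfold(r)
    case And(l, r)     => And(unfold(l), unfold(r))
    case EqualTo(l, r) => EqualTo(unfold(l), unfold(r))
    case a: Attr       => a
  }

  val hidden: Expr = Wrapper(EqualTo(Attr("leftKey"), Attr("rightKey")))

  // No keys are found while the equality is hidden in the wrapper.
  val keysFromWrapper: Seq[(Expr, Expr)] = extractKeys(hidden)
  // Unfolding before extraction exposes the key pair again.
  val keysFromUnfolded: Seq[(Expr, Expr)] = extractKeys(unfold(hidden))
}
```

In this toy model, extraction on the wrapper yields nothing, which corresponds to the planner falling back to a non-equi join, while extraction on the unfolded condition yields the `(leftKey, rightKey)` pair.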
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala:
##########
@@ -53,7 +53,28 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
}
private def replace(e: Expression): Expression = e match {
- case r: RuntimeReplaceable => replace(r.replacement)
+ // Aggregates can never self-evaluate (no real aggregation buffer), so always rewrite early.
+ case r: RuntimeReplaceableAggregate => replace(r.replacement)
+
+ case r: RuntimeReplaceable =>
+ val replaced = replace(r.replacement)
+ // By default (`eagerReplace = true`) a `RuntimeReplaceable` is rewritten here, so it never
+ // reaches the physical plan. An expression with `eagerReplace = false` is instead allowed to
+ // survive into the physical plan (to be matched by a native engine or materialized just
+ // before codegen). Even then, a survivor must be rewritten early if its replacement cannot
+ // survive:
+ // - A non-deterministic replacement (e.g. the `Rand` inside `uniform`) carries mutable
+ // per-partition state that must be initialized before eval. That state cannot be reliably
+ // initialized through the `lazy val replacement`, which tree transforms may re-create.
+ // - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later
+ // logical rule (such as `RewriteWithExpression`) that can only run in the logical phase.
+ // `AttributeReference` is `Unevaluable` too but is bound at execution like any input
+ // column, so it does not block survival -- we reuse `ConvertToLocalRelation`'s check,
+ // which already excludes it.
+ val cannotSurvive =
+ !replaced.deterministic || ConvertToLocalRelation.hasUnevaluableExpr(replaced)
+ if (r.eagerReplace || cannotSurvive) replaced else r
Review Comment:
[P2] Force eager replacement when later finish-analysis rules need a hidden
expression
This gate also lets through replacements that require a later
finish-analysis rewrite. For example, consider a non-`InheritAnalysisRules`
wrapper with an input timestamp child, `eagerReplace = false`, and `replacement
= Greatest(Seq(CurrentTimestamp(), child))`. The replacement is deterministic
and evaluable, and the input attribute makes it non-foldable, so the wrapper
survives here. However, the replacement is not one of the wrapper's children,
so its tree-pattern bits do not expose `CURRENT_LIKE`. `ComputeCurrentTime`,
which runs after `ReplaceExpressions`, prunes this subtree and never replaces
`CurrentTimestamp` with the single query-start literal.
`MaterializeRuntimeReplaceable` exposes it only during physical preparation,
where `CurrentTimestamp.eval` calls `Instant.now()` as rows are evaluated. In a
focused 20,000-row regression, this produced multiple distinct timestamps
within one query.
Before this PR, `ReplaceExpressions` always exposed the replacement before
`ComputeCurrentTime`. Could we force eager replacement at least when
`replaced.containsPattern(CURRENT_LIKE)` (or otherwise run the remaining
finish-analysis rewrites on the hidden replacement)? Please keep the regression
attribute-dependent so constant folding cannot mask the problem.
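As an illustration of the suggested gate, here is a minimal sketch in plain Scala. All names in it (`Expr`, `Wrapper`, `containsCurrentLike`, `gateAsWritten`, `gateWithCurrentLikeCheck`) are hypothetical stand-ins, not Catalyst APIs, and the `Unevaluable` check is omitted for brevity. It models pattern bits that are computed from a node and its children only, so a `CurrentTimestamp` inside `replacement` stays invisible on the wrapper itself.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes.
object CurrentLikeGateSketch {
  sealed trait Expr {
    def children: Seq[Expr]
    def selfCurrentLike: Boolean = false
    // "Pattern bits": derived from this node and its children only.
    lazy val containsCurrentLike: Boolean =
      selfCurrentLike || children.exists(_.containsCurrentLike)
    def deterministic: Boolean = children.forall(_.deterministic)
  }
  final case class Attr(name: String) extends Expr {
    val children: Seq[Expr] = Seq.empty
  }
  case object CurrentTimestamp extends Expr {
    val children: Seq[Expr] = Seq.empty
    override def selfCurrentLike: Boolean = true
  }
  final case class Greatest(children: Seq[Expr]) extends Expr
  // The replacement is not a child, so its bits are not propagated.
  final case class Wrapper(child: Expr, eagerReplace: Boolean) extends Expr {
    val children: Seq[Expr] = Seq(child)
    lazy val replacement: Expr = Greatest(Seq(CurrentTimestamp, child))
  }

  // Simplified version of the gate in the diff (determinism only).
  def gateAsWritten(w: Wrapper): Expr = {
    val replaced = w.replacement
    if (w.eagerReplace || !replaced.deterministic) replaced else w
  }

  // Suggested variant: also replace eagerly on current-like content.
  def gateWithCurrentLikeCheck(w: Wrapper): Expr = {
    val replaced = w.replacement
    val cannotSurvive = !replaced.deterministic || replaced.containsCurrentLike
    if (w.eagerReplace || cannotSurvive) replaced else w
  }

  val wrapper: Wrapper = Wrapper(Attr("ts"), eagerReplace = false)
  val survivesAsWritten: Expr = gateAsWritten(wrapper)
  val replacedWithCheck: Expr = gateWithCurrentLikeCheck(wrapper)
}
```

In this model the wrapper reports no current-like pattern, so a later rule that prunes on that bit would skip it; the variant gate replaces it eagerly and exposes `CurrentTimestamp` to that rule.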
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]