cloud-fan commented on code in PR #55858:
URL: https://github.com/apache/spark/pull/55858#discussion_r3462993872
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala:
##########
@@ -1345,4 +1345,64 @@ class AnalysisErrorSuite extends AnalysisTest with DataTypeErrorsBase {
"INVALID_NON_DETERMINISTIC_EXPRESSIONS" :: Nil
)
}
+
+ test("SPARK-56729: ReplaceData and WriteDelta allow non-deterministic expressions") {
Review Comment:
This test only reads the boolean getter — it builds the nodes with a
deterministic `Literal(true)` condition and a never-resolved `dummyQuery`, then
asserts `allowNonDeterministicExpression`. It never constructs a
non-deterministic expression, never runs `CheckAnalysis`, and never exercises
the runtime group-filter or an end-to-end MERGE. So it proves neither the fix
(analysis now passes for a non-deterministic-source MERGE) nor that execution
is correct (the group-filter concern in the `v2Commands.scala:373` thread).
An end-to-end test would be much stronger: a `MERGE INTO … USING
(<non-deterministic source>) …` over an `InMemoryRowLevelOperationTable` that
(a) reproduces the original `INVALID_NON_DETERMINISTIC_EXPRESSIONS` failure
this PR fixes, and (b) runs with `runtimeRowLevelOperationGroupFilterEnabled`
and checks the result rows are correct.
Two smaller points: the negative case covers DELETE but not UPDATE (which also
returns `false`); and `AnalysisErrorSuite` exists to assert analysis *errors*,
so a getter-only no-error test is an odd fit — a row-level-operation suite
would be a more natural home.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -367,7 +367,10 @@ case class ReplaceData(
originalTable: NamedRelation,
projections: ReplaceDataProjections,
groupFilterCondition: Option[Expression] = None,
- write: Option[Write] = None) extends RowLevelWrite {
+ write: Option[Write] = None)
+ extends RowLevelWrite with SupportsNonDeterministicExpression {
+
+ override def allowNonDeterministicExpression: Boolean = operation.command() == MERGE
Review Comment:
**Blocking (correctness — needs validation).** Implementing
`SupportsNonDeterministicExpression` only suppresses the `CheckAnalysis` error;
it doesn't make the operator's *execution* correct with a non-deterministic
expression. The common safe approach is to pull non-deterministic exprs into an
underlying `Project` (evaluated once) — which is exactly what the UPDATE
rewrite does for its assignments, and why UPDATE needs no trait. This PR
instead lets the node carry the expression, so execution has to be validated.
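A toy, Spark-free sketch of why evaluating once matters (the `SrcRow` type, the `EvaluateOnce` object, and a seeded `scala.util.Random` standing in for `rand()` are illustrative assumptions, not Catalyst APIs): when the non-deterministic column is computed once and shared, as an underlying `Project` would do, every consumer sees identical rows; when each consumer re-evaluates it, their values diverge.

```scala
import scala.util.Random

// Toy model, not Catalyst: each call to `source` stands for one independent
// evaluation of a plan containing rand(), so every call draws fresh values.
final case class SrcRow(id: Int, r: Double)

object EvaluateOnce {
  // Hypothetical source: ids 0 until n, each with a rand()-like column.
  def source(n: Int, rng: Random): Seq[SrcRow] =
    (0 until n).map(i => SrcRow(i, rng.nextDouble()))

  // Analogue of pulling rand() into an underlying Project: evaluate once,
  // then hand the same materialized rows to both consumers.
  def sharedConsumers(n: Int, seed: Long): (Seq[SrcRow], Seq[SrcRow]) = {
    val rows = source(n, new Random(seed)).toVector // evaluated exactly once
    (rows, rows)
  }

  // Analogue of the node carrying rand() itself: each consumer re-evaluates
  // the plan and so sees its own, independent draw.
  def independentConsumers(n: Int, seed: Long): (Seq[SrcRow], Seq[SrcRow]) = {
    val rng = new Random(seed)
    (source(n, rng).toVector, source(n, rng).toVector)
  }
}
```

The ids agree in both variants; only the shared variant also agrees on the random column.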
The only expression this relaxation can permit on a MERGE node is
`groupFilterCondition` (the `condition` is validated deterministic by
`RewriteMergeIntoTable.checkMergeIntoCondition`). And `groupFilterCondition` is
consumed by `RowLevelOperationRuntimeGroupFiltering`, which builds a **separate
dynamic-pruning subquery that re-scans the MERGE source**
(`buildMatchingRowsPlan` → `Filter(cond, relation)` → `Aggregate(buildKeys,
…)`) to decide which target groups the main merge reads — with no
`.deterministic` guard (`canInjectGroupFilters` checks only `cond !=
TrueLiteral` and `filterAttributes.nonEmpty`). Non-deterministic plans aren't
reuse-deduplicated, so the source is evaluated **twice independently**: once in
the main merge join, once in the pruning subquery.
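To make the missing guard concrete, here is a toy model (plain Scala; `Expr`, `InSourceKeys`, and `GroupFilterGuard` are hypothetical stand-ins, not Catalyst classes) of the two checks quoted above, plus a stricter variant that also requires the condition to be deterministic. Skipping the group filter for a non-deterministic condition means the merge reads every group, so the source is scanned only once.

```scala
// Toy expression tree, not Catalyst's classes. An expression is treated as
// deterministic only if all of its children are.
sealed trait Expr {
  def children: Seq[Expr]
  def deterministic: Boolean = children.forall(_.deterministic)
}
final case class Attr(name: String) extends Expr { val children: Seq[Expr] = Nil }
final case class Lit(value: Any) extends Expr { val children: Seq[Expr] = Nil }
case object Rand extends Expr {
  val children: Seq[Expr] = Nil
  override def deterministic: Boolean = false // plays the role of rand()
}
final case class LessThan(left: Expr, right: Expr) extends Expr {
  val children: Seq[Expr] = Seq(left, right)
}
final case class EqualTo(left: Expr, right: Expr) extends Expr {
  val children: Seq[Expr] = Seq(left, right)
}
// Hypothetical stand-in for a group-filter condition that embeds a scan of
// the MERGE source; `sourceFilter` is that source's WHERE clause.
final case class InSourceKeys(key: Attr, sourceFilter: Expr) extends Expr {
  val children: Seq[Expr] = Seq(key, sourceFilter)
}

object GroupFilterGuard {
  val TrueLiteral: Expr = Lit(true)

  // Mirrors the two checks the review quotes from canInjectGroupFilters.
  def canInjectCurrent(cond: Expr, filterAttrs: Seq[Attr]): Boolean =
    cond != TrueLiteral && filterAttrs.nonEmpty

  // Hypothetical stricter variant: refuse a non-deterministic condition so
  // no second, independent source scan is ever planned.
  def canInjectGuarded(cond: Expr, filterAttrs: Seq[Attr]): Boolean =
    canInjectCurrent(cond, filterAttrs) && cond.deterministic
}
```

This is a blunt, condition-level check: it gives up pruning whenever any non-determinism reaches the condition, which is more conservative than restricting the relaxation to deterministic key columns.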
- Safe case: non-determinism confined to non-key source columns — the
pruning `Aggregate` projects only the join keys, so the pruned set doesn't
depend on it. The old conservative rejection was a false positive here.
- Unsafe case: non-determinism that affects which rows/keys the source
produces, e.g. `MERGE INTO t USING (SELECT … FROM s WHERE rand() < 0.5) src ON
t.id = src.id …`. The pruning scan and the main merge scan evaluate `rand()`
independently; a key the main merge would match but the pruning scan drops →
that target group is pruned (not read) → the update is silently lost.
The gate is unconditional for all of MERGE, so it admits the unsafe case.
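Both cases can be reproduced in a toy, Spark-free model (the `GroupFilterModel` object, the group layout via `groupSize`, and the 0.5 threshold are illustrative assumptions). The pruning scan and the main merge scan each draw their own random values, and any key the main scan matches inside a group the pruning scan skipped counts as a lost update.

```scala
import scala.util.Random

// Toy model of runtime group filtering with a non-deterministic MERGE
// source. Not Spark code: target "groups" stand in for files/partitions, and
// each sourceKeys(...) call stands in for one independent source scan.
object GroupFilterModel {
  final case class Outcome(
      pruneKeys: Set[Int],   // keys seen by the pruning subquery's scan
      mainKeys: Set[Int],    // keys seen by the main merge join's scan
      groupsRead: Set[Int],  // target groups the main merge actually reads
      lostUpdates: Set[Int]) // keys the main merge matches in unread groups

  // One scan of a source over ids 0 until n.
  // keyAffecting = true  ~ WHERE rand() < 0.5 (changes which keys appear)
  // keyAffecting = false ~ SELECT id, rand() AS v (key set never changes)
  def sourceKeys(n: Int, keyAffecting: Boolean, rng: Random): Set[Int] =
    (0 until n).filter { _ =>
      val r = rng.nextDouble() // drawn on every scan, like rand()
      !keyAffecting || r < 0.5
    }.toSet

  def merge(n: Int, groupSize: Int, keyAffecting: Boolean, seed: Long): Outcome = {
    val rng = new Random(seed)
    val targetIds = (0 until n).toSet
    def groupOf(id: Int): Int = id / groupSize

    // Pruning subquery: Filter(cond, source) -> Aggregate(join keys).
    val pruneKeys = sourceKeys(n, keyAffecting, rng)
    val groupsRead = targetIds.filter(pruneKeys.contains).map(id => groupOf(id))

    // Main merge: scans the source again, independently.
    val mainKeys = sourceKeys(n, keyAffecting, rng)
    val matched = targetIds.filter(mainKeys.contains)
    // A match is lost when its target group was pruned away.
    val lost = matched.filterNot(id => groupsRead.contains(groupOf(id)))

    Outcome(pruneKeys, mainKeys, groupsRead, lost)
  }
}
```

With `keyAffecting = false` both scans yield the same key set, so nothing is lost for any seed; with `keyAffecting = true` and one row per group, the lost updates are exactly the keys seen by the main scan but not by the pruning scan.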
Could you validate this path? Options: prove the double-evaluation can't change
results (e.g. only deterministic key columns ever drive the group filter),
restrict the relaxation to that subset, or materialize/checkpoint the source so
both scans agree.
Nit (separate, minor): the rest of this file writes `operation.command`
without parens (lines 421, 494, 525) — worth matching:
```suggestion
override def allowNonDeterministicExpression: Boolean = operation.command == MERGE
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -455,7 +458,10 @@ case class WriteDelta(
originalTable: NamedRelation,
projections: WriteDeltaProjections,
groupFilterCondition: Option[Expression] = None,
- write: Option[DeltaWrite] = None) extends RowLevelWrite {
+ write: Option[DeltaWrite] = None)
+ extends RowLevelWrite with SupportsNonDeterministicExpression {
+
+ override def allowNonDeterministicExpression: Boolean = operation.command() == MERGE
Review Comment:
Same as the `ReplaceData` override above: the delta-based path feeds
`groupFilterCondition` to the same `RowLevelOperationRuntimeGroupFiltering`
rule (via `DeltaBasedRowLevelOperation`), so the second-source-scan concern
applies here too. (Posting here as well so the `WriteDelta` side isn't missed.)
--
This is an automated message from the Apache Git Service.