cloud-fan commented on code in PR #55858:
URL: https://github.com/apache/spark/pull/55858#discussion_r3462993872


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala:
##########
@@ -1345,4 +1345,64 @@ class AnalysisErrorSuite extends AnalysisTest with DataTypeErrorsBase {
       "INVALID_NON_DETERMINISTIC_EXPRESSIONS" :: Nil
     )
   }
+
+  test("SPARK-56729: ReplaceData and WriteDelta allow non-deterministic expressions") {

Review Comment:
   This test only reads the boolean getter — it builds the nodes with a 
deterministic `Literal(true)` condition and a never-resolved `dummyQuery`, then 
asserts `allowNonDeterministicExpression`. It never constructs a 
non-deterministic expression, never runs `CheckAnalysis`, and never exercises 
the runtime group-filter or an end-to-end MERGE. So it proves neither the fix 
(analysis now passes for a non-deterministic-source MERGE) nor that execution 
is correct (the group-filter concern in the `v2Commands.scala:373` thread).
   
   An end-to-end test would be much stronger: a `MERGE INTO … USING 
(<non-deterministic source>) …` over an `InMemoryRowLevelOperationTable` that 
(a) reproduces the original `INVALID_NON_DETERMINISTIC_EXPRESSIONS` failure 
this PR fixes, and (b) runs with `runtimeRowLevelOperationGroupFilterEnabled` 
and checks the result rows are correct.
   
   Two smaller points: the negative case covers DELETE but not UPDATE, which 
also returns `false`; and `AnalysisErrorSuite` exists to assert analysis 
*errors*, so a getter-only no-error test is an odd fit. A row-level-operation 
suite would be a more natural home.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -367,7 +367,10 @@ case class ReplaceData(
     originalTable: NamedRelation,
     projections: ReplaceDataProjections,
     groupFilterCondition: Option[Expression] = None,
-    write: Option[Write] = None) extends RowLevelWrite {
+    write: Option[Write] = None)
+    extends RowLevelWrite with SupportsNonDeterministicExpression {
+
+  override def allowNonDeterministicExpression: Boolean = operation.command() == MERGE

Review Comment:
   **Blocking (correctness — needs validation).** Implementing 
`SupportsNonDeterministicExpression` only suppresses the `CheckAnalysis` error; 
it doesn't make the operator's *execution* correct with a non-deterministic 
expression. The common safe approach is to pull non-deterministic expressions 
into an underlying `Project`, where each is evaluated once. That is exactly what 
the UPDATE rewrite does for its assignments, and why UPDATE needs no trait. This 
PR instead lets the node carry the expression, so execution has to be validated.
   
   The only expression this relaxation can permit on a MERGE node is 
`groupFilterCondition` (the `condition` is validated deterministic by 
`RewriteMergeIntoTable.checkMergeIntoCondition`). And `groupFilterCondition` is 
consumed by `RowLevelOperationRuntimeGroupFiltering`, which builds a **separate 
dynamic-pruning subquery that re-scans the MERGE source** 
(`buildMatchingRowsPlan` → `Filter(cond, relation)` → `Aggregate(buildKeys, 
…)`) to decide which target groups the main merge reads — with no 
`.deterministic` guard (`canInjectGroupFilters` checks only `cond != 
TrueLiteral` and `filterAttributes.nonEmpty`). Non-deterministic plans aren't 
reuse-deduplicated, so the source is evaluated **twice independently**: once in 
the main merge join, once in the pruning subquery.
   
   - Safe case: non-determinism confined to non-key source columns — the 
pruning `Aggregate` projects only the join keys, so the pruned set doesn't 
depend on it. The old conservative rejection was a false positive here.
   - Unsafe case: non-determinism that affects which rows/keys the source 
produces, e.g. `MERGE INTO t USING (SELECT … FROM s WHERE rand() < 0.5) src ON 
t.id = src.id …`. The pruning scan and the main merge scan evaluate `rand()` 
independently; a key the main merge would match but the pruning scan drops → 
that target group is pruned (not read) → the update is silently lost.
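
   To make the unsafe case concrete, here is a minimal plain-Scala model of the 
two independent scans. It is only a sketch and does not use Spark: `SourceRow`, 
`scan`, and the seeds are hypothetical, each key is treated as its own target 
group, and the two `Random` instances stand in for `rand()` being evaluated 
separately by the pruning subquery and by the main merge join.

   ```scala
   import scala.util.Random

   object DoubleScanSketch {
     // Hypothetical stand-in for the MERGE source: a join key plus a payload.
     final case class SourceRow(id: Int, value: String)

     val source: Seq[SourceRow] = (1 to 1000).map(i => SourceRow(i, s"v$i"))

     // Models `SELECT ... FROM s WHERE rand() < 0.5`: each scan draws fresh numbers.
     def scan(rng: Random): Seq[SourceRow] =
       source.filter(_ => rng.nextDouble() < 0.5)

     // Scan 1 feeds the pruning subquery, scan 2 feeds the main merge join.
     val pruningKeys: Set[Int] = scan(new Random(1L)).map(_.id).toSet
     val mergeKeys: Set[Int] = scan(new Random(2L)).map(_.id).toSet

     // Keys the main merge would update, but whose target group was never read
     // because the pruning scan did not produce them.
     val lostUpdates: Set[Int] = mergeKeys.diff(pruningKeys)

     def main(args: Array[String]): Unit =
       println(s"main merge keys: ${mergeKeys.size}, lost to pruning: ${lostUpdates.size}")
   }
   ```

   In this model any key in `lostUpdates` corresponds to a target row the merge 
should have updated but could not, because its group was filtered out first.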
   
   The gate is unconditional for all of MERGE, so it admits the unsafe case. 
Could you validate this path? Options: prove the double-evaluation can't change 
results (e.g. only deterministic key columns ever drive the group filter), 
restrict the relaxation to that subset, or materialize/checkpoint the source so 
both scans agree.
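
   As a rough illustration of the last two options, the plain-Scala sketch below 
models why they keep the scans in agreement. It is not Spark code: `SourceRow`, 
`scanWithRandomPayload`, and `materialized` are hypothetical names, and each key 
is again treated as its own target group.

   ```scala
   import scala.util.Random

   object AgreeingScansSketch {
     // Hypothetical source row: a deterministic join key and a random payload.
     final case class SourceRow(id: Int, payload: Double)

     val ids: Seq[Int] = 1 to 1000

     // Restriction option: randomness only feeds a non-key column, so two scans
     // differ in payloads but always produce the same set of join keys.
     def scanWithRandomPayload(rng: Random): Seq[SourceRow] =
       ids.map(id => SourceRow(id, rng.nextDouble()))

     val pruningKeysA: Set[Int] = scanWithRandomPayload(new Random(1L)).map(_.id).toSet
     val mergeKeysA: Set[Int] = scanWithRandomPayload(new Random(2L)).map(_.id).toSet

     // Materialization option: evaluate the row-dropping filter once, then let
     // both the pruning subquery and the main merge read the stored result.
     val materialized: Seq[SourceRow] = {
       val rng = new Random(3L)
       ids.filter(_ => rng.nextDouble() < 0.5).map(id => SourceRow(id, 0.0))
     }
     val pruningKeysB: Set[Int] = materialized.map(_.id).toSet
     val mergeKeysB: Set[Int] = materialized.map(_.id).toSet

     def main(args: Array[String]): Unit = {
       println(s"keys agree with random payload only: ${pruningKeysA == mergeKeysA}")
       println(s"lost updates after materializing: ${mergeKeysB.diff(pruningKeysB).size}")
     }
   }
   ```

   Either way, the set of keys that drives group pruning equals the set the main 
merge joins on, which is the property the relaxation needs to guarantee.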
   
   Nit (separate, minor): the rest of this file writes `operation.command` 
without parens (lines 421, 494, 525) — worth matching:
   ```suggestion
     override def allowNonDeterministicExpression: Boolean = operation.command == MERGE
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -455,7 +458,10 @@ case class WriteDelta(
     originalTable: NamedRelation,
     projections: WriteDeltaProjections,
     groupFilterCondition: Option[Expression] = None,
-    write: Option[DeltaWrite] = None) extends RowLevelWrite {
+    write: Option[DeltaWrite] = None)
+    extends RowLevelWrite with SupportsNonDeterministicExpression {
+
+  override def allowNonDeterministicExpression: Boolean = operation.command() == MERGE

Review Comment:
   Same as the `ReplaceData` override above: the delta-based path feeds 
`groupFilterCondition` to the same `RowLevelOperationRuntimeGroupFiltering` 
rule (via `DeltaBasedRowLevelOperation`), so the second-source-scan concern 
applies here too. (Posting here as well so the `WriteDelta` side isn't missed.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

