cloud-fan commented on code in PR #55858: URL: https://github.com/apache/spark/pull/55858#discussion_r3469949360
########## sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeNondeterministicSuite.scala: ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.internal.SQLConf + +/** + * Tests for SPARK-56729: non-deterministic MERGE source with runtime group filtering. + * + * Verifies that the determinism guard in RowLevelOperationRuntimeGroupFiltering + * prevents lost updates when the source contains non-deterministic expressions. + */ +object GroupBasedMergeNondeterministicSuite { + val evalCounter = new AtomicInteger(0) +} + +class GroupBasedMergeNondeterministicSuite extends MergeIntoTableSuiteBase { Review Comment: This suite extends `MergeIntoTableSuiteBase`, which is group-based (`deltaMerge = false`), so all four end-to-end tests here exercise only the `ReplaceData` (copy-on-write) path. The `WriteDelta` (merge-on-read) path — which round 1 flagged separately — is verified only by the getter assertion in `AnalysisErrorSuite`, yet the same `canInjectGroupFilters` guard is what prevents the lost update there too. Consider a delta-based variant (mirroring these tests in a suite where `deltaMerge = true`) so merge-on-read gets the same reproduce-then-verify coverage. ########## sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeNondeterministicSuite.scala: ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.internal.SQLConf + +/** + * Tests for SPARK-56729: non-deterministic MERGE source with runtime group filtering. + * + * Verifies that the determinism guard in RowLevelOperationRuntimeGroupFiltering + * prevents lost updates when the source contains non-deterministic expressions. + */ +object GroupBasedMergeNondeterministicSuite { + val evalCounter = new AtomicInteger(0) +} + +class GroupBasedMergeNondeterministicSuite extends MergeIntoTableSuiteBase { + + import GroupBasedMergeNondeterministicSuite.evalCounter + + test("SPARK-56729: non-deterministic source skips group filter and updates correctly") { + // A non-deterministic UDF that always passes marks the source as non-deterministic. + // The determinism guard skips the group filter, so the source is scanned only once + // and both rows are correctly updated. Without the guard, the group filter would + // create a second independent scan of the source, risking lost updates. + withSQLConf( + SQLConf.RUNTIME_ROW_LEVEL_OPERATION_GROUP_FILTER_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false" + ) { + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |""".stripMargin) + + evalCounter.set(0) + + spark.udf.register("nondet_pass", + udf((pk: Int) => { + GroupBasedMergeNondeterministicSuite.evalCounter.incrementAndGet() + true + }).asNondeterministic()) + + sql( + """CREATE OR REPLACE TEMP VIEW source AS + |SELECT pk FROM VALUES (1), (2) t(pk) WHERE nondet_pass(pk) + |""".stripMargin) + + val executedPlan = executeAndKeepPlan { + sql( + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET t.salary = t.salary + 1 + |""".stripMargin) + } + + // Both rows updated correctly (no lost update due to partition pruning) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString ORDER BY pk"), + Seq(Row(1, 101, "hr"), Row(2, 201, "software"))) + + // Group filter was NOT injected (guard skipped it) + val dynamicFilters = collect(executedPlan) { + case b: BatchScanExec if b.runtimeFilters.exists( + _.isInstanceOf[DynamicPruningExpression]) => b + } + assert(dynamicFilters.isEmpty, + "Group filter should NOT be injected for non-deterministic source") + + // Source scanned only once (2 rows = 2 UDF calls), not twice + assert(evalCounter.get() == 2, + s"Expected 2 UDF calls (single scan), got ${evalCounter.get()}") + } + } + } + + test("SPARK-56729: non-deterministic source MERGE passes analysis and produces correct results") { + // End-to-end: a MERGE with rand() in the source (non-deterministic) previously failed with + // INVALID_NON_DETERMINISTIC_EXPRESSIONS. Now it passes analysis AND produces correct results + // with runtime group filter enabled. + withSQLConf( + SQLConf.RUNTIME_ROW_LEVEL_OPERATION_GROUP_FILTER_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false" + ) { + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |""".stripMargin) + + sql( + """CREATE OR REPLACE TEMP VIEW source AS + |SELECT pk, salary FROM VALUES (1, 150), (2, 250) t(pk, salary) + |WHERE rand() < 2.0 + |""".stripMargin) + + sql( + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET t.salary = s.salary + |""".stripMargin) + + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString ORDER BY pk"), + Seq(Row(1, 150, "hr"), Row(2, 250, "software"))) + } + } + } + + test("SPARK-56729: deterministic source MERGE still uses runtime group filter") { + // A deterministic source should still benefit from the group filter optimization. + withSQLConf( + SQLConf.RUNTIME_ROW_LEVEL_OPERATION_GROUP_FILTER_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false" + ) { + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |""".stripMargin) + + sql( + """CREATE OR REPLACE TEMP VIEW source AS + |SELECT pk, salary FROM VALUES (1, 150) t(pk, salary) + |""".stripMargin) + + val executedPlan = executeAndKeepPlan { + sql( + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET t.salary = s.salary + |""".stripMargin) + } + + // Results are correct + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString ORDER BY pk"), + Seq(Row(1, 150, "hr"), Row(2, 200, "software"))) + + // Dynamic pruning was applied (group filter injected for deterministic source) + val dynamicFilters = collect(executedPlan) { + case b: BatchScanExec if b.runtimeFilters.exists( + _.isInstanceOf[DynamicPruningExpression]) => b + } + assert(dynamicFilters.nonEmpty, + "Group filter should be injected for deterministic source MERGE") + } + } + } + + test("SPARK-56729: non-deterministic source does not inject runtime group filter") { Review Comment: This test and the `rand() < 2.0` test above ("...produces correct results") use the same source and MERGE; one asserts the result rows, the other asserts no group filter is injected. The UDF-counter test at the top already asserts both — plus single-scan — together, so these two are largely redundant. Consider folding them into one `rand()`-based test that checks both the rows and the absence of the group filter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
