gengliangwang commented on code in PR #56121:
URL: https://github.com/apache/spark/pull/56121#discussion_r3364612171
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala:
##########
@@ -1233,4 +1233,318 @@ class MergeIntoDataFrameSuite extends
RowLevelOperationSuiteBase {
assert(e.message.contains("incompatible changes to table
`cat`.`ns1`.`source_table`"))
}
}
+
+ // Cache-substitution tests for the txn path: connector approves stale-free
cached scans via
+ // Transaction.registerScans.
+ test("cached merge source is reused when the table is unchanged since
caching") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val tableVersionBeforeCache = table.version()
+
+ val sourceDF = spark.table(tableNameAsString).where("salary <
250").as("source")
+ sourceDF.cache()
+ sourceDF.count()
+
+ assert(table.version() == tableVersionBeforeCache, "sanity: caching does
not bump version")
+
+ val (txn, txnTables) = executeTransaction {
+ sourceDF
+ .mergeInto(tableNameAsString, $"source.pk" === targetTableCol("pk"))
+ .whenMatched()
+ .update(Map("salary" -> targetTableCol("salary").plus(1)))
+ .merge()
+ }
+
+ assert(txn.currentState == Committed)
+ assert(txn.isClosed)
+ assert(txnTables.size == 1)
+ assert(table.version() == "2")
+ assert(txn.registeredScans.nonEmpty, "registerScans should have accepted
the cached scan")
+
+ val targetTxnTable = txnTables(tableNameAsString)
+ assert(targetTxnTable.scanEvents.size == 4)
+ val sourceFilterScans = targetTxnTable.scanEvents.flatten.count {
+ case sources.LessThan("salary", 250) => true
+ case _ => false
+ }
+ assert(sourceFilterScans == 2,
+ s"expected two salary < 250 scan events, got " +
+ s"${targetTxnTable.scanEvents.map(_.toSeq).mkString("[", ", ", "]")}")
+ val targetScans = targetTxnTable.scanEvents.count(_.isEmpty)
+ assert(targetScans == 2,
+ s"expected two target-side scans with no pushed filters, got " +
+ s"${targetTxnTable.scanEvents.map(_.toSeq).mkString("[", ", ", "]")}")
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 101, "hr"),
+ Row(2, 201, "software"),
+ Row(3, 300, "hr")))
+ }
+
+ test("cached merge source is dropped when the table version moves on between
caching and MERGE") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceDF = spark.table(tableNameAsString).where("salary <
250").as("source")
+ sourceDF.cache()
+ sourceDF.count()
+ val versionAtCache = table.version()
+
+ // Bump the version directly to simulate an out-of-band committer. A
Spark-side write would
+ // also bump the version but would trigger CacheManager.refreshCache and
defeat the test.
+ table.increaseVersion()
+ assert(table.version() != versionAtCache, "sanity: bump should change the
version")
+
+ val (txn, txnTables) = executeTransaction {
+ sourceDF
+ .mergeInto(tableNameAsString, $"source.pk" === targetTableCol("pk"))
+ .whenMatched()
+ .update(Map("salary" -> targetTableCol("salary").plus(1)))
+ .merge()
+ }
+
+ assert(txn.currentState == Committed)
+ assert(txn.isClosed)
+ assert(txnTables.size == 1)
+ assert(table.version() == "3")
+ assert(txn.registeredScans.isEmpty, "registerScans should refuse the stale
cached scan")
Review Comment:
This message misattributes the mechanism. In this test the stale cache entry is
rejected by the version-aware `sameResult`/`Table.equals` check *before*
`registerScans` is consulted — exactly as your own comments state (`txns.scala`:
"Staleness is handled upstream"; `InMemoryRowLevelOperationTableCatalog`:
"compare unequal so cache substitution fails before `Transaction.registerScans`
is consulted"). So `registerScans` is never called in this test; the "connector
rejecting registerScans" test is the one that exercises the rejection path.
Suggest wording that points at the version mismatch instead, e.g.:
```suggestion
assert(txn.registeredScans.isEmpty, "stale entry fails version-aware sameResult, so registerScans is never consulted")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]