gengliangwang commented on code in PR #56121:
URL: https://github.com/apache/spark/pull/56121#discussion_r3364612171


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala:
##########
@@ -1233,4 +1233,318 @@ class MergeIntoDataFrameSuite extends 
RowLevelOperationSuiteBase {
       assert(e.message.contains("incompatible changes to table 
`cat`.`ns1`.`source_table`"))
     }
   }
+
+  // Cache-substitution tests for the txn path: connector approves stale-free 
cached scans via
+  // Transaction.registerScans.
+  test("cached merge source is reused when the table is unchanged since 
caching") {
+    createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+      """{ "pk": 1, "salary": 100, "dep": "hr" }
+        |{ "pk": 2, "salary": 200, "dep": "software" }
+        |{ "pk": 3, "salary": 300, "dep": "hr" }
+        |""".stripMargin)
+
+    val tableVersionBeforeCache = table.version()
+
+    val sourceDF = spark.table(tableNameAsString).where("salary < 
250").as("source")
+    sourceDF.cache()
+    sourceDF.count()
+
+    assert(table.version() == tableVersionBeforeCache, "sanity: caching does 
not bump version")
+
+    val (txn, txnTables) = executeTransaction {
+      sourceDF
+        .mergeInto(tableNameAsString, $"source.pk" === targetTableCol("pk"))
+        .whenMatched()
+        .update(Map("salary" -> targetTableCol("salary").plus(1)))
+        .merge()
+    }
+
+    assert(txn.currentState == Committed)
+    assert(txn.isClosed)
+    assert(txnTables.size == 1)
+    assert(table.version() == "2")
+    assert(txn.registeredScans.nonEmpty, "registerScans should have accepted 
the cached scan")
+
+    val targetTxnTable = txnTables(tableNameAsString)
+    assert(targetTxnTable.scanEvents.size == 4)
+    val sourceFilterScans = targetTxnTable.scanEvents.flatten.count {
+      case sources.LessThan("salary", 250) => true
+      case _ => false
+    }
+    assert(sourceFilterScans == 2,
+      s"expected two salary < 250 scan events, got " +
+        s"${targetTxnTable.scanEvents.map(_.toSeq).mkString("[", ", ", "]")}")
+    val targetScans = targetTxnTable.scanEvents.count(_.isEmpty)
+    assert(targetScans == 2,
+      s"expected two target-side scans with no pushed filters, got " +
+        s"${targetTxnTable.scanEvents.map(_.toSeq).mkString("[", ", ", "]")}")
+
+    checkAnswer(
+      sql(s"SELECT * FROM $tableNameAsString"),
+      Seq(
+        Row(1, 101, "hr"),
+        Row(2, 201, "software"),
+        Row(3, 300, "hr")))
+  }
+
+  test("cached merge source is dropped when the table version moves on between 
caching and MERGE") {
+    createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+      """{ "pk": 1, "salary": 100, "dep": "hr" }
+        |{ "pk": 2, "salary": 200, "dep": "software" }
+        |{ "pk": 3, "salary": 300, "dep": "hr" }
+        |""".stripMargin)
+
+    val sourceDF = spark.table(tableNameAsString).where("salary < 
250").as("source")
+    sourceDF.cache()
+    sourceDF.count()
+    val versionAtCache = table.version()
+
+    // Bump the version directly to simulate an out-of-band committer. A 
Spark-side write would
+    // also bump the version but would trigger CacheManager.refreshCache and 
defeat the test.
+    table.increaseVersion()
+    assert(table.version() != versionAtCache, "sanity: bump should change the 
version")
+
+    val (txn, txnTables) = executeTransaction {
+      sourceDF
+        .mergeInto(tableNameAsString, $"source.pk" === targetTableCol("pk"))
+        .whenMatched()
+        .update(Map("salary" -> targetTableCol("salary").plus(1)))
+        .merge()
+    }
+
+    assert(txn.currentState == Committed)
+    assert(txn.isClosed)
+    assert(txnTables.size == 1)
+    assert(table.version() == "3")
+    assert(txn.registeredScans.isEmpty, "registerScans should refuse the stale 
cached scan")

Review Comment:
   This message misattributes the mechanism: here the stale entry is rejected 
by the version-aware `sameResult`/`Table.equals` check *before* `registerScans` 
is consulted — exactly as your own comments state (`txns.scala`: "Staleness is 
handled upstream"; `InMemoryRowLevelOperationTableCatalog`: "compare unequal so 
cache substitution fails before `Transaction.registerScans` is consulted"). So 
`registerScans` is never called in this test; the "connector rejecting 
registerScans" test is the one that exercises the refuse path. Suggest wording 
that points at the version mismatch, e.g.:
   ```suggestion
       assert(txn.registeredScans.isEmpty,
         "stale entry fails version-aware sameResult, so registerScans is never consulted")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to