gengliangwang commented on code in PR #56121:
URL: https://github.com/apache/spark/pull/56121#discussion_r3364612158


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala:
##########
@@ -489,16 +492,37 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     result
   }
 
+  // Decides whether the cached entry can be substituted into a plan being executed inside
+  // the given transaction. Collects only the scans whose table belongs to the transaction's
+  // catalog and asks the connector whether reusing the cached snapshot is compatible with its
+  // isolation contract.
+  private def validateCachedEntryForTransaction(cd: CachedData, txn: Transaction): Boolean = {
+    val txnCatalogName = txn.catalog().name()
+    val txnTables = cd.cachedRepresentation.cacheBuilder.logicalPlan.collectWithSubqueries {
+      case r: DataSourceV2Relation if r.catalog.exists(_.name() == txnCatalogName) => r.table
+    }.toSet
+    val scans = collectWithSubqueries(cd.cachedRepresentation.cacheBuilder.cachedPlan) {
+      case b: BatchScanExec if txnTables.contains(b.table) => b.scan
+    }
+    scans.isEmpty || txn.registerScans(scans.toArray)

Review Comment:
   `scans.isEmpty` is the accept condition for two different situations, and 
only one is safe by construction. `scans` is empty when (a) the entry has no 
relations from the txn catalog — fine, reuse it; but also when (b) the entry 
*does* contain txn-catalog relations (`txnTables.nonEmpty`) whose physical 
scans aren't `BatchScanExec` — e.g. a nested cached relation served as 
`InMemoryTableScanExec`, a v1-fallback scan, or a streaming scan. In case (b) 
the entry has already passed the version-aware `sameResult` check, but 
`registerScans` is silently skipped, so a connector whose isolation contract 
rejects reuse for reasons beyond version equality never gets to veto.
   
   Is case (b) reachable for the target connectors? If so, consider 
distinguishing the two — accept on `txnTables.isEmpty`, but treat 
`txnTables.nonEmpty && scans.isEmpty` as "can't prove compatibility" (refuse, 
or at least assert it doesn't happen) rather than folding both into one 
`scans.isEmpty`.
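A minimal sketch of the three-way split suggested above, written against hypothetical stand-in types rather than Spark's real classes. `TxnCacheDecisionSketch`, `Decision`, and `classify` are illustrative names, tables and scans are modeled as plain strings, and whether case (b) should refuse or assert is left open as in the comment.

```scala
// Illustrative stand-ins only; none of these names exist in Spark.
object TxnCacheDecisionSketch {
  sealed trait Decision
  // Case (a): the entry has no relations from the txn catalog, so reuse is safe.
  case object ReuseWithoutAsking extends Decision
  // Case (b): txn-catalog relations exist but no BatchScanExec-style scan was found.
  case object CannotProveCompatibility extends Decision
  // Normal case: hand the collected scans to the connector for a verdict.
  final case class AskConnector(scans: Seq[String]) extends Decision

  // txnTables: tables from the transaction's catalog found in the logical plan.
  // scans: scans collected from the physical plan for those tables.
  def classify(txnTables: Set[String], scans: Seq[String]): Decision =
    if (txnTables.isEmpty) ReuseWithoutAsking
    else if (scans.isEmpty) CannotProveCompatibility
    else AskConnector(scans)
}
```

With this shape, the accept path keys off `txnTables.isEmpty`, and an empty `scans` alone no longer implies acceptance.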



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala:
##########
@@ -489,16 +492,37 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     result
   }
 
+  // Decides whether the cached entry can be substituted into a plan being executed inside
+  // the given transaction. Collects only the scans whose table belongs to the transaction's
+  // catalog and asks the connector whether reusing the cached snapshot is compatible with its
+  // isolation contract.
+  private def validateCachedEntryForTransaction(cd: CachedData, txn: Transaction): Boolean = {

Review Comment:
   `validateCachedEntryForTransaction` (and the `canUse` predicate it becomes — 
documented as "predicate filtering which cached entries are eligible") read as 
pure, but they mutate the connector's read set via `txn.registerScans`, and 
that runs inside `lookupCachedDataInternal`'s `find`. It's correct today — 
registration aligns 1:1 with the entry that gets substituted — but the side 
effect is load-bearing for the transaction's read tracking and invisible at the 
call site. A short doc note on `canUse` (or a name that signals the effect, 
e.g. `registerAndCheck`) would keep a future change to 
`lookupCachedDataInternal` (memoizing the predicate, reordering, calling it for 
diagnostics) from silently breaking read tracking.
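To illustrate why the hidden side effect matters, here is a small self-contained sketch. It assumes a toy `FakeTxn` that records every entry passed to a `registerScans`-like method; the entries are plain strings and both lookup functions are hypothetical, not the real `lookupCachedDataInternal`.

```scala
// Illustrative stand-ins only; FakeTxn and both lookups are hypothetical.
object PredicateSideEffectSketch {
  import scala.collection.mutable.ArrayBuffer

  // Records each entry it is asked about, mimicking a connector's read set.
  final class FakeTxn {
    val registered: ArrayBuffer[String] = ArrayBuffer.empty[String]
    def registerScans(entry: String): Boolean = {
      registered += entry
      true // this toy connector accepts everything
    }
  }

  // `find` stops at the first match, so only the substituted entry is registered.
  def lookup(entries: List[String], txn: FakeTxn): Option[String] =
    entries.find(e => txn.registerScans(e))

  // A seemingly harmless refactor: evaluate the predicate on every entry
  // (e.g. for diagnostics) and pick the first. Now every entry is registered.
  def lookupWithDiagnostics(entries: List[String], txn: FakeTxn): Option[String] = {
    val eligible = entries.filter(e => txn.registerScans(e))
    eligible.headOption
  }
}
```

Both functions return the same entry, but the second one leaves extra registrations in the read set, which is the kind of silent drift a doc note or an effect-signalling name would guard against.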



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala:
##########
@@ -1233,4 +1233,318 @@ class MergeIntoDataFrameSuite extends RowLevelOperationSuiteBase {
       assert(e.message.contains("incompatible changes to table `cat`.`ns1`.`source_table`"))
     }
   }
+
+  // Cache-substitution tests for the txn path: connector approves stale-free cached scans via
+  // Transaction.registerScans.
+  test("cached merge source is reused when the table is unchanged since caching") {
+    createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+      """{ "pk": 1, "salary": 100, "dep": "hr" }
+        |{ "pk": 2, "salary": 200, "dep": "software" }
+        |{ "pk": 3, "salary": 300, "dep": "hr" }
+        |""".stripMargin)
+
+    val tableVersionBeforeCache = table.version()
+
+    val sourceDF = spark.table(tableNameAsString).where("salary < 250").as("source")
+    sourceDF.cache()
+    sourceDF.count()
+
+    assert(table.version() == tableVersionBeforeCache, "sanity: caching does not bump version")
+
+    val (txn, txnTables) = executeTransaction {
+      sourceDF
+        .mergeInto(tableNameAsString, $"source.pk" === targetTableCol("pk"))
+        .whenMatched()
+        .update(Map("salary" -> targetTableCol("salary").plus(1)))
+        .merge()
+    }
+
+    assert(txn.currentState == Committed)
+    assert(txn.isClosed)
+    assert(txnTables.size == 1)
+    assert(table.version() == "2")
+    assert(txn.registeredScans.nonEmpty, "registerScans should have accepted the cached scan")
+
+    val targetTxnTable = txnTables(tableNameAsString)
+    assert(targetTxnTable.scanEvents.size == 4)
+    val sourceFilterScans = targetTxnTable.scanEvents.flatten.count {
+      case sources.LessThan("salary", 250) => true
+      case _ => false
+    }
+    assert(sourceFilterScans == 2,
+      s"expected two salary < 250 scan events, got " +
+        s"${targetTxnTable.scanEvents.map(_.toSeq).mkString("[", ", ", "]")}")
+    val targetScans = targetTxnTable.scanEvents.count(_.isEmpty)
+    assert(targetScans == 2,
+      s"expected two target-side scans with no pushed filters, got " +
+        s"${targetTxnTable.scanEvents.map(_.toSeq).mkString("[", ", ", "]")}")
+
+    checkAnswer(
+      sql(s"SELECT * FROM $tableNameAsString"),
+      Seq(
+        Row(1, 101, "hr"),
+        Row(2, 201, "software"),
+        Row(3, 300, "hr")))
+  }
+
+  test("cached merge source is dropped when the table version moves on between caching and MERGE") {
+    createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+      """{ "pk": 1, "salary": 100, "dep": "hr" }
+        |{ "pk": 2, "salary": 200, "dep": "software" }
+        |{ "pk": 3, "salary": 300, "dep": "hr" }
+        |""".stripMargin)
+
+    val sourceDF = spark.table(tableNameAsString).where("salary < 250").as("source")
+    sourceDF.cache()
+    sourceDF.count()
+    val versionAtCache = table.version()
+
+    // Bump the version directly to simulate an out-of-band committer. A Spark-side write would
+    // also bump the version but would trigger CacheManager.refreshCache and defeat the test.
+    table.increaseVersion()
+    assert(table.version() != versionAtCache, "sanity: bump should change the version")
+
+    val (txn, txnTables) = executeTransaction {
+      sourceDF
+        .mergeInto(tableNameAsString, $"source.pk" === targetTableCol("pk"))
+        .whenMatched()
+        .update(Map("salary" -> targetTableCol("salary").plus(1)))
+        .merge()
+    }
+
+    assert(txn.currentState == Committed)
+    assert(txn.isClosed)
+    assert(txnTables.size == 1)
+    assert(table.version() == "3")
+    assert(txn.registeredScans.isEmpty, "registerScans should refuse the stale cached scan")

Review Comment:
   This message misattributes the mechanism: here the stale entry is rejected 
by the version-aware `sameResult`/`Table.equals` check *before* `registerScans` 
is consulted — exactly as your own comments state (`txns.scala`: "Staleness is 
handled upstream"; `InMemoryRowLevelOperationTableCatalog`: "compare unequal so 
cache substitution fails before `Transaction.registerScans` is consulted"). So 
`registerScans` is never called in this test; the "connector rejecting 
registerScans" test is the one that exercises the refuse path. Suggest wording 
that points at the version mismatch, e.g.:
   ```suggestion
       assert(txn.registeredScans.isEmpty, "stale entry fails version-aware sameResult, so registerScans is never consulted")
   ```
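The ordering described in this comment can be sketched as follows. This is a toy model, not the PR's code: `TableState`, `FakeTxn`, and `canSubstitute` are hypothetical, and version-aware equality is approximated by case-class equality on an id and a version string.

```scala
// Illustrative stand-ins only; not the PR's implementation.
object StaleEntrySketch {
  // Equality covers both identity and version, like a version-aware Table.equals.
  final case class TableState(id: String, version: String)

  // Counts how often the connector is consulted.
  final class FakeTxn {
    var calls: Int = 0
    def registerScans(): Boolean = { calls += 1; true }
  }

  // The equality check runs first; `&&` short-circuits, so a stale entry
  // never reaches registerScans.
  def canSubstitute(cached: TableState, current: TableState, txn: FakeTxn): Boolean =
    cached == current && txn.registerScans()
}
```

In this model a version bump makes `canSubstitute` return false with zero connector calls, which is why an empty `registeredScans` in the test says nothing about the connector refusing.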



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -116,6 +116,21 @@ abstract class InMemoryBaseTable(
     }
   }
 
+  // Version-aware equality: two tables refer to the same metastore entity at the same state.
+  // Fallback to reference equality when `id()` is null (no metastore identity).

Review Comment:
   "Fallback" is a noun; as a verb it's two words.
   ```suggestion
     // Fall back to reference equality when `id()` is null (no metastore identity).
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2548,6 +2548,15 @@ class Analyzer(
         case a: FunctionTableSubqueryArgumentExpression if !a.plan.resolved =>
           resolveSubQuery(a, outer)(
             (plan, outerAttrs) => a.copy(plan = plan, outerAttrs = outerAttrs))
+        // The subquery's plan is already resolved. Replace any V2TableReferences without
+        // re-running any analyzer rules.
+        case se: SubqueryExpression
+            if se.plan.resolved &&
+               se.plan.collectFirstWithSubqueries { case _: V2TableReference => () }.isDefined =>
+          val newPlan = se.plan.transformWithSubqueries {
+            case r: V2TableReference => relationResolution.resolveReference(r)

Review Comment:
   Re: the question above on whether this uses the same relation cache as the 
main resolution rule — yes. This case and the top-level `case r: 
V2TableReference` (Analyzer.scala:1131) both call 
`relationResolution.resolveReference`, which routes through 
`getOrLoadRelation`/`relationCache` (RelationResolution.scala:460-480). 
`relationCache` is `AnalysisContext.get.relationCache`, which is per-analysis, 
so within one query the relation is loaded once — the first occurrence 
(top-level or subquery) goes through the txn catalog and tracks the read, and 
later occurrences reuse a consistent snapshot. Across queries the cache is 
fresh. So the sharing looks correct and is actually what keeps a 
self-referencing table consistent within the query.
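The per-analysis sharing described above can be pictured with a small sketch. It assumes a toy `AnalysisContextSketch` holding a map keyed by table name; the class, its `loads` counter, and the string-valued "relations" are all hypothetical and only mirror the load-once-per-analysis behaviour, not the real `RelationResolution` code.

```scala
// Illustrative stand-ins only; not Spark's AnalysisContext or RelationResolution.
object RelationCacheSketch {
  import scala.collection.mutable

  // One instance per analysis run; `load` stands in for the catalog lookup.
  final class AnalysisContextSketch(load: String => String) {
    private val relationCache = mutable.HashMap.empty[String, String]
    var loads: Int = 0

    // The first reference loads (and would track the read); later ones reuse it.
    def resolveReference(name: String): String =
      relationCache.getOrElseUpdate(name, { loads += 1; load(name) })
  }
}
```

Resolving the same table from the top-level plan and from a subquery through one context triggers a single load, while a new context (a new query) starts with an empty cache.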



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

