gengliangwang commented on code in PR #56121:
URL: https://github.com/apache/spark/pull/56121#discussion_r3364612158
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala:
##########
@@ -489,16 +492,37 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
result
}
+ // Decides whether the cached entry can be substituted into a plan being executed inside
+ // the given transaction. Collects only the scans whose table belongs to the transaction's
+ // catalog and asks the connector whether reusing the cached snapshot is compatible with its
+ // isolation contract.
+ private def validateCachedEntryForTransaction(cd: CachedData, txn: Transaction): Boolean = {
+ val txnCatalogName = txn.catalog().name()
+ val txnTables = cd.cachedRepresentation.cacheBuilder.logicalPlan.collectWithSubqueries {
+ case r: DataSourceV2Relation if r.catalog.exists(_.name() == txnCatalogName) => r.table
+ }.toSet
+ val scans = collectWithSubqueries(cd.cachedRepresentation.cacheBuilder.cachedPlan) {
+ case b: BatchScanExec if txnTables.contains(b.table) => b.scan
+ }
+ scans.isEmpty || txn.registerScans(scans.toArray)
Review Comment:
`scans.isEmpty` is the accept condition for two different situations, and
only one is safe by construction. `scans` is empty when (a) the entry has no
relations from the txn catalog — fine, reuse it; but also when (b) the entry
*does* contain txn-catalog relations (`txnTables.nonEmpty`) whose physical
scans aren't `BatchScanExec` — e.g. a nested cached relation served as
`InMemoryTableScanExec`, a v1-fallback scan, or a streaming scan. In case (b)
the entry has already passed the version-aware `sameResult` check, but
`registerScans` is silently skipped, so a connector whose isolation contract
rejects reuse for reasons beyond version equality never gets to veto.
Is case (b) reachable for the target connectors? If so, consider
distinguishing the two — accept on `txnTables.isEmpty`, but treat
`txnTables.nonEmpty && scans.isEmpty` as "can't prove compatibility" (refuse,
or at least assert it doesn't happen) rather than folding both into one
`scans.isEmpty`.
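A minimal sketch of the suggested split, using hypothetical stand-in types (`Scan`, `Txn`, `TxnCacheCheck.validate`) rather than Spark's `Transaction` or `BatchScanExec`. It takes the "refuse" option for case (b); the other option mentioned above would replace that branch with an assertion.

```scala
// Hypothetical, simplified stand-ins; not Spark's connector API.
final case class Scan(table: String)

trait Txn {
  def registerScans(scans: Array[Scan]): Boolean
}

object TxnCacheCheck {
  /** Accepts a cached entry only when the txn's isolation contract is either
    * irrelevant (no txn-catalog tables) or explicitly confirmed by the connector. */
  def validate(txnTables: Set[String], scans: Seq[Scan], txn: Txn): Boolean =
    if (txnTables.isEmpty) true           // case (a): nothing from the txn catalog
    else if (scans.isEmpty) false         // case (b): txn tables but no scans to vet
    else txn.registerScans(scans.toArray) // the connector gets to veto
}
```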
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala:
##########
@@ -489,16 +492,37 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
result
}
+ // Decides whether the cached entry can be substituted into a plan being executed inside
+ // the given transaction. Collects only the scans whose table belongs to the transaction's
+ // catalog and asks the connector whether reusing the cached snapshot is compatible with its
+ // isolation contract.
+ private def validateCachedEntryForTransaction(cd: CachedData, txn: Transaction): Boolean = {
Review Comment:
`validateCachedEntryForTransaction` (and the `canUse` predicate it becomes —
documented as "predicate filtering which cached entries are eligible") read as
pure, but they mutate the connector's read set via `txn.registerScans`, and
that runs inside `lookupCachedDataInternal`'s `find`. It's correct today —
registration aligns 1:1 with the entry that gets substituted — but the side
effect is load-bearing for the transaction's read tracking and invisible at the
call site. A short doc note on `canUse` (or a name that signals the effect,
e.g. `registerAndCheck`) would keep a future change to
`lookupCachedDataInternal` (memoizing the predicate, reordering, calling it for
diagnostics) from silently breaking read tracking.
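A simplified model of why the side effect matters, with hypothetical `Entry`, `RecordingTxn`, and `CacheLookup` types (not Spark's), and assuming the connector records scans only when it accepts them. `find` stops at the first match, so only the substituted entry's scans are recorded; a second evaluation of the same predicate, for example for diagnostics, records them again.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for CachedData and Transaction.
final case class Entry(name: String, scans: Seq[String])

final class RecordingTxn(accept: String => Boolean) {
  val registered = ArrayBuffer.empty[String]
  // Records the scans only if every one of them is accepted.
  def registerScans(scans: Seq[String]): Boolean = {
    val ok = scans.forall(accept)
    if (ok) registered ++= scans
    ok
  }
}

object CacheLookup {
  /** Side-effecting predicate: on success it registers the entry's scans
    * with `txn`, so it must run exactly once per substituted entry. */
  def registerAndCheck(e: Entry, txn: RecordingTxn): Boolean =
    e.scans.isEmpty || txn.registerScans(e.scans)

  def lookup(entries: Seq[Entry], txn: RecordingTxn): Option[Entry] =
    entries.find(registerAndCheck(_, txn))
}
```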
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala:
##########
@@ -1233,4 +1233,318 @@ class MergeIntoDataFrameSuite extends RowLevelOperationSuiteBase {
assert(e.message.contains("incompatible changes to table `cat`.`ns1`.`source_table`"))
}
}
+
+ // Cache-substitution tests for the txn path: connector approves stale-free cached scans via
+ // Transaction.registerScans.
+ test("cached merge source is reused when the table is unchanged since caching") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val tableVersionBeforeCache = table.version()
+
+ val sourceDF = spark.table(tableNameAsString).where("salary < 250").as("source")
+ sourceDF.cache()
+ sourceDF.count()
+
+ assert(table.version() == tableVersionBeforeCache, "sanity: caching does not bump version")
+
+ val (txn, txnTables) = executeTransaction {
+ sourceDF
+ .mergeInto(tableNameAsString, $"source.pk" === targetTableCol("pk"))
+ .whenMatched()
+ .update(Map("salary" -> targetTableCol("salary").plus(1)))
+ .merge()
+ }
+
+ assert(txn.currentState == Committed)
+ assert(txn.isClosed)
+ assert(txnTables.size == 1)
+ assert(table.version() == "2")
+ assert(txn.registeredScans.nonEmpty, "registerScans should have accepted the cached scan")
+
+ val targetTxnTable = txnTables(tableNameAsString)
+ assert(targetTxnTable.scanEvents.size == 4)
+ val sourceFilterScans = targetTxnTable.scanEvents.flatten.count {
+ case sources.LessThan("salary", 250) => true
+ case _ => false
+ }
+ assert(sourceFilterScans == 2,
+ s"expected two salary < 250 scan events, got " +
+ s"${targetTxnTable.scanEvents.map(_.toSeq).mkString("[", ", ", "]")}")
+ val targetScans = targetTxnTable.scanEvents.count(_.isEmpty)
+ assert(targetScans == 2,
+ s"expected two target-side scans with no pushed filters, got " +
+ s"${targetTxnTable.scanEvents.map(_.toSeq).mkString("[", ", ", "]")}")
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 101, "hr"),
+ Row(2, 201, "software"),
+ Row(3, 300, "hr")))
+ }
+
+ test("cached merge source is dropped when the table version moves on between caching and MERGE") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceDF = spark.table(tableNameAsString).where("salary < 250").as("source")
+ sourceDF.cache()
+ sourceDF.count()
+ val versionAtCache = table.version()
+
+ // Bump the version directly to simulate an out-of-band committer. A Spark-side write would
+ // also bump the version but would trigger CacheManager.refreshCache and defeat the test.
+ table.increaseVersion()
+ assert(table.version() != versionAtCache, "sanity: bump should change the version")
+
+ val (txn, txnTables) = executeTransaction {
+ sourceDF
+ .mergeInto(tableNameAsString, $"source.pk" === targetTableCol("pk"))
+ .whenMatched()
+ .update(Map("salary" -> targetTableCol("salary").plus(1)))
+ .merge()
+ }
+
+ assert(txn.currentState == Committed)
+ assert(txn.isClosed)
+ assert(txnTables.size == 1)
+ assert(table.version() == "3")
+ assert(txn.registeredScans.isEmpty, "registerScans should refuse the stale cached scan")
Review Comment:
This message misattributes the mechanism: here the stale entry is rejected
by the version-aware `sameResult`/`Table.equals` check *before* `registerScans`
is consulted — exactly as your own comments state (`txns.scala`: "Staleness is
handled upstream"; `InMemoryRowLevelOperationTableCatalog`: "compare unequal so
cache substitution fails before `Transaction.registerScans` is consulted"). So
`registerScans` is never called in this test; the "connector rejecting
registerScans" test is the one that exercises the refuse path. Suggest wording
that points at the version mismatch, e.g.:
```suggestion
assert(txn.registeredScans.isEmpty, "stale entry fails version-aware sameResult, so registerScans is never consulted")
```
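A toy model of the ordering described above, assuming (as the quoted comments state) that the version-aware equality check runs first and short-circuits. `TableRef`, `CountingTxn`, and `canSubstitute` are hypothetical; the point is only that a version mismatch returns before `registerScans` is ever called.

```scala
// Hypothetical model of the two-stage check; not Spark code.
final case class TableRef(id: String, version: String)

final class CountingTxn {
  var registerCalls = 0
  def registerScans(tables: Seq[TableRef]): Boolean = { registerCalls += 1; true }
}

object SubstitutionOrder {
  // Stage 1: version-aware equality (stands in for sameResult / Table.equals).
  // Stage 2: connector veto, reached only if stage 1 passes (&& short-circuits).
  def canSubstitute(cached: TableRef, current: TableRef, txn: CountingTxn): Boolean =
    cached == current && txn.registerScans(Seq(cached))
}
```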
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -116,6 +116,21 @@ abstract class InMemoryBaseTable(
}
}
+ // Version-aware equality: two tables refer to the same metastore entity at the same state.
+ // Fallback to reference equality when `id()` is null (no metastore identity).
Review Comment:
"Fallback" is a noun (or adjective); the verb is two words, "fall back".
```suggestion
// Fall back to reference equality when `id()` is null (no metastore identity).
```
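A minimal sketch of the equality contract that comment describes, on a hypothetical `VersionedTable` class (the real `InMemoryBaseTable` fields and accessors may differ). It also overrides `hashCode` so equal tables hash alike, which matters when tables are used as set members, as in the `txnTables.contains(b.table)` lookup in `CacheManager`.

```scala
// Hypothetical sketch; not the actual InMemoryBaseTable implementation.
final class VersionedTable(val id: String, val version: String) {
  override def equals(other: Any): Boolean = other match {
    case that: VersionedTable =>
      if (id == null || that.id == null) this eq that // no metastore identity
      else id == that.id && version == that.version   // same entity, same state
    case _ => false
  }

  // Consistent with equals: identity hash without an id, else (id, version).
  override def hashCode(): Int =
    if (id == null) System.identityHashCode(this) else (id, version).##
}
```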
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2548,6 +2548,15 @@ class Analyzer(
case a: FunctionTableSubqueryArgumentExpression if !a.plan.resolved =>
resolveSubQuery(a, outer)(
(plan, outerAttrs) => a.copy(plan = plan, outerAttrs = outerAttrs))
+ // The subquery's plan is already resolved. Replace any V2TableReferences without
+ // re-running any analyzer rules.
+ case se: SubqueryExpression
+ if se.plan.resolved &&
+ se.plan.collectFirstWithSubqueries { case _: V2TableReference => () }.isDefined =>
+ val newPlan = se.plan.transformWithSubqueries {
+ case r: V2TableReference => relationResolution.resolveReference(r)
Review Comment:
Re: the question above on whether this uses the same relation cache as the
main resolution rule — yes. This case and the top-level `case r:
V2TableReference` (Analyzer.scala:1131) both call
`relationResolution.resolveReference`, which routes through
`getOrLoadRelation`/`relationCache` (RelationResolution.scala:460-480).
`relationCache` is `AnalysisContext.get.relationCache`, which is per-analysis,
so within one query the relation is loaded once — the first occurrence
(top-level or subquery) goes through the txn catalog and tracks the read, and
later occurrences reuse a consistent snapshot. Across queries the cache is
fresh. So the sharing looks correct and is actually what keeps a
self-referencing table consistent within the query.
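A simplified model of the per-analysis caching described above, assuming a plain map scoped to one analysis; `AnalysisScope` and its `load` function are hypothetical, not Spark's `AnalysisContext` or `RelationResolution`. The first `resolveReference` call loads the relation and records the read; a later reference in the same scope gets the same snapshot even if the table has moved on, while a fresh scope sees the new version.

```scala
import scala.collection.mutable

// Hypothetical model of a per-analysis relation cache.
final class AnalysisScope(load: String => String) {
  private val relationCache = mutable.HashMap.empty[String, String]
  val trackedReads = mutable.ArrayBuffer.empty[String]

  // First lookup of a name loads it and records the read; later lookups
  // within the same scope return the cached snapshot unchanged.
  def resolveReference(name: String): String =
    relationCache.getOrElseUpdate(name, { trackedReads += name; load(name) })
}
```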