andreaschat-db commented on code in PR #56121:
URL: https://github.com/apache/spark/pull/56121#discussion_r3355493723
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
*/
void abort();
+ /**
+ * Attempts to register a list of materialized scans against this transaction's read set.
+ * <p>
+ * Spark calls this when considering reuse of a cached subtree during the transaction. The
Review Comment:
Generalised description.
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
*/
void abort();
+ /**
+ * Attempts to register a list of materialized scans against this transaction's read set.
+ * <p>
+ * Spark calls this when considering reuse of a cached subtree during the transaction. The
+ * list contains every materialized scan in the candidate cached subtree. That may include
+ * scans that belong to catalogs other than this transaction's catalog. The connector can
+ * decide which scans to register or ignore.
+ * <p>
+ * The connector decides whether reusing the cached snapshots is compatible with the
+ * transaction's isolation contract. It must either accept the cache entry (returning
+ * {@code true} after adding any of its own scans to the read set) or refuse it (returning
+ * {@code false} without modifying the read set).
+ * <p>
+ * This method may be called multiple times during a single query with overlapping scan
+ * lists. Registering a scan that is already in the read set must be a no-op.
+ *
+ * @param scans Every materialized scan in the candidate cached subtree.
+ * @return true if the connector accepts reuse of the cache entry; false otherwise.
+ */
+ default boolean registerScans(List<Scan> scans) {
Review Comment:
Makes sense. Removed default.
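
To make the contract in the Javadoc above concrete, here is a minimal sketch of
a connector-side implementation. `Scan` and `Txn` are hypothetical stand-ins,
not Spark's real types, and the isolation rule (a scan is compatible only if it
read the transaction's pinned snapshot) is an assumption made for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Illustrative only: Scan and Txn are hypothetical stand-ins, not Spark's types.
final class RegisterScansSketch {

  static final class Scan {
    final String catalog;
    final String table;
    final long snapshotId;

    Scan(String catalog, String table, long snapshotId) {
      this.catalog = catalog;
      this.table = table;
      this.snapshotId = snapshotId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Scan)) return false;
      Scan other = (Scan) o;
      return catalog.equals(other.catalog)
          && table.equals(other.table)
          && snapshotId == other.snapshotId;
    }

    @Override
    public int hashCode() {
      return Objects.hash(catalog, table, snapshotId);
    }
  }

  static final class Txn {
    private final String catalog;
    // Assumed isolation rule: cached reads must match this snapshot.
    private final long pinnedSnapshot;
    private final Set<Scan> readSet = new HashSet<>();

    Txn(String catalog, long pinnedSnapshot) {
      this.catalog = catalog;
      this.pinnedSnapshot = pinnedSnapshot;
    }

    boolean registerScans(List<Scan> scans) {
      // Pass 1: validate all scans of our own catalog before touching the
      // read set, so a refusal leaves the read set unmodified.
      for (Scan s : scans) {
        if (s.catalog.equals(catalog) && s.snapshotId != pinnedSnapshot) {
          return false;
        }
      }
      // Pass 2: register our own scans and ignore foreign-catalog scans.
      // Set.add makes re-registering an already known scan a no-op.
      for (Scan s : scans) {
        if (s.catalog.equals(catalog)) {
          readSet.add(s);
        }
      }
      return true;
    }

    int readSetSize() {
      return readSet.size();
    }
  }
}
```

Validating before mutating is what keeps the "return false without modifying
the read set" half of the contract simple to honor.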
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##########
@@ -122,6 +123,21 @@ case class DataSourceV2Relation(
copy(output = output.map(_.newInstance()))
}
+ // Replace the `table` field with an identity placeholder. This allows two relations
+ // that point at the same logical table compare equal even when one is wrapped (e.g.
+ // by a transaction-aware catalog). Note, when `Table.id()` is null we key only on catalog
+ // and identifier, thus, drop-and-recreate is not distinguished.
+ override def doCanonicalize(): LogicalPlan = {
Review Comment:
Arrrg this is what I was looking for. Makes sense. Dropped this.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala:
##########
@@ -479,8 +481,13 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
lookupCachedDataInternal(normalized)
}
- private def lookupCachedDataInternal(plan: LogicalPlan): Option[CachedData] = {
- val result = cachedData.find(cd => plan.sameResult(cd.plan))
+ private def lookupCachedDataInternal(
Review Comment:
Passing a filter function was my original version :D. I thought I might be
over-engineering it, but why not.
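
As a rough sketch of the filter-function shape discussed here (not the code in
the PR), the lookup can take an extra predicate that each plan-matching entry
must also satisfy. `Entry` is a hypothetical stand-in for `CachedData`, and
string equality on `planKey` stands in for `plan.sameResult(cd.plan)`.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Illustrative only: Entry and lookup are hypothetical, not CacheManager's API.
final class CacheLookupSketch {

  static final class Entry {
    final String planKey;   // stand-in for the cached logical plan
    final long snapshotId;  // stand-in for whatever the filter inspects

    Entry(String planKey, long snapshotId) {
      this.planKey = planKey;
      this.snapshotId = snapshotId;
    }
  }

  // Returns the first entry that matches the plan AND passes the caller's filter.
  static Optional<Entry> lookup(List<Entry> cache, String planKey, Predicate<Entry> accept) {
    return cache.stream()
        .filter(e -> e.planKey.equals(planKey)) // stand-in for sameResult
        .filter(accept)                         // caller-supplied check
        .findFirst();
  }
}
```

A non-transactional caller can pass `e -> true`, while a transactional caller
can pass a predicate that asks the connector whether the entry is acceptable.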
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala:
##########
@@ -489,16 +496,34 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
result
}
+ // Decides whether the cached entry can be substituted into a plan being executed inside
+ // the given transaction. It identifies the scans and attempts to register them in the connector.
+ // The connector returns true if reusing the cached snapshot is consistent with its isolation
+ // contract. Note, the scans might belong to different catalogs. The connector can decide which
+ // one to register and which ones to ignore.
+ private def validateCachedEntryForTransaction(cd: CachedData, txn: Transaction): Boolean = {
Review Comment:
Fixed. I explained my issue in a related comment in `Transaction.java`. I can
now get the catalog from the logical plan and match it against the
`scan.table` found in the materialized plan by relying on `Table.equals`.
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
*/
void abort();
+ /**
+ * Attempts to register a list of materialized scans against this transaction's read set.
+ * <p>
+ * Spark calls this when considering reuse of a cached subtree during the transaction. The
+ * list contains every materialized scan in the candidate cached subtree. That may include
+ * scans that belong to catalogs other than this transaction's catalog. The connector can
+ * decide which scans to register or ignore.
+ * <p>
+ * The connector decides whether reusing the cached snapshots is compatible with the
Review Comment:
Trimmed the description.
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
*/
void abort();
+ /**
+ * Attempts to register a list of materialized scans against this transaction's read set.
+ * <p>
+ * Spark calls this when considering reuse of a cached subtree during the transaction. The
+ * list contains every materialized scan in the candidate cached subtree. That may include
+ * scans that belong to catalogs other than this transaction's catalog. The connector can
Review Comment:
We can have scans from multiple catalogs in a single command, but we only
support predicate tracking for the transaction catalog. My trouble here was
identifying in Spark which scan belongs to which catalog, so I was delegating
that to the connector. I was missing that the connector overrides
`Table.equals`. With that, this can work: I am now passing the filtered scan
list.
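
A minimal sketch of that filtering idea, assuming the connector's table type
implements `equals`/`hashCode` so that a wrapped and an unwrapped handle to the
same table compare equal. `Table`, `Scan`, and `scansForTransaction` are
hypothetical names for illustration and not the PR's code.

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative only: these types are hypothetical stand-ins for Spark's.
final class ScanFilterSketch {

  static final class Table {
    final String catalog;
    final String ident;

    Table(String catalog, String ident) {
      this.catalog = catalog;
      this.ident = ident;
    }

    // Assumed connector behavior: equality is by logical identity.
    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Table)) return false;
      Table other = (Table) o;
      return catalog.equals(other.catalog) && ident.equals(other.ident);
    }

    @Override
    public int hashCode() {
      return Objects.hash(catalog, ident);
    }
  }

  static final class Scan {
    final Table table;

    Scan(Table table) {
      this.table = table;
    }
  }

  // Keeps only the materialized scans whose table equals one of the tables
  // that the logical plan resolved through the transaction's catalog.
  static List<Scan> scansForTransaction(Set<Table> txnTables, List<Scan> materialized) {
    return materialized.stream()
        .filter(s -> txnTables.contains(s.table))
        .collect(Collectors.toList());
  }
}
```

With this shape, the connector only ever sees scans it can track, so it no
longer needs to recognize and skip scans from other catalogs.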
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]