aokolnychyi commented on code in PR #56121:
URL: https://github.com/apache/spark/pull/56121#discussion_r3350507931


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
    */
   void abort();
 
+  /**
+   * Attempts to register a list of materialized scans against this transaction's read set.
+   * <p>
+   * Spark calls this when considering reuse of a cached subtree during the transaction. The
+   * list contains every materialized scan in the candidate cached subtree. That may include
+   * scans that belong to catalogs other than this transaction's catalog. The connector can
+   * decide which scans to register or ignore.
+   * <p>
+   * The connector decides whether reusing the cached snapshots is compatible with the
+   * transaction's isolation contract. It must either accept the cache entry (returning
+   * {@code true} after adding any of its own scans to the read set) or refuse it (returning
+   * {@code false} without modifying the read set).
+   * <p>
+   * This method may be called multiple times during a single query with overlapping scan
+   * lists. Registering a scan that is already in the read set must be a no-op.
+   *
+   * @param scans Every materialized scan in the candidate cached subtree.
+   * @return true if the connector accepts reuse of the cache entry; false otherwise.
+   */
+  default boolean registerScans(List<Scan> scans) {

Review Comment:
   Do we want to provide a default implementation here? The API hasn't been released yet, so it may be better to require each connector to explicitly declare the behavior it prefers.
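
A minimal Java sketch of the alternative raised here, assuming `registerScans` is declared without a default body so every connector must pick a behavior. The `Scan` record, `PinnedVersionTransaction`, and its version-pinning policy are hypothetical stand-ins, not Spark's real `Scan` or `Transaction` types; the sketch only illustrates the contract stated in the Javadoc: refuse without touching the read set, accept after registering, and treat re-registration as a no-op.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: stand-in types, not Spark's real Scan/Transaction interfaces.
final class RegisterScansSketch {

  // Minimal stand-in for a materialized scan: which table it read, at which version.
  record Scan(String table, long version) {}

  // Declared without a default body, so each connector must choose a behavior explicitly.
  interface Transaction {
    boolean registerScans(List<Scan> scans);
  }

  // Example connector that pins table versions when the transaction starts.
  static final class PinnedVersionTransaction implements Transaction {
    private final Map<String, Long> pinnedVersions;
    private final Set<Scan> readSet = new LinkedHashSet<>();

    PinnedVersionTransaction(Map<String, Long> pinnedVersions) {
      this.pinnedVersions = Map.copyOf(pinnedVersions);
    }

    @Override
    public boolean registerScans(List<Scan> scans) {
      // Validate every scan first so that a refusal leaves the read set untouched.
      for (Scan scan : scans) {
        Long pinned = pinnedVersions.get(scan.table());
        if (pinned == null || pinned != scan.version()) {
          return false;
        }
      }
      // Set semantics make re-registering an already-known scan a no-op.
      readSet.addAll(scans);
      return true;
    }

    Set<Scan> readSet() {
      return readSet;
    }
  }
}
```

Validating all scans before mutating the read set is one simple way to honor the "refuse without modifying the read set" rule; a connector could equally stage additions in a temporary collection.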



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
    */
   void abort();
 
+  /**
+   * Attempts to register a list of materialized scans against this transaction's read set.
+   * <p>
+   * Spark calls this when considering reuse of a cached subtree during the transaction. The
+   * list contains every materialized scan in the candidate cached subtree. That may include
+   * scans that belong to catalogs other than this transaction's catalog. The connector can
+   * decide which scans to register or ignore.
+   * <p>
+   * The connector decides whether reusing the cached snapshots is compatible with the
+   * transaction's isolation contract. It must either accept the cache entry (returning
+   * {@code true} after adding any of its own scans to the read set) or refuse it (returning
+   * {@code false} without modifying the read set).
+   * <p>
+   * This method may be called multiple times during a single query with overlapping scan
+   * lists. Registering a scan that is already in the read set must be a no-op.
+   *
+   * @param scans Every materialized scan in the candidate cached subtree.
+   * @return true if the connector accepts reuse of the cache entry; false otherwise.
+   */
+  default boolean registerScans(List<Scan> scans) {

Review Comment:
   Shall we pass scans as an array (`Scan[]`) or as a list (`List<Scan>`)? Would an array be more consistent with the rest of the APIs in Data Source V2? It is your call, @andreaschat-db. I am OK either way.



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
    */
   void abort();
 
+  /**
+   * Attempts to register a list of materialized scans against this transaction's read set.
+   * <p>
+   * Spark calls this when considering reuse of a cached subtree during the transaction. The

Review Comment:
   Caching is one use case. You may mention it as an example, but without too much detail. I don't want to limit the use of this method, as there may be other valid use cases in the future.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala:
##########
@@ -489,16 +496,34 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     result
   }
 
+  // Decides whether the cached entry can be substituted into a plan being executed inside
+  // the given transaction. It identifies the scans and attempts to register them in the connector.
+  // The connector returns true if reusing the cached snapshot is consistent with its isolation
+  // contract. Note, the scans might belong to different catalogs. The connector can decide which
+  // one to register and which ones to ignore.
+  private def validateCachedEntryForTransaction(cd: CachedData, txn: Transaction): Boolean = {

Review Comment:
   I think we need to limit this to only the catalog for which we started the 
transaction.
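
A minimal sketch of that restriction, assuming each scan found in the cached plan can be tagged with the name of the catalog it came from. `CachedScan` and `scansForTransactionCatalog` are hypothetical names used only for illustration; how the catalog is recovered from Spark's plan nodes is left out.

```java
import java.util.List;

// Hypothetical sketch: stand-in types, not Spark's real CacheManager or plan nodes.
final class CatalogScopedRegistration {

  // A scan found in a cached plan, tagged with the name of the catalog it came from.
  record CachedScan(String catalogName, String table, long version) {}

  // Keep only the scans that belong to the catalog in which the transaction was started,
  // so the connector never sees scans from catalogs it does not own.
  static List<CachedScan> scansForTransactionCatalog(List<CachedScan> all, String txnCatalog) {
    return all.stream()
        .filter(scan -> scan.catalogName().equals(txnCatalog))
        .toList();
  }
}
```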



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
    */
   void abort();
 
+  /**
+   * Attempts to register a list of materialized scans against this transaction's read set.
+   * <p>
+   * Spark calls this when considering reuse of a cached subtree during the transaction. The
+   * list contains every materialized scan in the candidate cached subtree. That may include
+   * scans that belong to catalogs other than this transaction's catalog. The connector can

Review Comment:
   Hm, this doesn't seem to make sense. I thought we would only pass scans for the current catalog. We don't support transactions across catalogs, do we?



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
    */
   void abort();
 
+  /**
+   * Attempts to register a list of materialized scans against this transaction's read set.
+   * <p>
+   * Spark calls this when considering reuse of a cached subtree during the transaction. The
+   * list contains every materialized scan in the candidate cached subtree. That may include
+   * scans that belong to catalogs other than this transaction's catalog. The connector can
+   * decide which scans to register or ignore.
+   * <p>
+   * The connector decides whether reusing the cached snapshots is compatible with the

Review Comment:
   Methods like `Table$equals()` and `Table$hashCode()` must be version-aware 
by definition, meaning we will only have a cache match if the state is 
compatible.
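
To make the point concrete, a minimal sketch of a connector table whose `equals` and `hashCode` include the version pinned at load time, so two handles match only when they refer to the same table state. `VersionedTable` and its fields are hypothetical; a real connector would compare whatever identifies its snapshot.

```java
import java.util.Objects;

// Hypothetical sketch: a stand-in for a connector's Table implementation.
final class VersionAwareTableSketch {

  static final class VersionedTable {
    private final String catalog;
    private final String name;
    private final long version; // pinned when the table handle was loaded

    VersionedTable(String catalog, String name, long version) {
      this.catalog = catalog;
      this.name = name;
      this.version = version;
    }

    // Two handles are equal only if they point at the same table AND the same version.
    @Override
    public boolean equals(Object other) {
      if (this == other) {
        return true;
      }
      if (!(other instanceof VersionedTable that)) {
        return false;
      }
      return version == that.version && catalog.equals(that.catalog) && name.equals(that.name);
    }

    // Must use the same fields as equals so hash-based lookups stay consistent.
    @Override
    public int hashCode() {
      return Objects.hash(catalog, name, version);
    }
  }
}
```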



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -684,6 +666,18 @@ abstract class InMemoryBaseTable(
       options: CaseInsensitiveStringMap)
     extends BatchScanBaseClass(_data, readSchema, tableSchema) with SupportsRuntimeFiltering {
 
+    // Snapshot of the table version when this scan was built.
+    val builtAtTableVersion: Int = InMemoryBaseTable.this.tableVersion
+
+    // The current table version, read fresh on each access.
+    def currentTableVersion: Int = InMemoryBaseTable.this.tableVersion

Review Comment:
   DSv2 table versions must be pinned when the table is loaded, rather than read fresh on each access.
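
A minimal sketch of what pinning at load time means, in contrast to the `currentTableVersion` accessor in the diff, which reads the live version on every call. `Store`, `LoadedTable`, and `commitWrite` are hypothetical names; the store's version simply advances on each write.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: an in-memory store whose version advances on every write,
// and a table handle that captures the version once, when it is loaded.
final class PinnedVersionSketch {

  static final class Store {
    private final AtomicLong currentVersion = new AtomicLong();

    long currentVersion() {
      return currentVersion.get();
    }

    void commitWrite() {
      currentVersion.incrementAndGet();
    }

    // Loading captures the version once; later writes do not affect this handle.
    LoadedTable load() {
      return new LoadedTable(currentVersion.get());
    }
  }

  record LoadedTable(long pinnedVersion) {
    // Scans built from this handle report the pinned version, never the live one.
    long scanVersion() {
      return pinnedVersion;
    }
  }
}
```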



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##########
@@ -122,6 +123,21 @@ case class DataSourceV2Relation(
     copy(output = output.map(_.newInstance()))
   }
 
+  // Replace the `table` field with an identity placeholder. This allows two relations
+  // that point at the same logical table compare equal even when one is wrapped (e.g.
+  // by a transaction-aware catalog). Note, when `Table.id()` is null we key only on catalog
+  // and identifier, thus, drop-and-recreate is not distinguished.
+  override def doCanonicalize(): LogicalPlan = {

Review Comment:
   This breaks version-aware and context-aware comparison in caching, because looking at the identifiers is not sufficient to tell whether the table state is the same. We must rely on correct equality and hash code of `Table` implementations. In other words, instead of normalizing `DataSourceV2Relation` on the Spark side, connectors must correctly implement `equals` and `hashCode` for `Table`.
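
A small sketch of the failure mode, assuming a connector table whose `equals` and `hashCode` include the version (a Java record with an identifier and a version gets this behavior automatically). A cache keyed on the identifier alone returns data cached at version 1 for a lookup at version 2, while a cache keyed on the table itself misses. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: why keying a cache on the identifier alone can return stale data.
final class CacheKeySketch {

  // Stand-in table; the record-generated equals/hashCode include the pinned version.
  record Table(String identifier, long version) {}

  // Keys on the identifier only, similar in spirit to normalizing the relation by name.
  static final class IdentifierKeyedCache {
    private final Map<String, String> entries = new HashMap<>();

    void put(Table table, String data) {
      entries.put(table.identifier(), data);
    }

    String lookup(Table table) {
      return entries.get(table.identifier());
    }
  }

  // Keys on the table itself, relying on its version-aware equals/hashCode.
  static final class TableKeyedCache {
    private final Map<Table, String> entries = new HashMap<>();

    void put(Table table, String data) {
      entries.put(table, data);
    }

    String lookup(Table table) {
      return entries.get(table);
    }
  }
}
```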



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala:
##########
@@ -479,8 +481,13 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     lookupCachedDataInternal(normalized)
   }
 
-  private def lookupCachedDataInternal(plan: LogicalPlan): Option[CachedData] = {
-    val result = cachedData.find(cd => plan.sameResult(cd.plan))
+  private def lookupCachedDataInternal(

Review Comment:
   What about making these core methods a bit more generic?
   
   ```
   private def lookupCachedDataInternal(
       plan: LogicalPlan,
       canUse: CachedData => Boolean = _ => true): Option[CachedData] = {
     // impl
   }
   
   private[sql] def useCachedData(
       plan: LogicalPlan,
       canUse: CachedData => Boolean = _ => true): LogicalPlan = {
     // impl
   }
   ```
   
   We can then offer one more overload that takes a transaction and internally creates `canUse`. This overload would be used from `QueryExecution`.
   
   ```
   private[sql] def useCachedData(plan: LogicalPlan, txn: Transaction): LogicalPlan = {
     useCachedData(plan, transactionEntryFilter(txn))
   }
   ```
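
For illustration, a rough Java analogue of the suggested Scala shape, with hypothetical stand-in types for the plan, the cache entry, and the transaction. The transaction-aware method is named `lookupCachedDataInTransaction` instead of overloading `lookupCachedData` only to keep this Java sketch free of ambiguity when a lambda is passed (here both `Predicate` and the single-method stand-in `Transaction` are functional interfaces). The predicate runs after the plan match, so `registerScans` is called only for entries that could actually be reused.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical Java analogue of the suggested API; all types are stand-ins.
final class CacheLookupSketch {

  record Plan(String fingerprint) {
    boolean sameResult(Plan other) {
      return fingerprint.equals(other.fingerprint());
    }
  }

  record CachedData(Plan plan, List<String> scannedTables) {}

  interface Transaction {
    boolean registerScans(List<String> scannedTables);
  }

  private final List<CachedData> cachedData;

  CacheLookupSketch(List<CachedData> cachedData) {
    this.cachedData = List.copyOf(cachedData);
  }

  // Generic core: the caller supplies the extra condition an entry must satisfy.
  // Streams are lazy, so canUse is evaluated only for plan matches, up to the first accepted one.
  Optional<CachedData> lookupCachedData(Plan plan, Predicate<CachedData> canUse) {
    return cachedData.stream()
        .filter(cd -> plan.sameResult(cd.plan()))
        .filter(canUse)
        .findFirst();
  }

  // Default behavior outside a transaction: any matching entry may be used.
  Optional<CachedData> lookupCachedData(Plan plan) {
    return lookupCachedData(plan, cd -> true);
  }

  // Transaction-aware entry point that builds the predicate internally.
  Optional<CachedData> lookupCachedDataInTransaction(Plan plan, Transaction txn) {
    return lookupCachedData(plan, transactionEntryFilter(txn));
  }

  static Predicate<CachedData> transactionEntryFilter(Transaction txn) {
    return cd -> txn.registerScans(cd.scannedTables());
  }
}
```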
   



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
    */
   void abort();
 
+  /**
+   * Attempts to register a list of materialized scans against this transaction's read set.
+   * <p>
+   * Spark calls this when considering reuse of a cached subtree during the transaction. The
+   * list contains every materialized scan in the candidate cached subtree. That may include
+   * scans that belong to catalogs other than this transaction's catalog. The connector can
+   * decide which scans to register or ignore.
+   * <p>
+   * The connector decides whether reusing the cached snapshots is compatible with the

Review Comment:
   We can do some sanity checks for versions, but they should not be described in the public docs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
