aokolnychyi commented on code in PR #56121:
URL: https://github.com/apache/spark/pull/56121#discussion_r3350507931
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
*/
void abort();
+ /**
+ * Attempts to register a list of materialized scans against this transaction's read set.
+ * <p>
+ * Spark calls this when considering reuse of a cached subtree during the transaction. The
+ * list contains every materialized scan in the candidate cached subtree. That may include
+ * scans that belong to catalogs other than this transaction's catalog. The connector can
+ * decide which scans to register or ignore.
+ * <p>
+ * The connector decides whether reusing the cached snapshots is compatible with the
+ * transaction's isolation contract. It must either accept the cache entry (returning
+ * {@code true} after adding any of its own scans to the read set) or refuse it (returning
+ * {@code false} without modifying the read set).
+ * <p>
+ * This method may be called multiple times during a single query with overlapping scan
+ * lists. Registering a scan that is already in the read set must be a no-op.
+ *
+ * @param scans Every materialized scan in the candidate cached subtree.
+ * @return true if the connector accepts reuse of the cache entry; false otherwise.
+ */
+ default boolean registerScans(List<Scan> scans) {
Review Comment:
Do we want to provide a default implementation here? The API hasn't been
released and it may be better to explicitly check what behavior the connector
prefers.
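The sketch below shows what an explicit implementation could look like when there is no default body to fall back on. It is a minimal Java sketch under stated assumptions. `SketchScan`, `ScanRegistering`, and `SnapshotTransaction` are hypothetical stand-ins, not Spark's `Scan` or `Transaction`. The reuse policy is also an assumption chosen for illustration: refuse if any of the transaction's own scans was built at a version other than the one pinned for that table. The sketch follows the Javadoc as currently written:
- scans from other catalogs are ignored;
- a refusal leaves the read set untouched;
- re-registering a scan is a no-op.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a materialized scan; NOT Spark's Scan interface.
final class SketchScan {
  final String catalog;
  final String table;
  final long version; // table version the scan was built against

  SketchScan(String catalog, String table, long version) {
    this.catalog = catalog;
    this.table = table;
    this.version = version;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SketchScan)) return false;
    SketchScan s = (SketchScan) o;
    return catalog.equals(s.catalog) && table.equals(s.table) && version == s.version;
  }

  @Override
  public int hashCode() {
    return Objects.hash(catalog, table, version);
  }
}

// Hypothetical hook with no default body: each connector must state its own policy.
interface ScanRegistering {
  boolean registerScans(List<SketchScan> scans);
}

final class SnapshotTransaction implements ScanRegistering {
  private final String catalog;
  private final Map<String, Long> pinnedVersions; // table -> version pinned at load
  final Set<SketchScan> readSet = new LinkedHashSet<>();

  SnapshotTransaction(String catalog, Map<String, Long> pinnedVersions) {
    this.catalog = catalog;
    this.pinnedVersions = new HashMap<>(pinnedVersions);
  }

  @Override
  public boolean registerScans(List<SketchScan> scans) {
    List<SketchScan> own = new ArrayList<>();
    for (SketchScan s : scans) {
      if (!s.catalog.equals(catalog)) {
        continue; // scans from other catalogs are ignored
      }
      Long pinned = pinnedVersions.get(s.table);
      if (pinned != null && pinned != s.version) {
        return false; // refuse before touching the read set
      }
      own.add(s);
    }
    readSet.addAll(own); // set semantics make re-registration a no-op
    return true;
  }
}
```

Without a default, a connector that does not implement the method fails to compile. It cannot silently accept or refuse reuse.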
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
*/
void abort();
+ /**
+ * Attempts to register a list of materialized scans against this transaction's read set.
+ * <p>
+ * Spark calls this when considering reuse of a cached subtree during the transaction. The
+ * list contains every materialized scan in the candidate cached subtree. That may include
+ * scans that belong to catalogs other than this transaction's catalog. The connector can
+ * decide which scans to register or ignore.
+ * <p>
+ * The connector decides whether reusing the cached snapshots is compatible with the
+ * transaction's isolation contract. It must either accept the cache entry (returning
+ * {@code true} after adding any of its own scans to the read set) or refuse it (returning
+ * {@code false} without modifying the read set).
+ * <p>
+ * This method may be called multiple times during a single query with overlapping scan
+ * lists. Registering a scan that is already in the read set must be a no-op.
+ *
+ * @param scans Every materialized scan in the candidate cached subtree.
+ * @return true if the connector accepts reuse of the cache entry; false otherwise.
+ */
+ default boolean registerScans(List<Scan> scans) {
Review Comment:
Shall we pass scans as an array (Scan[]) or as a list (List<Scan>)? Would an
array be more consistent with the rest of the APIs in Data Source V2? It is
your call, @andreaschat-db. I am OK either way.
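For comparison, here is a small Java sketch of both shapes. Existing DSv2 interfaces tend to use arrays, for example `SupportsPushDownFilters.pushFilters(Filter[])` and `Batch.planInputPartitions()`, which returns `InputPartition[]`. `ScanLike`, `ListRegistration`, `ArrayRegistration`, and `ArrayToListAdapter` are hypothetical names. They only show that either signature can be adapted to the other cheaply.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical placeholder for Spark's Scan; used only to compare signatures.
interface ScanLike {}

// List-based shape, as currently proposed in the PR.
interface ListRegistration {
  boolean registerScans(List<ScanLike> scans);
}

// Array-based shape, in line with DSv2 methods such as pushFilters(Filter[]).
interface ArrayRegistration {
  boolean registerScans(ScanLike[] scans);
}

// Adapter showing the two shapes are interchangeable on the caller's side.
final class ArrayToListAdapter implements ArrayRegistration {
  private final ListRegistration delegate;

  ArrayToListAdapter(ListRegistration delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean registerScans(ScanLike[] scans) {
    // Arrays.asList returns a fixed-size view backed by the array; no copy is made.
    return delegate.registerScans(Arrays.asList(scans));
  }
}
```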
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
*/
void abort();
+ /**
+ * Attempts to register a list of materialized scans against this transaction's read set.
+ * <p>
+ * Spark calls this when considering reuse of a cached subtree during the transaction. The
Review Comment:
Caching is one use case. You may mention it as an example, but without too
much detail. I don't want to limit the use of this method, as there may be
valid use cases in the future.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala:
##########
@@ -489,16 +496,34 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
result
}
+ // Decides whether the cached entry can be substituted into a plan being executed inside
+ // the given transaction. It identifies the scans and attempts to register them in the connector.
+ // The connector returns true if reusing the cached snapshot is consistent with its isolation
+ // contract. Note, the scans might belong to different catalogs. The connector can decide which
+ // one to register and which ones to ignore.
+ private def validateCachedEntryForTransaction(cd: CachedData, txn: Transaction): Boolean = {
Review Comment:
I think we need to limit this to only the catalog for which we started the
transaction.
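Here is a minimal Java sketch of that restriction. It assumes the filtering happens on the Spark side, before the connector is called, so that only scans from the transaction's own catalog reach it. `CatalogScan`, `TxnHook`, and `CatalogScopedValidation` are hypothetical. In Spark, the originating catalog would have to be derived from the relation that produced each scan; the sketch simplifies this to a `catalogName` field.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical: a scan tagged with the name of the catalog that produced it.
final class CatalogScan {
  final String catalogName;
  final String table;

  CatalogScan(String catalogName, String table) {
    this.catalogName = catalogName;
    this.table = table;
  }
}

// Hypothetical connector-side hook for the transaction's catalog.
interface TxnHook {
  String catalogName();

  boolean registerScans(List<CatalogScan> scans);
}

final class CatalogScopedValidation {
  private CatalogScopedValidation() {}

  // Drops scans from other catalogs so the connector never sees them.
  static boolean validate(List<CatalogScan> allScans, TxnHook txn) {
    List<CatalogScan> own = allScans.stream()
        .filter(s -> s.catalogName.equals(txn.catalogName()))
        .collect(Collectors.toList());
    return txn.registerScans(own);
  }
}
```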
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
*/
void abort();
+ /**
+ * Attempts to register a list of materialized scans against this transaction's read set.
+ * <p>
+ * Spark calls this when considering reuse of a cached subtree during the transaction. The
+ * list contains every materialized scan in the candidate cached subtree. That may include
+ * scans that belong to catalogs other than this transaction's catalog. The connector can
Review Comment:
Hm, this doesn't seem to make sense. I thought we would only pass scans for
the current catalog. We don't support transactions across catalogs, do we?
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
*/
void abort();
+ /**
+ * Attempts to register a list of materialized scans against this transaction's read set.
+ * <p>
+ * Spark calls this when considering reuse of a cached subtree during the transaction. The
+ * list contains every materialized scan in the candidate cached subtree. That may include
+ * scans that belong to catalogs other than this transaction's catalog. The connector can
+ * decide which scans to register or ignore.
+ * <p>
+ * The connector decides whether reusing the cached snapshots is compatible with the
Review Comment:
Methods like `Table#equals()` and `Table#hashCode()` must be version-aware
by definition, meaning we will only get a cache match if the table state is
compatible.
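The following is a minimal Java sketch of what version-aware equality could look like in a connector. It assumes two things. First, the table pins a snapshot version when it is loaded. Second, it carries a stable table identifier, in the spirit of the `Table.id()` mentioned elsewhere in this PR. `VersionedTable` is a hypothetical stand-in, not Spark's `Table` interface.

```java
import java.util.Objects;

// Hypothetical stand-in for a connector's Table implementation.
final class VersionedTable {
  final String name;
  final String tableId; // assumed stable id; distinguishes drop-and-recreate
  final long version;   // snapshot version pinned when the table was loaded

  VersionedTable(String name, String tableId, long version) {
    this.name = name;
    this.tableId = tableId;
    this.version = version;
  }

  // Equal only if it is the same table instance-of-record at the same version.
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof VersionedTable)) return false;
    VersionedTable t = (VersionedTable) o;
    return version == t.version && name.equals(t.name) && tableId.equals(t.tableId);
  }

  // Must agree with equals: hash every field that equals compares.
  @Override
  public int hashCode() {
    return Objects.hash(name, tableId, version);
  }
}
```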
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -684,6 +666,18 @@ abstract class InMemoryBaseTable(
options: CaseInsensitiveStringMap)
extends BatchScanBaseClass(_data, readSchema, tableSchema) with SupportsRuntimeFiltering {
+ // Snapshot of the table version when this scan was built.
+ val builtAtTableVersion: Int = InMemoryBaseTable.this.tableVersion
+
+ // The current table version, read fresh on each access.
+ def currentTableVersion: Int = InMemoryBaseTable.this.tableVersion
Review Comment:
DSv2 versions must be pinned at table loading.
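Pinning contrasts with `currentTableVersion` in the test table above, which reads the version fresh on every access. Below is a minimal Java sketch of pinning. `SketchCatalog`, `PinnedTable`, and `PinnedScan` are hypothetical. The only point illustrated is that the version is captured once, when the table is loaded, and every scan built from that table reads the captured version.

```java
// Hypothetical catalog whose latest table version advances on each commit.
final class SketchCatalog {
  private long latestVersion = 1;

  void commit() {
    latestVersion++;
  }

  // The version is read once, when the table is loaded.
  PinnedTable loadTable() {
    return new PinnedTable(latestVersion);
  }
}

final class PinnedTable {
  final long pinnedVersion;

  PinnedTable(long pinnedVersion) {
    this.pinnedVersion = pinnedVersion;
  }

  // Every scan built from this table reads the pinned version, not the latest one.
  PinnedScan newScan() {
    return new PinnedScan(pinnedVersion);
  }
}

final class PinnedScan {
  final long readVersion;

  PinnedScan(long readVersion) {
    this.readVersion = readVersion;
  }
}
```

A scan built after a concurrent commit still reads the version the table was loaded at. A fresh `loadTable()` call is what observes the new version.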
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##########
@@ -122,6 +123,21 @@ case class DataSourceV2Relation(
copy(output = output.map(_.newInstance()))
}
+ // Replace the `table` field with an identity placeholder. This allows two relations
+ // that point at the same logical table compare equal even when one is wrapped (e.g.
+ // by a transaction-aware catalog). Note, when `Table.id()` is null we key only on catalog
+ // and identifier, thus, drop-and-recreate is not distinguished.
+ override def doCanonicalize(): LogicalPlan = {
Review Comment:
This breaks version-aware and context-aware comparison in caching as looking
at the identifiers is not sufficient to tell if the table state is the same. We
must rely on correct equality and hash code of `Table` implementations. In
other words, instead of normalizing `DataSourceV2Relation` on the Spark side,
connectors must correctly implement `equals` and `hashCode` for `Table`.
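The sketch below shows the failure mode, using hypothetical `TableSnapshot`, `RelationSketch`, and `CacheSketch` types. A lookup keyed only on catalog and identifier returns an entry cached at an older table version. A comparison that delegates to the table's own `equals` rejects that entry. This is an illustration of the argument only: Spark's actual matching goes through `sameResult` on canonicalized plans.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical table stand-in whose equality is version-aware.
final class TableSnapshot {
  final String name;
  final long version;

  TableSnapshot(String name, long version) {
    this.name = name;
    this.version = version;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TableSnapshot)) return false;
    TableSnapshot t = (TableSnapshot) o;
    return name.equals(t.name) && version == t.version;
  }

  @Override
  public int hashCode() {
    return Objects.hash(name, version);
  }
}

// Hypothetical relation offering both comparison strategies.
final class RelationSketch {
  final String catalog;
  final String ident;
  final TableSnapshot table;

  RelationSketch(String catalog, String ident, TableSnapshot table) {
    this.catalog = catalog;
    this.ident = ident;
    this.table = table;
  }

  // Delegates to the connector's Table equality.
  boolean sameResultByTable(RelationSketch other) {
    return catalog.equals(other.catalog) && ident.equals(other.ident) && table.equals(other.table);
  }

  // Ignores the table state entirely, like identifier-only normalization.
  boolean sameResultByIdentifierOnly(RelationSketch other) {
    return catalog.equals(other.catalog) && ident.equals(other.ident);
  }
}

final class CacheSketch {
  private final List<RelationSketch> cached = new ArrayList<>();

  void cache(RelationSketch r) {
    cached.add(r);
  }

  // Returns the first cached relation that matches under the chosen strategy.
  Optional<RelationSketch> lookup(RelationSketch plan, boolean identifierOnly) {
    for (RelationSketch cd : cached) {
      boolean match = identifierOnly
          ? plan.sameResultByIdentifierOnly(cd)
          : plan.sameResultByTable(cd);
      if (match) return Optional.of(cd);
    }
    return Optional.empty();
  }
}
```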
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala:
##########
@@ -479,8 +481,13 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
lookupCachedDataInternal(normalized)
}
- private def lookupCachedDataInternal(plan: LogicalPlan): Option[CachedData] = {
- val result = cachedData.find(cd => plan.sameResult(cd.plan))
+ private def lookupCachedDataInternal(
Review Comment:
What about making these core methods a bit more generic?
```
private def lookupCachedDataInternal(
    plan: LogicalPlan,
    canUse: CachedData => Boolean = _ => true): Option[CachedData] = {
  // impl
}

private[sql] def useCachedData(
    plan: LogicalPlan,
    canUse: CachedData => Boolean = _ => true): LogicalPlan = {
  // impl
}
```
We can then offer one more overload that takes a transaction and internally
creates `canUse`. This method would be used from QueryExecution.
```
private[sql] def useCachedData(plan: LogicalPlan, txn: Transaction): LogicalPlan = {
  useCachedData(plan, transactionEntryFilter(txn))
}
```
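Here is a Java rendering of the same idea, for illustration only. Java has no default arguments, so the no-filter variant becomes a separate overload. `CachedEntry`, `SketchTxn`, and `GenericCacheLookup` are hypothetical. Plan matching is reduced to string equality as a stand-in for `sameResult`. `transactionEntryFilter` mirrors the name used in the Scala suggestion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical cache entry: a plan key plus the scans materialized under it.
final class CachedEntry {
  final String plan;
  final List<String> scans;

  CachedEntry(String plan, List<String> scans) {
    this.plan = plan;
    this.scans = scans;
  }
}

// Hypothetical transaction hook. It has two abstract methods, so it is not a
// functional interface and a lambda such as `cd -> true` cannot target it.
interface SketchTxn {
  String catalogName();

  boolean registerScans(List<String> scans);
}

final class GenericCacheLookup {
  private final List<CachedEntry> cachedData = new ArrayList<>();

  void add(CachedEntry e) {
    cachedData.add(e);
  }

  // Core lookup: first entry with the same result that the caller allows.
  // `&&` short-circuits, so canUse runs only for entries whose plan matches.
  Optional<CachedEntry> lookup(String plan, Predicate<CachedEntry> canUse) {
    for (CachedEntry cd : cachedData) {
      if (cd.plan.equals(plan) && canUse.test(cd)) return Optional.of(cd);
    }
    return Optional.empty();
  }

  // No transaction: every matching entry is usable.
  Optional<CachedEntry> lookup(String plan) {
    return lookup(plan, cd -> true);
  }

  // Transaction-aware overload builds the filter internally.
  Optional<CachedEntry> lookup(String plan, SketchTxn txn) {
    return lookup(plan, transactionEntryFilter(txn));
  }

  private static Predicate<CachedEntry> transactionEntryFilter(SketchTxn txn) {
    return cd -> txn.registerScans(cd.scans);
  }
}
```

If `SketchTxn` were a single-method interface, the call `lookup(plan, cd -> true)` would be ambiguous between the `Predicate` overload and the transaction overload. That is why the sketch gives it a second method.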
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java:
##########
@@ -66,6 +68,29 @@ public interface Transaction extends Closeable {
*/
void abort();
+ /**
+ * Attempts to register a list of materialized scans against this transaction's read set.
+ * <p>
+ * Spark calls this when considering reuse of a cached subtree during the transaction. The
+ * list contains every materialized scan in the candidate cached subtree. That may include
+ * scans that belong to catalogs other than this transaction's catalog. The connector can
+ * decide which scans to register or ignore.
+ * <p>
+ * The connector decides whether reusing the cached snapshots is compatible with the
Review Comment:
We can do some sanity checks for versions but it should not be in the public
docs.
--
This is an automated message from the Apache Git Service.