cloud-fan commented on code in PR #55540: URL: https://github.com/apache/spark/pull/55540#discussion_r3266598523
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * An InMemoryTableCatalog that simulates a caching connector like
+ * Iceberg's CachingCatalog. On first loadTable, returns a fresh
+ * copy. On subsequent loads, returns the CACHED (stale) copy,
+ * making external changes invisible.
+ *
+ * Session writes go through the SQL path which modifies the
+ * original table and invalidates, but direct catalog API
+ * modifications are not visible until the cache is cleared.

Review Comment:
The "and invalidates" half of this sentence is both grammatically incomplete (no object) and not actually exercised by any test in this PR — `INSERT` goes through the un-overridden write-variant `loadTable(ident, writePrivileges)` and never touches the cache. Only explicit `REFRESH TABLE` / `invalidateTable` clears the cache.
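To make the asymmetry concrete, here is a minimal toy sketch of the behavior described above. It is not Spark code: `Snapshot`, `ToyCatalog`, `CachingToyCatalog` and `appendViaWriteVariant` are hypothetical names, and the write method only stands in for the un-overridden `loadTable(ident, writePrivileges)` path. It assumes the base catalog hands out a fresh snapshot on every read.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Toy stand-in for a loaded table; not a Spark class.
final case class Snapshot(rows: Vector[Int])

// Hypothetical base catalog: every read returns a fresh snapshot.
class ToyCatalog {
  private val store = mutable.Map.empty[String, Vector[Int]]

  def create(name: String): Unit = store.update(name, Vector.empty)

  // Read variant: snapshot of the current state.
  def loadTable(name: String): Snapshot = Snapshot(store(name))

  // Stand-in for the write-variant load: mutates the underlying table
  // directly and never consults any cache.
  def appendViaWriteVariant(name: String, row: Int): Unit =
    store.update(name, store(name) :+ row)

  def invalidateTable(name: String): Unit = ()
}

// Only the read variant is cached, mirroring the class under review.
class CachingToyCatalog extends ToyCatalog {
  private val cache = new ConcurrentHashMap[String, Snapshot]()

  override def loadTable(name: String): Snapshot =
    cache.computeIfAbsent(name, _ => super.loadTable(name))

  override def invalidateTable(name: String): Unit = {
    super.invalidateTable(name)
    cache.remove(name)
  }
}
```

In this sketch a write after the first read leaves the cached snapshot unchanged, and only `invalidateTable` makes the new row visible, which is the behavior the doc comment should describe.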
Suggest tightening to describe what the code actually does, e.g.:

> Session writes go through the write-variant `loadTable`, which is not cached, so they modify the underlying table directly. Cached `loadTable` results may still be stale until `clearCache()` or `REFRESH TABLE` (which invokes `invalidateTable`) is called.

##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * An InMemoryTableCatalog that simulates a caching connector like
+ * Iceberg's CachingCatalog. On first loadTable, returns a fresh
+ * copy. On subsequent loads, returns the CACHED (stale) copy,
+ * making external changes invisible.
+ *
+ * Session writes go through the SQL path which modifies the
+ * original table and invalidates, but direct catalog API
+ * modifications are not visible until the cache is cleared.
+ *
+ * Call [[CachingInMemoryTableCatalog.clearCache()]] to simulate
+ * cache expiration (like Iceberg's 30-second TTL).
+ */
+class CachingInMemoryTableCatalog extends InMemoryTableCatalog {
+  import CachingInMemoryTableCatalog._
+
+  override def loadTable(ident: Identifier): Table = {
+    cachedTables.computeIfAbsent(cacheKey(name, ident), _ => {
+      super.loadTable(ident)
+    })
+  }
+
+  override def invalidateTable(ident: Identifier): Unit = {
+    super.invalidateTable(ident)
+    cachedTables.remove(cacheKey(name, ident))
+  }
+
+  private def cacheKey(
+      catalog: String, ident: Identifier): String = {
+    s"$catalog.${ident.toString}"
+  }
+}
+
+object CachingInMemoryTableCatalog {
+  private val cachedTables =
+    new ConcurrentHashMap[String, Table]()
+
+  def clearCache(): Unit = cachedTables.clear()

Review Comment:
The cache is a JVM-global static, which can leak across suites if any other suite ever uses this catalog in the same JVM. Within this PR the per-test `after { clearCache() ... }` works, but a per-instance cache would be safer and also lets you key directly by `Identifier`:

```scala
class CachingInMemoryTableCatalog extends InMemoryTableCatalog {
  private val cachedTables = new ConcurrentHashMap[Identifier, Table]()

  override def loadTable(ident: Identifier): Table =
    cachedTables.computeIfAbsent(ident, _ => super.loadTable(ident))

  override def invalidateTable(ident: Identifier): Unit = {
    super.invalidateTable(ident)
    cachedTables.remove(ident)
  }

  def clearCache(): Unit = cachedTables.clear()
}
```

`DataSourceV2DataFrameSuite.after` would then call `catalog("cachingcat").asInstanceOf[CachingInMemoryTableCatalog].clearCache()` instead of the static.

##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala:
##########
@@ -179,6 +179,20 @@ class BasicInMemoryTableCatalog extends TableCatalog {
       throw new IllegalArgumentException(s"Cannot drop all fields")
     }
 
+    // Compute the intermediate schema with only column deletions applied.
+    // This is used for data migration so that dropped column values are physically removed,
+    // even when a column with the same name is re-added in the same ALTER call.
+    val deleteOnlyChanges = changes.filter(_.isInstanceOf[TableChange.DeleteColumn])
+    val schemaAfterDrops = if (deleteOnlyChanges.nonEmpty) {
+      CatalogV2Util.applySchemaChanges(
+        table.schema,
+        deleteOnlyChanges,
+        tableProvider = Some("in-memory"),
+        statementType = "ALTER TABLE")
+    } else {
+      schema
+    }

Review Comment:
Two things on this block:

1. **The PR description undersells this.** This is a behavior change in shared test infrastructure — any test that calls `catalog.alterTable(ident, drop, add)` in a single call now gets different data-migration semantics. Worth calling out explicitly in the PR description.
2. **The `why` is non-obvious from the current comment.** `InMemoryBaseTable.alterTableWithData` decides which old-row fields to keep by matching names against its `newSchema` argument. Passing the *post-drop intermediate* schema (rather than the final schema with `salary` re-added) is what makes the old data actually drop out — otherwise the name match retains the old `salary` value into the new column. A one-line pointer at `alterTableWithData` would help future readers.
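A toy model of point 2, in case it helps the comment wording. This is only a sketch under assumptions: `DropReAddSketch` and `migrate` are hypothetical and do not reproduce the real `alterTableWithData` signature; they only mimic a migration that keeps old values by field name.

```scala
object DropReAddSketch {
  // A row is modeled as field name -> value; purely illustrative.
  type Row = Map[String, Any]

  // Hypothetical name-matching migration: an old value survives only if its
  // field name appears in keepSchema; every other field of finalSchema
  // is filled with null.
  def migrate(rows: Seq[Row], keepSchema: Seq[String], finalSchema: Seq[String]): Seq[Row] =
    rows.map { old =>
      finalSchema.map { name =>
        name -> (if (keepSchema.contains(name)) old.getOrElse(name, null) else null)
      }.toMap
    }
}
```

With `rows = Seq(Map("id" -> 1, "salary" -> 100))` and a final schema of `id, salary` (drop and re-add of `salary`), matching against the final schema keeps the old `100`, while matching against the post-drop schema `Seq("id")` leaves the re-added `salary` null.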
Optional simplification: the recomputation via `applySchemaChanges` can be replaced with a direct filter, since we already have `table.schema` in hand:

```scala
val deletedTopLevelNames = changes.collect {
  case d: TableChange.DeleteColumn if d.fieldNames.length == 1 => d.fieldNames.head
}.toSet
val schemaAfterDrops = if (deletedTopLevelNames.nonEmpty) {
  StructType(table.schema.fields.filterNot(f => deletedTopLevelNames(f.name)))
} else {
  schema
}
```

(Current tests only drop top-level columns; if nested drops ever need this treatment too, the filter would need to be extended — but the current `applySchemaChanges` form would need the same care for nested cases since `alterTableWithData` only filters by top-level field name anyway.)

##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -2976,6 +2997,584 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  // Temp views with stored plans: scenarios from the DSv2 table refresh tests.
+  // Each test creates a DSv2 table with initial data, builds a temp view with a filter
+  // (to demonstrate that the stored plan is non-trivial), and then verifies the view
+  // behavior after various table modifications (session or external).
+
+  /** Appends rows to a DSv2 table via the catalog API, bypassing the session. */
+  private def externalAppend(
+      catalogName: String,
+      ident: Identifier,
+      schema: StructType,
+      row: InternalRow): Unit = {
+    val extTable = catalog(catalogName).loadTable(ident,
+      util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+    extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
+  }
+
+  // Scenario 1.1 (session write)
+  test("temp view with stored plan reflects session write") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 1.2 (external write)
+  test("temp view with stored plan reflects external write") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external writer adds (2, 200) via direct catalog API
+      externalAppend(
+        catalogName = "testcat",
+        ident = ident,
+        schema = StructType.fromDDL("id INT, salary INT"),
+        row = InternalRow(2, 200))
+
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 1.2 connector w/ cache (external write, caching connector)
+  test("connector w/ cache: temp view stale after external write") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external writer adds (2, 200) via catalog API (bypasses cache)
+      externalAppend(
+        catalogName = "cachingcat",
+        ident = ident,
+        schema = StructType.fromDDL("id INT, salary INT"),
+        row = InternalRow(2, 200))
+
+      // Caching connector returns stale table: external write invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, external write becomes visible
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.1 (session ADD COLUMN)
+  test("temp view with stored plan preserves schema after session ADD COLUMN") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      sql(s"ALTER TABLE $t ADD COLUMN new_column INT")
+      sql(s"INSERT INTO $t VALUES (2, 200, -1)")
+
+      // view preserves original 2-column schema, filter still applied
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.2 (external ADD COLUMN)
+  test("temp view with stored plan preserves schema after external ADD COLUMN") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change via catalog API
+      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
+      catalog("testcat").alterTable(ident, addCol)
+
+      // external writer adds data with new schema
+      externalAppend(
+        catalogName = "testcat",
+        ident = ident,
+        schema = StructType.fromDDL("id INT, salary INT, new_column INT"),
+        row = InternalRow(2, 200, -1))
+
+      // view preserves original 2-column schema, filter still applied
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
+  test("connector w/ cache: temp view stale after external ADD COLUMN") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change + data via catalog API
+      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
+      catalog("cachingcat").alterTable(ident, addCol)
+
+      externalAppend(
+        catalogName = "cachingcat",
+        ident = ident,
+        schema = StructType.fromDDL("id INT, salary INT, new_column INT"),
+        row = InternalRow(2, 200, -1))
+
+      // Caching connector returns stale table: external changes invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 3.1 (session column removal)
+  test("temp view with stored plan detects session column removal") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // session schema change via SQL
+      sql(s"ALTER TABLE $t DROP COLUMN salary")
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
+  // Scenario 3.2 (external column removal)
+  test("temp view with stored plan detects external column removal") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      catalog("testcat").alterTable(ident, dropCol)
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
+  // Scenario 3.2 connector w/ cache (external column removal, caching connector)
+  test("connector w/ cache: temp view stale after external column removal") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external column removal via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      catalog("cachingcat").alterTable(ident, dropCol)
+
+      // Caching connector returns stale table: column removal invisible, no error
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, column removal detected
+      sql(s"REFRESH TABLE $t")
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
+  // Scenario 4.1 (session drop and recreate table)
+  test("temp view with stored plan resolves to session-recreated table") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      val originalTableId = catalog("testcat").loadTable(ident).id
+
+      // session drop and recreate via SQL
+      sql(s"DROP TABLE $t")
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+
+      val newTableId = catalog("testcat").loadTable(ident).id
+      assert(originalTableId != newTableId)
+
+      // view resolves to the new empty table
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // insert new data and verify the view picks it up
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+      checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+    }
+  }
+
+  // Scenario 4.2 (external drop and recreate table)
+  test("temp view with stored plan resolves to externally recreated table") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      val originalTableId = catalog("testcat").loadTable(ident).id
+
+      // external drop and recreate via catalog API
+      catalog("testcat").dropTable(ident)
+      catalog("testcat").createTable(
+        ident,
+        new TableInfo.Builder()
+          .withColumns(Array(
+            Column.create("id", IntegerType),
+            Column.create("salary", IntegerType)))
+          .build())
+
+      val newTableId = catalog("testcat").loadTable(ident).id
+      assert(originalTableId != newTableId)
+
+      // view resolves to the new empty table
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // insert new data and verify the view picks it up
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+      checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+    }
+  }
+
+  // Scenario 4.2 connector w/ cache (external drop/recreate, caching connector)
+  test("connector w/ cache: temp view stale after external drop/recreate") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external drop and recreate via catalog API
+      catalog("cachingcat").dropTable(ident)
+      catalog("cachingcat").createTable(
+        ident,
+        new TableInfo.Builder()
+          .withColumns(Array(
+            Column.create("id", IntegerType),
+            Column.create("salary", IntegerType)))
+          .build())
+
+      // Caching connector returns stale table: drop/recreate invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, view resolves to new empty table
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq.empty)
+    }
+  }
+
+  // Scenario 5.1 (session drop and re-add column with same type, multiple views)
+  test("temp view with stored plan after session drop and re-add column same type" +
+    " with unfiltered view") {

Review Comment:
This test is a strict superset of the existing `temp view with stored plan after session drop and re-add column same type` at line 2511 — same scenario, same `v` (`salary < 999`) and `v_filter_is_null` (`salary IS NULL`) views, just one extra `v_no_filter`. Either delete the existing test in favor of this one, or fold the `v_no_filter` assertion into the existing test and drop this new one — carrying both is redundant.

##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -2976,6 +2997,584 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  // Temp views with stored plans: scenarios from the DSv2 table refresh tests.
+  // Each test creates a DSv2 table with initial data, builds a temp view with a filter
+  // (to demonstrate that the stored plan is non-trivial), and then verifies the view
+  // behavior after various table modifications (session or external).
+
+  /** Appends rows to a DSv2 table via the catalog API, bypassing the session. */
+  private def externalAppend(
+      catalogName: String,
+      ident: Identifier,
+      schema: StructType,
+      row: InternalRow): Unit = {
+    val extTable = catalog(catalogName).loadTable(ident,
+      util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+    extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
+  }

Review Comment:
The `schema` parameter is currently derived by hand at every call site (e.g. `StructType.fromDDL("id INT, salary INT, new_column INT")`). It can be read off the table after `loadTable`, which removes a class of "schema parameter desynced from table state after `alterTable`" bugs:

```scala
private def externalAppend(
    catalogName: String,
    ident: Identifier,
    row: InternalRow): Unit = {
  val extTable = catalog(catalogName).loadTable(ident,
    util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
  val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns())
  extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
}
```

The row layout must still match the current table column order, so worth a one-line comment to that effect.
Current callers are correct, but the API as written encourages keeping two schema copies in sync.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
