cloud-fan commented on code in PR #55463:
URL: https://github.com/apache/spark/pull/55463#discussion_r3318259459
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -97,6 +97,23 @@ abstract class InMemoryBaseTable(
tableVersion = version.toInt
}
+ /**
+ * Copies version and validated version from another table.
+ *
+ * Test catalogs that decorate tables (e.g. [[NullColumnIdInMemoryTableCatalog]],
+ * [[NullTableIdAndNullColumnIdInMemoryTableCatalog]]) create new objects that start
+ * at version 0. Without this call, [[V2TableRefreshUtil]] would see version 0 on
+ * every load and never detect that the table has changed, breaking stale-table
+ * refresh for incrementally constructed queries (e.g. joining DataFrames analyzed
+ * at different times).
Review Comment:
The claim that `V2TableRefreshUtil` would *see version 0 on every load and
never detect that the table has changed* doesn't quite match what
`V2TableRefreshUtil` does. Looking at `refresh()`, it gates on `r.isVersioned
|| !versionedOnly` (where `isVersioned = (version != null)`), then always
reloads via `catalog.loadTable(ident)` (or the relation cache) and
unconditionally replaces `r.table = currentTable`. It doesn't compare captured
vs. current version values to skip work, so this comment overstates the role of
version equality.
The actual reason version preservation matters is more mundane: a decorating
catalog that doesn't propagate the version resets the counter to 0 on every wrap,
which breaks the monotonic-version assumption that downstream consumers (e.g.
`InMemoryTable.copy`, validated-version propagation, and the join-refresh tests
in `DSv2IncrementallyConstructedQueryTests`) rely on. Could you rephrase along
those lines?
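To make the point concrete, here is a toy sketch of the wrap-resets-version problem. `ToyTable`, `ToyDecorator`, and `copyVersionFrom` are hypothetical stand-ins invented for illustration, not the real `InMemoryBaseTable` or catalog classes; it only shows that a freshly constructed wrapper starts at 0 unless the counter is carried over.

```scala
// Toy model only: ToyTable and ToyDecorator are hypothetical stand-ins,
// not the real InMemoryBaseTable or decorating catalog classes.
final class ToyTable(val name: String) {
  private var tableVersion: Int = 0

  def version: Int = tableVersion

  // Simulates a write that advances the table version.
  def bump(): Unit = { tableVersion += 1 }

  // Analogue of the helper under review: carry the counter over from `other`.
  def copyVersionFrom(other: ToyTable): Unit = { tableVersion = other.version }
}

object ToyDecorator {
  // Wraps `underlying` in a fresh object; the counter survives only if propagated.
  def wrap(underlying: ToyTable, propagateVersion: Boolean): ToyTable = {
    val wrapped = new ToyTable(underlying.name)
    if (propagateVersion) wrapped.copyVersionFrom(underlying)
    wrapped
  }
}
```

In this toy, a table bumped twice and then wrapped without propagation reports version 0 again, which is the non-monotonic behavior the Scaladoc should describe.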
##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala:
##########
@@ -21,28 +21,37 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
-import org.apache.spark.sql.connector.{DSv2CacheTableReadTests, DSv2RepeatedTableAccessTests, DSv2TempViewWithStoredPlanTests}
-import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, InMemoryTableCatalog, TableCatalog}
+import org.apache.spark.sql.connector.{DSv2CacheTableReadTests, DSv2RepeatedTableAccessTests, DSv2TempViewWithStoredPlanTests, DSv2IncrementallyConstructedQueryTests}
Review Comment:
Nit: the new selector `DSv2IncrementallyConstructedQueryTests` should be
inserted in alphabetical order (between `DSv2CacheTableReadTests` and
`DSv2RepeatedTableAccessTests`), matching the convention used elsewhere in
Spark — see `DataSourceV2DataFrameSuite.scala:32`, which already places the new
`NullTableIdAndNullColumnIdInMemoryTableCatalog` in its alphabetical slot.
```suggestion
import org.apache.spark.sql.connector.{DSv2CacheTableReadTests,
DSv2IncrementallyConstructedQueryTests, DSv2RepeatedTableAccessTests,
DSv2TempViewWithStoredPlanTests}
```
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{Column, InMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, TableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Tests for incrementally constructed queries where df1 and df2 are analyzed at different
+ * times, then joined. The refresh phase in QueryExecution must align table versions across
+ * all references.
+ *
+ * Classic and Connect modes produce different results in some scenarios because in Connect
+ * mode, resolution is deferred until execution, so both sides of a join always see the
+ * latest table state.
+ *
+ * NOTE: All `session.sql(...)` calls append `.collect()` because Connect client DataFrames
+ * are lazy and require an action to trigger execution. In classic mode `.collect()` on
+ * eager statements (DDL, INSERT) is a no-op, so this is harmless.
+ */
+trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBase {
+
+ // ---------------------------------------------------------------------------
+ // Scenario 1: join after insert refreshes both sides to latest version.
+ // Both classic and Connect see the inserted data.
+ // ---------------------------------------------------------------------------
+
+ test(s"${testPrefix}SPARK-54157: join refreshes both sides after external insert" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+ val df2 = session.table(testTable)
+
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200)))
+ }
+ }
+ }
+
+ test(s"${testPrefix}SPARK-54157: join refreshes both sides after same-session insert" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
+
+ val df2 = session.table(testTable)
+
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200)))
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Scenario 2: join after ADD COLUMN.
+ // Classic: df1 keeps its original 2-column schema.
+ // Connect: re-resolves df1 with the new 3-column schema.
+ // ---------------------------------------------------------------------------
+
+ test(s"${testPrefix}SPARK-54157: join after external ADD COLUMN" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ catalog.alterTable(
+ testIdent, TableChange.addColumn(Array("new_column"), IntegerType, true))
+ externalAppend(
+ catalog = catalog, ident = testIdent, row = InternalRow(2, 200, -1))
+
+ val df2 = session.table(testTable)
+ val selfJoin = df1.join(df2, df1("id") === df2("id"))
+
+ if (isConnect) {
+ // Connect re-resolves df1 with the new 3-column schema (id, salary, new_column).
+ assert(selfJoin.columns.length == 6,
+ s"Expected 6 columns (3 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+ checkRows(selfJoin,
+ Seq(Row(1, 100, null, 1, 100, null), Row(2, 200, -1, 2, 200, -1)))
+ } else {
+ // Classic: df1 keeps its original 2-column schema (id, salary).
+ assert(selfJoin.columns.length == 5,
+ s"Expected 5 columns (2 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+ checkRows(selfJoin,
+ Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1)))
+ }
+ }
+ }
+ }
+
+ test(s"${testPrefix}SPARK-54157: join after same-session ADD COLUMN" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ session.sql(s"ALTER TABLE $testTable ADD COLUMN new_column INT").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (2, 200, -1)").collect()
+
+ val df2 = session.table(testTable)
+ val selfJoin = df1.join(df2, df1("id") === df2("id"))
+
+ if (isConnect) {
+ // Connect re-resolves df1 with the new 3-column schema (id, salary, new_column).
+ assert(selfJoin.columns.length == 6,
+ s"Expected 6 columns (3 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+ checkRows(selfJoin,
+ Seq(Row(1, 100, null, 1, 100, null), Row(2, 200, -1, 2, 200, -1)))
+ } else {
+ // Classic: df1 keeps its original 2-column schema (id, salary).
+ assert(selfJoin.columns.length == 5,
+ s"Expected 5 columns (2 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+ checkRows(selfJoin,
+ Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1)))
+ }
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Scenario 3: join after DROP COLUMN.
+ // Classic: df1 references the dropped column, fails with COLUMNS_MISMATCH.
+ // Connect: re-resolves df1 without the dropped column, join succeeds.
+ // ---------------------------------------------------------------------------
+
+ test(s"${testPrefix}SPARK-54157: join after external DROP COLUMN" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ catalog.alterTable(
+ testIdent, TableChange.deleteColumn(Array("salary"), false))
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2))
+
+ val df2 = session.table(testTable)
+
+ if (isConnect) {
+ // Connect re-resolves df1 without the dropped column.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 1), Row(2, 2)))
+ } else {
+ // Classic: df1 references the dropped column.
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
+ matchPVals = true,
+ parameters = Map("tableName" -> ".*", "errors" -> ".*"))
Review Comment:
The regex DOTALL flag is used inconsistently across the `checkError` calls in
this trait:
- Scenario 4b (line 326) uses `"errors" -> "(?s).*"`.
- Scenarios 3, 3-session, 5a, 6 (lines 195, 228, 406, 480) use `"errors" -> ".*"`.
The existing column-ID-mismatch test in
`DataSourceV2DataFrameSuite.scala:2632` already uses `(?s).*`, which suggests
these error messages can span multiple lines. Without `(?s)`, `.` does not match
line terminators, so `.*` cannot cover an `errors` value that contains a newline,
and the parameter check would not match a multi-line error body. I'd align all
five on `"errors" -> "(?s).*"`.
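A small standalone sketch of the regex behavior, for reference. The `multiLineErrors` value is made up for illustration (it is not the real error text), and the sketch assumes the parameter check requires the pattern to cover the whole value, as `String.matches` does.

```scala
object DotAllSketch {
  // Hypothetical multi-line value standing in for the `errors` parameter.
  val multiLineErrors: String = "- column `salary` is missing\n- column count changed"

  // String.matches succeeds only if the pattern matches the entire input.
  // Without (?s), `.` does not match '\n', so `.*` cannot span both lines.
  val plainMatches: Boolean = multiLineErrors.matches(".*")

  // With (?s) (DOTALL), `.` also matches line terminators.
  val dotAllMatches: Boolean = multiLineErrors.matches("(?s).*")
}
```

Here `plainMatches` is `false` and `dotAllMatches` is `true`, which is why `(?s).*` is the safer wildcard for a parameter that may contain line breaks.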
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala:
##########
@@ -51,6 +51,9 @@ trait DSv2ExternalMutationTestBase extends QueryTest {
/** Prefix for test names, e.g. "" or "[connect] ". */
protected def testPrefix: String
+ /** Whether this suite runs under Spark Connect. */
+ protected def isConnect: Boolean = testPrefix == "[connect] "
Review Comment:
Nit/follow-up (not blocking): deriving `isConnect` from `testPrefix ==
"[connect] "` couples the boolean to a brittle string match. If anyone changes
the prefix wording or capitalization, every `if (isConnect)` branch in
`DSv2IncrementallyConstructedQueryTests` silently flips to the classic-mode
path and the Connect-mode assertions stop running. A safer shape is to make
`isConnect` an abstract `def` and have each concrete suite set it explicitly
alongside `testPrefix` (`= false` in `DataSourceV2DataFrameSuite`, `= true` in
`DataSourceV2DataFrameConnectSuite`). Feel free to leave for a follow-up.
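Roughly what I have in mind, as a sketch only. The trait and class names below are hypothetical placeholders rather than the real suites, and `describeMode` exists only so the sketch can be exercised on its own.

```scala
// Sketch with hypothetical names; the real base trait and suites have more members.
trait ModeAwareTestBaseSketch {
  /** Prefix for test names, e.g. "" or "[connect] ". */
  protected def testPrefix: String

  /** Whether this suite runs under Spark Connect; each suite states it explicitly. */
  protected def isConnect: Boolean

  // Public helper so the sketch can be checked without a test framework.
  def describeMode: String = if (isConnect) "connect" else "classic"
}

class ClassicSuiteSketch extends ModeAwareTestBaseSketch {
  override protected def testPrefix: String = ""
  override protected def isConnect: Boolean = false
}

class ConnectSuiteSketch extends ModeAwareTestBaseSketch {
  // Rewording the prefix no longer changes which branch the tests take.
  override protected def testPrefix: String = "[Spark Connect] "
  override protected def isConnect: Boolean = true
}
```

With this shape, forgetting to define `isConnect` in a new suite is a compile error instead of a silent fallback to the classic path.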
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala:
##########
@@ -0,0 +1,485 @@
+ test(s"${testPrefix}SPARK-54157: join after same-session DROP COLUMN" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ session.sql(s"ALTER TABLE $testTable DROP COLUMN salary").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (2)").collect()
+
+ val df2 = session.table(testTable)
+
+ if (isConnect) {
+ // Connect re-resolves df1 without the dropped column.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 1), Row(2, 2)))
+ } else {
+ // Classic: df1 references the dropped column.
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
+ matchPVals = true,
+ parameters = Map("tableName" -> ".*", "errors" -> ".*"))
+ }
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Scenario 4: external drop and recreate table.
+ // 4a: table ID detects it, TABLE_ID_MISMATCH in classic, succeeds in Connect
+ // 4b: column IDs detect it, COLUMN_ID_MISMATCH in classic, succeeds in Connect
+ // 4c: no IDs, goes undetected, join succeeds (both modes)
+ // ---------------------------------------------------------------------------
+
+ test(s"${testPrefix}SPARK-54157: join after external table drop and recreate" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val originTableId = catalog.loadTable(testIdent).id
+
+ catalog.dropTable(testIdent)
+ catalog.createTable(
+ testIdent,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+ val df2 = session.table(testTable)
+ val newTableId = catalog.loadTable(testIdent).id
+ assert(originTableId != newTableId)
+
+ if (isConnect) {
+ // Connect re-resolves both sides to the recreated table.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(2, 200, 2, 200)))
+ } else {
+ // Classic: table ID changed.
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH",
+ matchPVals = true,
+ parameters = Map(
+ "tableName" -> ".*",
+ "capturedTableId" -> ".*",
+ "currentTableId" -> ".*"))
+ }
+ }
+ }
+ }
+
+ test(s"${testPrefix}SPARK-54157: join after external drop/recreate" +
+ " (table without table ID support, but with column ID support)") {
+ val nullIdT = "nullidcat.ns1.ns2.tbl"
+ withTestSession { session =>
+ withTestTableAndViews(session, nullIdT) {
+ session.sql(s"CREATE TABLE $nullIdT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $nullIdT VALUES (1, 100)").collect()
+
+ val df1 = session.table(nullIdT)
+ val catalog = getTableCatalog[TableCatalog](session, "nullidcat")
Review Comment:
Nit (also at line 385; contrast with the narrower types at lines 341 and
422): scenarios 4b and 5a use `getTableCatalog[TableCatalog]`, while scenarios
4c and 5b use
`getTableCatalog[NullTableIdAndNullColumnIdInMemoryTableCatalog]`. Looking at
the bodies, only `dropTable` / `createTable` / `alterTable` (all on
`TableCatalog`) are invoked, so the narrow type isn't needed. Picking one form
across all four reads more uniformly.