longvu-db commented on code in PR #55463: URL: https://github.com/apache/spark/pull/55463#discussion_r3318307456
########## sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala: ########## @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{Column, InMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, TableCatalog, TableChange, TableInfo} +import org.apache.spark.sql.types.{IntegerType, StringType} +import org.apache.spark.unsafe.types.UTF8String + +/** + * Tests for incrementally constructed queries where df1 and df2 are analyzed at different + * times, then joined. The refresh phase in QueryExecution must align table versions across + * all references. + * + * Classic and Connect modes produce different results in some scenarios because in Connect + * mode, resolution is deferred until execution, so both sides of a join always see the + * latest table state. + * + * NOTE: All `session.sql(...)` calls append `.collect()` because Connect client DataFrames + * are lazy and require an action to trigger execution. 
In classic mode `.collect()` on + * eager statements (DDL, INSERT) is a no-op, so this is harmless. + */ +trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBase { + + // --------------------------------------------------------------------------- + // Scenario 1: join after insert refreshes both sides to latest version. + // Both classic and Connect see the inserted data. + // --------------------------------------------------------------------------- + + test(s"${testPrefix}SPARK-54157: join refreshes both sides after external insert" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + + val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat") + externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200)) + + val df2 = session.table(testTable) + + checkRows( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200))) + } + } + } + + test(s"${testPrefix}SPARK-54157: join refreshes both sides after same-session insert" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + + session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect() + + val df2 = session.table(testTable) + + checkRows( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200))) + } + } + } + + // --------------------------------------------------------------------------- + // Scenario 2: join after ADD COLUMN. 
+ // Classic: df1 keeps its original 2-column schema. + // Connect: re-resolves df1 with the new 3-column schema. + // --------------------------------------------------------------------------- + + test(s"${testPrefix}SPARK-54157: join after external ADD COLUMN" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + + val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat") + catalog.alterTable( + testIdent, TableChange.addColumn(Array("new_column"), IntegerType, true)) + externalAppend( + catalog = catalog, ident = testIdent, row = InternalRow(2, 200, -1)) + + val df2 = session.table(testTable) + val selfJoin = df1.join(df2, df1("id") === df2("id")) + + if (isConnect) { + // Connect re-resolves df1 with the new 3-column schema (id, salary, new_column). + assert(selfJoin.columns.length == 6, + s"Expected 6 columns (3 + 3) but got: ${selfJoin.columns.mkString(", ")}") + checkRows(selfJoin, + Seq(Row(1, 100, null, 1, 100, null), Row(2, 200, -1, 2, 200, -1))) + } else { + // Classic: df1 keeps its original 2-column schema (id, salary). 
+ assert(selfJoin.columns.length == 5, + s"Expected 5 columns (2 + 3) but got: ${selfJoin.columns.mkString(", ")}") + checkRows(selfJoin, + Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1))) + } + } + } + } + + test(s"${testPrefix}SPARK-54157: join after same-session ADD COLUMN" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + + session.sql(s"ALTER TABLE $testTable ADD COLUMN new_column INT").collect() + session.sql(s"INSERT INTO $testTable VALUES (2, 200, -1)").collect() + + val df2 = session.table(testTable) + val selfJoin = df1.join(df2, df1("id") === df2("id")) + + if (isConnect) { + // Connect re-resolves df1 with the new 3-column schema (id, salary, new_column). + assert(selfJoin.columns.length == 6, + s"Expected 6 columns (3 + 3) but got: ${selfJoin.columns.mkString(", ")}") + checkRows(selfJoin, + Seq(Row(1, 100, null, 1, 100, null), Row(2, 200, -1, 2, 200, -1))) + } else { + // Classic: df1 keeps its original 2-column schema (id, salary). + assert(selfJoin.columns.length == 5, + s"Expected 5 columns (2 + 3) but got: ${selfJoin.columns.mkString(", ")}") + checkRows(selfJoin, + Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1))) + } + } + } + } + + // --------------------------------------------------------------------------- + // Scenario 3: join after DROP COLUMN. + // Classic: df1 references the dropped column, fails with COLUMNS_MISMATCH. + // Connect: re-resolves df1 without the dropped column, join succeeds. 
+ // --------------------------------------------------------------------------- + + test(s"${testPrefix}SPARK-54157: join after external DROP COLUMN" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + + val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat") + catalog.alterTable( + testIdent, TableChange.deleteColumn(Array("salary"), false)) + externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2)) + + val df2 = session.table(testTable) + + if (isConnect) { + // Connect re-resolves df1 without the dropped column. + checkRows( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, 1), Row(2, 2))) + } else { + // Classic: df1 references the dropped column. + checkError( + exception = intercept[AnalysisException] { + df1.join(df2, df1("id") === df2("id")).collect() + }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", + matchPVals = true, + parameters = Map("tableName" -> ".*", "errors" -> ".*")) Review Comment: Done. Aligned all five to `"errors" -> "(?s).*"`. ########## sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala: ########## @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{Column, InMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, TableCatalog, TableChange, TableInfo} +import org.apache.spark.sql.types.{IntegerType, StringType} +import org.apache.spark.unsafe.types.UTF8String + +/** + * Tests for incrementally constructed queries where df1 and df2 are analyzed at different + * times, then joined. The refresh phase in QueryExecution must align table versions across + * all references. + * + * Classic and Connect modes produce different results in some scenarios because in Connect + * mode, resolution is deferred until execution, so both sides of a join always see the + * latest table state. + * + * NOTE: All `session.sql(...)` calls append `.collect()` because Connect client DataFrames + * are lazy and require an action to trigger execution. In classic mode `.collect()` on + * eager statements (DDL, INSERT) is a no-op, so this is harmless. + */ +trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBase { + + // --------------------------------------------------------------------------- + // Scenario 1: join after insert refreshes both sides to latest version. + // Both classic and Connect see the inserted data. 
+ // --------------------------------------------------------------------------- + + test(s"${testPrefix}SPARK-54157: join refreshes both sides after external insert" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + + val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat") + externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200)) + + val df2 = session.table(testTable) + + checkRows( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200))) + } + } + } + + test(s"${testPrefix}SPARK-54157: join refreshes both sides after same-session insert" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + + session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect() + + val df2 = session.table(testTable) + + checkRows( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200))) + } + } + } + + // --------------------------------------------------------------------------- + // Scenario 2: join after ADD COLUMN. + // Classic: df1 keeps its original 2-column schema. + // Connect: re-resolves df1 with the new 3-column schema. 
+ // --------------------------------------------------------------------------- + + test(s"${testPrefix}SPARK-54157: join after external ADD COLUMN" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + + val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat") + catalog.alterTable( + testIdent, TableChange.addColumn(Array("new_column"), IntegerType, true)) + externalAppend( + catalog = catalog, ident = testIdent, row = InternalRow(2, 200, -1)) + + val df2 = session.table(testTable) + val selfJoin = df1.join(df2, df1("id") === df2("id")) + + if (isConnect) { + // Connect re-resolves df1 with the new 3-column schema (id, salary, new_column). + assert(selfJoin.columns.length == 6, + s"Expected 6 columns (3 + 3) but got: ${selfJoin.columns.mkString(", ")}") + checkRows(selfJoin, + Seq(Row(1, 100, null, 1, 100, null), Row(2, 200, -1, 2, 200, -1))) + } else { + // Classic: df1 keeps its original 2-column schema (id, salary). 
+ assert(selfJoin.columns.length == 5, + s"Expected 5 columns (2 + 3) but got: ${selfJoin.columns.mkString(", ")}") + checkRows(selfJoin, + Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1))) + } + } + } + } + + test(s"${testPrefix}SPARK-54157: join after same-session ADD COLUMN" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + + session.sql(s"ALTER TABLE $testTable ADD COLUMN new_column INT").collect() + session.sql(s"INSERT INTO $testTable VALUES (2, 200, -1)").collect() + + val df2 = session.table(testTable) + val selfJoin = df1.join(df2, df1("id") === df2("id")) + + if (isConnect) { + // Connect re-resolves df1 with the new 3-column schema (id, salary, new_column). + assert(selfJoin.columns.length == 6, + s"Expected 6 columns (3 + 3) but got: ${selfJoin.columns.mkString(", ")}") + checkRows(selfJoin, + Seq(Row(1, 100, null, 1, 100, null), Row(2, 200, -1, 2, 200, -1))) + } else { + // Classic: df1 keeps its original 2-column schema (id, salary). + assert(selfJoin.columns.length == 5, + s"Expected 5 columns (2 + 3) but got: ${selfJoin.columns.mkString(", ")}") + checkRows(selfJoin, + Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1))) + } + } + } + } + + // --------------------------------------------------------------------------- + // Scenario 3: join after DROP COLUMN. + // Classic: df1 references the dropped column, fails with COLUMNS_MISMATCH. + // Connect: re-resolves df1 without the dropped column, join succeeds. 
+ // --------------------------------------------------------------------------- + + test(s"${testPrefix}SPARK-54157: join after external DROP COLUMN" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + + val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat") + catalog.alterTable( + testIdent, TableChange.deleteColumn(Array("salary"), false)) + externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2)) + + val df2 = session.table(testTable) + + if (isConnect) { + // Connect re-resolves df1 without the dropped column. + checkRows( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, 1), Row(2, 2))) + } else { + // Classic: df1 references the dropped column. + checkError( + exception = intercept[AnalysisException] { + df1.join(df2, df1("id") === df2("id")).collect() + }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", + matchPVals = true, + parameters = Map("tableName" -> ".*", "errors" -> ".*")) + } + } + } + } + + test(s"${testPrefix}SPARK-54157: join after same-session DROP COLUMN" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + + session.sql(s"ALTER TABLE $testTable DROP COLUMN salary").collect() + session.sql(s"INSERT INTO $testTable VALUES (2)").collect() + + val df2 = session.table(testTable) + + if (isConnect) { + // Connect re-resolves df1 without the dropped column. 
+ checkRows( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, 1), Row(2, 2))) + } else { + // Classic: df1 references the dropped column. + checkError( + exception = intercept[AnalysisException] { + df1.join(df2, df1("id") === df2("id")).collect() + }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", + matchPVals = true, + parameters = Map("tableName" -> ".*", "errors" -> ".*")) + } + } + } + } + + // --------------------------------------------------------------------------- + // Scenario 4: external drop and recreate table. + // 4a: table ID detects it, TABLE_ID_MISMATCH in classic, succeeds in Connect + // 4b: column IDs detect it, COLUMN_ID_MISMATCH in classic, succeeds in Connect + // 4c: no IDs, goes undetected, join succeeds (both modes) + // --------------------------------------------------------------------------- + + test(s"${testPrefix}SPARK-54157: join after external table drop and recreate" + + " (table with both table and column ID support)") { + withTestSession { session => + withTestTableAndViews(session, testTable) { + session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect() + + val df1 = session.table(testTable) + val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val originTableId = catalog.loadTable(testIdent).id + + catalog.dropTable(testIdent) + catalog.createTable( + testIdent, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200)) + + val df2 = session.table(testTable) + val newTableId = catalog.loadTable(testIdent).id + assert(originTableId != newTableId) + + if (isConnect) { + // Connect re-resolves both sides to the recreated table. 
+ checkRows( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(2, 200, 2, 200))) + } else { + // Classic: table ID changed. + checkError( + exception = intercept[AnalysisException] { + df1.join(df2, df1("id") === df2("id")).collect() + }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH", + matchPVals = true, + parameters = Map( + "tableName" -> ".*", + "capturedTableId" -> ".*", + "currentTableId" -> ".*")) + } + } + } + } + + test(s"${testPrefix}SPARK-54157: join after external drop/recreate" + + " (table without table ID support, but with column ID support)") { + val nullIdT = "nullidcat.ns1.ns2.tbl" + withTestSession { session => + withTestTableAndViews(session, nullIdT) { + session.sql(s"CREATE TABLE $nullIdT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $nullIdT VALUES (1, 100)").collect() + + val df1 = session.table(nullIdT) + val catalog = getTableCatalog[TableCatalog](session, "nullidcat") Review Comment: Done. Switched scenarios 4c and 5b to `getTableCatalog[TableCatalog]` for uniformity — only `TableCatalog` methods are used. ########## sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala: ########## @@ -51,6 +51,9 @@ trait DSv2ExternalMutationTestBase extends QueryTest { /** Prefix for test names, e.g. "" or "[connect] ". */ protected def testPrefix: String + /** Whether this suite runs under Spark Connect. */ + protected def isConnect: Boolean = testPrefix == "[connect] " Review Comment: Done. Made `isConnect` an abstract def on `DSv2ExternalMutationTestBase`, set explicitly to `false` in `DataSourceV2DataFrameSuite` and `true` in `DataSourceV2DataFrameConnectSuite`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org
