rdblue commented on a change in pull request #25507: [SPARK-28667][SQL] Support InsertInto through the V2SessionCatalog URL: https://github.com/apache/spark/pull/25507#discussion_r319274650
########## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/InsertIntoTests.scala ########## @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2 + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql._ +import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} +import org.apache.spark.sql.test.SharedSparkSession + +/** + * A collection of "INSERT INTO" tests that can be run through the SQL or DataFrameWriter APIs. + * Extending test suites can implement the `doInsert` method to run the insert through either + * API. + * + * @param supportsDynamicOverwrite Whether the Table implementations used in the test suite support + * dynamic partition overwrites. If they do, we will check for the + * success of the operations. If not, then we will check that we + * failed with the right error message. + * @param includeSQLOnlyTests Certain INSERT INTO behavior can be achieved purely through SQL, e.g. + * static or dynamic partition overwrites. This flag should be set to + * true if we would like to test these cases. + */ +abstract class InsertIntoTests( + override protected val supportsDynamicOverwrite: Boolean, + override protected val includeSQLOnlyTests: Boolean) extends InsertIntoSQLOnlyTests { + + import testImplicits._ + + /** + * Insert data into a table using the insertInto statement. Implementations can be in SQL + * ("INSERT") or using the DataFrameWriter (`df.write.insertInto`). + */ + protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode = null): Unit + + test("insertInto: append") { + val t1 = s"${catalogAndNamespace}tbl" + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + doInsert(t1, df) + verifyTable(t1, df) + } + + test("insertInto: append by position") { + val t1 = s"${catalogAndNamespace}tbl" + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id") + + doInsert(t1, dfr) + verifyTable(t1, df) + } + + test("insertInto: append partitioned table") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)") + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + doInsert(t1, df) + verifyTable(t1, df) + } + } + + test("insertInto: overwrite non-partitioned table") { + val t1 = s"${catalogAndNamespace}tbl" + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + val df2 = Seq((4L, "d"), (5L, "e"), (6L, "f")).toDF("id", "data") + doInsert(t1, df) + doInsert(t1, df2, SaveMode.Overwrite) + verifyTable(t1, df2) + } + + test("insertInto: overwrite partitioned table in static mode") { + withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) { + val t1 = s"${catalogAndNamespace}tbl" + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)") + val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data") + doInsert(t1, init) + + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + doInsert(t1, df, SaveMode.Overwrite) + verifyTable(t1, df) + } + } + + + test("insertInto: overwrite partitioned table in static mode by position") { + withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)") + val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data") + doInsert(t1, init) + + val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id") + doInsert(t1, dfr, SaveMode.Overwrite) + + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + verifyTable(t1, df) + } + } + } + + test("insertInto: fails when missing a column") { + val t1 = s"${catalogAndNamespace}tbl" + sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING $v2Format") + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + val exc = intercept[AnalysisException] { + doInsert(t1, df) + } + + verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", "missing")) + val tableName = if (catalogAndNamespace.isEmpty) s"default.$t1" else t1 + assert(exc.getMessage.contains(s"Cannot write to '$tableName', not enough data columns")) + } + + test("insertInto: fails when an extra column is present") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") + val df = Seq((1L, "a", "mango")).toDF("id", "data", "fruit") + val exc = intercept[AnalysisException] { + doInsert(t1, df) + } + + verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data")) + val tableName = if (catalogAndNamespace.isEmpty) s"default.$t1" else t1 + assert(exc.getMessage.contains(s"Cannot write to '$tableName', too many data columns")) + } + } + + dynamicOverwriteTest("insertInto: overwrite partitioned table in dynamic mode") { Review comment: Patterns like this break IntelliJ's test runner because it can't detect blocks that are tests. That means debugging from within the IDE and running a single test by name is difficult. It may also break running single tests on the command-line too, I'm not sure about that. A better pattern is to use `assume` instead. That will cancel a test if the assumption (in this case, `supportsDynamicOverwrite`) is false. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
