rdblue commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r291410750
########## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala ########## @@ -170,6 +173,85 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source")) } + test("ReplaceTableAsSelect: basic v2 implementation using atomic catalog.") { + spark.sql("CREATE TABLE testcatatomic.table_name USING foo AS SELECT id, data FROM source") + val testCatalog = spark.catalog("testcatatomic").asTableCatalog + val originalTable = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + spark.sql("REPLACE TABLE testcatatomic.table_name USING foo AS SELECT id FROM source") + val replacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(replacedTable != originalTable, "Table should have been replaced.") + assert(replacedTable.name == "testcatatomic.table_name") + assert(replacedTable.partitioning.isEmpty) + assert(replacedTable.properties == Map("provider" -> "foo").asJava) + assert(replacedTable.schema == new StructType() + .add("id", LongType, nullable = false)) + val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows) + checkAnswer( + spark.internalCreateDataFrame(rdd, replacedTable.schema), + spark.table("source").select("id")) + } + + test("ReplaceTableAsSelect: Non-atomic catalog creates the empty table, but leaves the" + + " table empty if the write fails.") { + spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source") + val testCatalog = spark.catalog("testcat").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty) + intercept[Exception] { + spark.sql("REPLACE TABLE testcat.table_name" + + s" USING foo OPTIONS (`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}`=true)" + + s" AS SELECT id FROM source") + } + val replacedTable = 
testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(replacedTable != table, "Table should have been replaced.") + assert(replacedTable.asInstanceOf[InMemoryTable].rows.isEmpty, + "Rows should not have been successfully written to the replaced table.") + } + + test("ReplaceTableAsSelect: Non-atomic catalog drops the table permanently if the" + + " subsequent table creation fails.") { + spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source") + val testCatalog = spark.catalog("testcat").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty) + intercept[Exception] { + spark.sql("REPLACE TABLE testcat.table_name" + + s" USING foo" + + s" TBLPROPERTIES (`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" + + s" AS SELECT id FROM source") + } + assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")), + "Table should have been dropped and failed to be created.") + } + + test("ReplaceTableAsSelect: Atomic catalog does not drop the table when replace fails.") { + spark.sql("CREATE TABLE testcatatomic.table_name USING foo AS SELECT id, data FROM source") + val testCatalog = spark.catalog("testcatatomic").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + + assert(table.name == "testcatatomic.table_name") + assert(table.partitioning.isEmpty) + assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.schema == new StructType() + .add("id", LongType, nullable = false) + .add("data", StringType)) + intercept[Exception] { + spark.sql("REPLACE TABLE testcatatomic.table_name" + + s" USING foo OPTIONS (`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}`=true)" + + s" AS SELECT id FROM source") + } + var maybeReplacedTable: Table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(maybeReplacedTable === table, "Table should not have 
changed.") + intercept[Exception] { + spark.sql("REPLACE TABLE testcatatomic.table_name" + + s" USING foo" + + s" TBLPROPERTIES (`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" + + s" AS SELECT id FROM source") + } + maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(maybeReplacedTable === table, "Table should not have changed.") + } + Review comment: I think this should also have test cases for `REPLACE TABLE` without `AS SELECT ...`. I also don't think there is a test for `REPLACE TABLE` when the original table doesn't exist. It would be good to add that as well. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org