rdblue commented on a change in pull request #24798: [SPARK-27724][SQL]
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r293158067
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
##########
@@ -154,20 +157,145 @@ class DataSourceV2SQLSuite extends QueryTest with
SharedSQLContext with BeforeAn
}
test("CreateTableAsSelect: use v2 plan because catalog is set") {
+ val basicCatalog = spark.catalog("testcat").asTableCatalog
+ val atomicCatalog = spark.catalog("testcat_atomic").asTableCatalog
+ val basicIdentifier = "testcat.table_name"
+ val atomicIdentifier = "testcat_atomic.table_name"
+
+ Seq((basicCatalog, basicIdentifier), (atomicCatalog,
atomicIdentifier)).foreach {
+ case (catalog, identifier) =>
+ spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM
source")
+
+ val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
+
+ assert(table.name == identifier)
+ assert(table.partitioning.isEmpty)
+ assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.schema == new StructType()
+ .add("id", LongType, nullable = false)
+ .add("data", StringType))
+
+ val rdd =
spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
+ checkAnswer(spark.internalCreateDataFrame(rdd, table.schema),
spark.table("source"))
+ }
+ }
+
+ test("ReplaceTableAsSelect: basic v2 implementation.") {
+ val basicCatalog = spark.catalog("testcat").asTableCatalog
+ val atomicCatalog = spark.catalog("testcat_atomic").asTableCatalog
+ val basicIdentifier = "testcat.table_name"
+ val atomicIdentifier = "testcat_atomic.table_name"
+
+ Seq((basicCatalog, basicIdentifier), (atomicCatalog,
atomicIdentifier)).foreach {
+ case (catalog, identifier) =>
+ spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM
source")
+ val originalTable = catalog.loadTable(Identifier.of(Array(),
"table_name"))
+
+ spark.sql(s"REPLACE TABLE $identifier USING foo AS SELECT id FROM
source")
+ val replacedTable = catalog.loadTable(Identifier.of(Array(),
"table_name"))
+
+ assert(replacedTable != originalTable, "Table should have been
replaced.")
+ assert(replacedTable.name == identifier)
+ assert(replacedTable.partitioning.isEmpty)
+ assert(replacedTable.properties == Map("provider" -> "foo").asJava)
+ assert(replacedTable.schema == new StructType()
+ .add("id", LongType, nullable = false))
+
+ val rdd =
spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
+ checkAnswer(
+ spark.internalCreateDataFrame(rdd, replacedTable.schema),
+ spark.table("source").select("id"))
+ }
+ }
+
+ test("ReplaceTableAsSelect: Non-atomic catalog drops the table if the write
fails.") {
spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data
FROM source")
+ val testCatalog = spark.catalog("testcat").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
+
+ intercept[Exception] {
+ spark.sql("REPLACE TABLE testcat.table_name" +
+ s" USING foo OPTIONS
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}`=true)" +
+ s" AS SELECT id FROM source")
+ }
+
+ assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")),
+ "Table should have been dropped as a result of the replace.")
+ }
+ test("ReplaceTableAsSelect: Non-atomic catalog drops the table permanently
if the" +
+ " subsequent table creation fails.") {
+ spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data
FROM source")
val testCatalog = spark.catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
- assert(table.name == "testcat.table_name")
- assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> "foo").asJava)
- assert(table.schema == new StructType()
- .add("id", LongType, nullable = false)
- .add("data", StringType))
+ intercept[Exception] {
+ spark.sql("REPLACE TABLE testcat.table_name" +
+ s" USING foo" +
+ s" TBLPROPERTIES
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
+ s" AS SELECT id FROM source")
+ }
- val rdd =
spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
- checkAnswer(spark.internalCreateDataFrame(rdd, table.schema),
spark.table("source"))
+ assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")),
+ "Table should have been dropped and failed to be created.")
+ }
+
+ test("ReplaceTableAsSelect: Atomic catalog does not drop the table when
replace fails.") {
+ spark.sql("CREATE TABLE testcat_atomic.table_name USING foo AS SELECT id,
data FROM source")
+ val testCatalog = spark.catalog("testcat_atomic").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+
+ intercept[Exception] {
+ spark.sql("REPLACE TABLE testcat_atomic.table_name" +
+ s" USING foo OPTIONS
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}=true)" +
+ s" AS SELECT id FROM source")
+ }
+
+ var maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(),
"table_name"))
+ assert(maybeReplacedTable === table, "Table should not have changed.")
+
+ intercept[Exception] {
+ spark.sql("REPLACE TABLE testcat_atomic.table_name" +
+ s" USING foo" +
+ s" TBLPROPERTIES
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
+ s" AS SELECT id FROM source")
+ }
+
+ maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(),
"table_name"))
+ assert(maybeReplacedTable === table, "Table should not have changed.")
+ }
+
+ test("ReplaceTable: Erases the table contents and changes the metadata.") {
+ spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM
source")
+
+ val testCatalog = spark.catalog("testcat").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
+
+ spark.sql("REPLACE TABLE testcat.table_name (id bigint) USING foo")
+ val replaced = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+
+ assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty,
+ "Replaced table should have no rows after committing.")
+ assert(replaced.schema().fields.length === 1,
+ "Replaced table should have new schema.")
+ assert(replaced.schema().fields(0).name === "id",
+ "Replaced table should have new schema.")
+ }
+
+ test("ReplaceTableAsSelect: New table has same behavior as CTAS.") {
Review comment:
Why not run this for the atomic catalog as well?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]