rdblue commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r291410750
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
 ##########
 @@ -170,6 +173,85 @@ class DataSourceV2SQLSuite extends QueryTest with 
SharedSQLContext with BeforeAn
     checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), 
spark.table("source"))
   }
 
+  test("ReplaceTableAsSelect: basic v2 implementation using atomic catalog.") {
+    spark.sql("CREATE TABLE testcatatomic.table_name USING foo AS SELECT id, 
data FROM source")
+    val testCatalog = spark.catalog("testcatatomic").asTableCatalog
+    val originalTable = testCatalog.loadTable(Identifier.of(Array(), 
"table_name"))
+    spark.sql("REPLACE TABLE testcatatomic.table_name USING foo AS SELECT id 
FROM source")
+    val replacedTable = testCatalog.loadTable(Identifier.of(Array(), 
"table_name"))
+    assert(replacedTable != originalTable, "Table should have been replaced.")
+    assert(replacedTable.name == "testcatatomic.table_name")
+    assert(replacedTable.partitioning.isEmpty)
+    assert(replacedTable.properties == Map("provider" -> "foo").asJava)
+    assert(replacedTable.schema == new StructType()
+      .add("id", LongType, nullable = false))
+    val rdd = 
spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
+    checkAnswer(
+        spark.internalCreateDataFrame(rdd, replacedTable.schema),
+        spark.table("source").select("id"))
+  }
+
+  test("ReplaceTableAsSelect: Non-atomic catalog creates the empty table, but 
leaves the" +
+    " table empty if the write fails.") {
+    spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data 
FROM source")
+    val testCatalog = spark.catalog("testcat").asTableCatalog
+    val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+    assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
+    intercept[Exception] {
+      spark.sql("REPLACE TABLE testcat.table_name" +
+        s" USING foo OPTIONS 
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}`=true)" +
+        s" AS SELECT id FROM source")
+    }
+    val replacedTable = testCatalog.loadTable(Identifier.of(Array(), 
"table_name"))
+    assert(replacedTable != table, "Table should have been replaced.")
+    assert(replacedTable.asInstanceOf[InMemoryTable].rows.isEmpty,
+        "Rows should not have been successfully written to the replaced 
table.")
+  }
+
+  test("ReplaceTableAsSelect: Non-atomic catalog drops the table permanently 
if the" +
+    " subsequent table creation fails.") {
+    spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data 
FROM source")
+    val testCatalog = spark.catalog("testcat").asTableCatalog
+    val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+    assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
+    intercept[Exception] {
+      spark.sql("REPLACE TABLE testcat.table_name" +
+        s" USING foo" +
+        s" TBLPROPERTIES 
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
+        s" AS SELECT id FROM source")
+    }
+    assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")),
+      "Table should have been dropped and failed to be created.")
+  }
+
+  test("ReplaceTableAsSelect: Atomic catalog does not drop the table when 
replace fails.") {
+    spark.sql("CREATE TABLE testcatatomic.table_name USING foo AS SELECT id, 
data FROM source")
+    val testCatalog = spark.catalog("testcatatomic").asTableCatalog
+    val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+
+    assert(table.name == "testcatatomic.table_name")
+    assert(table.partitioning.isEmpty)
+    assert(table.properties == Map("provider" -> "foo").asJava)
+    assert(table.schema == new StructType()
+      .add("id", LongType, nullable = false)
+      .add("data", StringType))
+    intercept[Exception] {
+      spark.sql("REPLACE TABLE testcatatomic.table_name" +
+        s" USING foo OPTIONS 
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}`=true)" +
+        s" AS SELECT id FROM source")
+    }
+    var maybeReplacedTable: Table = 
testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+    assert(maybeReplacedTable === table, "Table should not have changed.")
+    intercept[Exception] {
+      spark.sql("REPLACE TABLE testcatatomic.table_name" +
+        s" USING foo" +
+        s" TBLPROPERTIES 
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
+        s" AS SELECT id FROM source")
+    }
+    maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), 
"table_name"))
+    assert(maybeReplacedTable === table, "Table should not have changed.")
+  }
+
 
 Review comment:
   I think this should also have test cases for `REPLACE TABLE` without `AS 
SELECT ...`. I also don't think there is a test for `REPLACE TABLE` when the 
original table doesn't exist. It would be good to add that as well.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to