rdblue commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r291410079
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
 ##########
 @@ -170,6 +173,85 @@ class DataSourceV2SQLSuite extends QueryTest with 
SharedSQLContext with BeforeAn
     checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), 
spark.table("source"))
   }
 
+  test("ReplaceTableAsSelect: basic v2 implementation using atomic catalog.") {
+    spark.sql("CREATE TABLE testcatatomic.table_name USING foo AS SELECT id, 
data FROM source")
+    val testCatalog = spark.catalog("testcatatomic").asTableCatalog
+    val originalTable = testCatalog.loadTable(Identifier.of(Array(), 
"table_name"))
+    spark.sql("REPLACE TABLE testcatatomic.table_name USING foo AS SELECT id 
FROM source")
+    val replacedTable = testCatalog.loadTable(Identifier.of(Array(), 
"table_name"))
+    assert(replacedTable != originalTable, "Table should have been replaced.")
+    assert(replacedTable.name == "testcatatomic.table_name")
+    assert(replacedTable.partitioning.isEmpty)
+    assert(replacedTable.properties == Map("provider" -> "foo").asJava)
+    assert(replacedTable.schema == new StructType()
+      .add("id", LongType, nullable = false))
+    val rdd = 
spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
+    checkAnswer(
+        spark.internalCreateDataFrame(rdd, replacedTable.schema),
+        spark.table("source").select("id"))
+  }
+
+  test("ReplaceTableAsSelect: Non-atomic catalog creates the empty table, but 
leaves the" +
+    " table empty if the write fails.") {
+    spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data 
FROM source")
+    val testCatalog = spark.catalog("testcat").asTableCatalog
+    val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+    assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
+    intercept[Exception] {
+      spark.sql("REPLACE TABLE testcat.table_name" +
+        s" USING foo OPTIONS 
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}`=true)" +
+        s" AS SELECT id FROM source")
+    }
+    val replacedTable = testCatalog.loadTable(Identifier.of(Array(), 
"table_name"))
+    assert(replacedTable != table, "Table should have been replaced.")
 
 Review comment:
   I think a better test assertion is that the schema matches the new table. 
The current `replacedTable != table` assertion could pass even when the 
underlying metadata is unchanged, if two separate instances of the table are 
loaded from the catalog.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to