rdblue commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r293158067
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
 ##########
 @@ -154,20 +157,145 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
   }
 
   test("CreateTableAsSelect: use v2 plan because catalog is set") {
+    val basicCatalog = spark.catalog("testcat").asTableCatalog
+    val atomicCatalog = spark.catalog("testcat_atomic").asTableCatalog
+    val basicIdentifier = "testcat.table_name"
+    val atomicIdentifier = "testcat_atomic.table_name"
+
+    Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
+      case (catalog, identifier) =>
+        spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM 
source")
+
+        val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
+
+        assert(table.name == identifier)
+        assert(table.partitioning.isEmpty)
+        assert(table.properties == Map("provider" -> "foo").asJava)
+        assert(table.schema == new StructType()
+          .add("id", LongType, nullable = false)
+          .add("data", StringType))
+
+        val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
+        checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
+    }
+  }
+
+  test("ReplaceTableAsSelect: basic v2 implementation.") {
+    val basicCatalog = spark.catalog("testcat").asTableCatalog
+    val atomicCatalog = spark.catalog("testcat_atomic").asTableCatalog
+    val basicIdentifier = "testcat.table_name"
+    val atomicIdentifier = "testcat_atomic.table_name"
+
+    Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
+      case (catalog, identifier) =>
+        spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM 
source")
+        val originalTable = catalog.loadTable(Identifier.of(Array(), "table_name"))
+
+        spark.sql(s"REPLACE TABLE $identifier USING foo AS SELECT id FROM 
source")
+        val replacedTable = catalog.loadTable(Identifier.of(Array(), "table_name"))
+
+        assert(replacedTable != originalTable, "Table should have been replaced.")
+        assert(replacedTable.name == identifier)
+        assert(replacedTable.partitioning.isEmpty)
+        assert(replacedTable.properties == Map("provider" -> "foo").asJava)
+        assert(replacedTable.schema == new StructType()
+          .add("id", LongType, nullable = false))
+
+        val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
+        checkAnswer(
+          spark.internalCreateDataFrame(rdd, replacedTable.schema),
+          spark.table("source").select("id"))
+    }
+  }
+
+  test("ReplaceTableAsSelect: Non-atomic catalog drops the table if the write 
fails.") {
     spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data 
FROM source")
+    val testCatalog = spark.catalog("testcat").asTableCatalog
+    val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+    assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
+
+    intercept[Exception] {
+      spark.sql("REPLACE TABLE testcat.table_name" +
+        s" USING foo OPTIONS 
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}`=true)" +
+        s" AS SELECT id FROM source")
+    }
+
+    assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")),
+        "Table should have been dropped as a result of the replace.")
+  }
 
+  test("ReplaceTableAsSelect: Non-atomic catalog drops the table permanently 
if the" +
+    " subsequent table creation fails.") {
+    spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data 
FROM source")
     val testCatalog = spark.catalog("testcat").asTableCatalog
     val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+    assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
 
-    assert(table.name == "testcat.table_name")
-    assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> "foo").asJava)
-    assert(table.schema == new StructType()
-        .add("id", LongType, nullable = false)
-        .add("data", StringType))
+    intercept[Exception] {
+      spark.sql("REPLACE TABLE testcat.table_name" +
+        s" USING foo" +
+        s" TBLPROPERTIES 
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
+        s" AS SELECT id FROM source")
+    }
 
-    val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
-    checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
+    assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")),
+      "Table should have been dropped and failed to be created.")
+  }
+
+  test("ReplaceTableAsSelect: Atomic catalog does not drop the table when 
replace fails.") {
+    spark.sql("CREATE TABLE testcat_atomic.table_name USING foo AS SELECT id, 
data FROM source")
+    val testCatalog = spark.catalog("testcat_atomic").asTableCatalog
+    val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+
+    intercept[Exception] {
+      spark.sql("REPLACE TABLE testcat_atomic.table_name" +
+        s" USING foo OPTIONS 
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}=true)" +
+        s" AS SELECT id FROM source")
+    }
+
+    var maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+    assert(maybeReplacedTable === table, "Table should not have changed.")
+
+    intercept[Exception] {
+      spark.sql("REPLACE TABLE testcat_atomic.table_name" +
+        s" USING foo" +
+        s" TBLPROPERTIES 
(`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
+        s" AS SELECT id FROM source")
+    }
+
+    maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+    assert(maybeReplacedTable === table, "Table should not have changed.")
+  }
+
+  test("ReplaceTable: Erases the table contents and changes the metadata.") {
+    spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM 
source")
+
+    val testCatalog = spark.catalog("testcat").asTableCatalog
+    val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+    assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
+
+    spark.sql("REPLACE TABLE testcat.table_name (id bigint) USING foo")
+    val replaced = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+
+    assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty,
+        "Replaced table should have no rows after committing.")
+    assert(replaced.schema().fields.length === 1,
+        "Replaced table should have new schema.")
+    assert(replaced.schema().fields(0).name === "id",
+      "Replaced table should have new schema.")
+  }
+
+  test("ReplaceTableAsSelect: New table has same behavior as CTAS.") {
 
 Review comment:
   Why not run this for the atomic catalog as well?
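   
   For illustration, a sketch of how that test could also cover the atomic catalog, using the same `Seq(...).foreach` parameterization as the tests earlier in this hunk. The actual assertions of the "New table has same behavior as CTAS" test fall outside this hunk, so the CTAS-equivalence check from the tests above stands in for them here:
   
   ```scala
   Seq("testcat.table_name", "testcat_atomic.table_name").foreach { identifier =>
     // Create the table, then replace it, as in the basic RTAS test above.
     spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source")
     spark.sql(s"REPLACE TABLE $identifier USING foo AS SELECT id, data FROM source")
   
     // Load the replaced table through its catalog and check that its rows
     // match the source, reusing the row-materialization pattern above.
     val catalog = spark.catalog(identifier.split('.').head).asTableCatalog
     val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
     val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
     checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
   }
   ```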

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
