mccheah commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r291781039
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
 ##########
 @@ -170,6 +173,85 @@ class DataSourceV2SQLSuite extends QueryTest with 
SharedSQLContext with BeforeAn
     checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), 
spark.table("source"))
   }
 
+  test("ReplaceTableAsSelect: basic v2 implementation using atomic catalog.") {
+    spark.sql("CREATE TABLE testcatatomic.table_name USING foo AS SELECT id, 
data FROM source")
+    val testCatalog = spark.catalog("testcatatomic").asTableCatalog
+    val originalTable = testCatalog.loadTable(Identifier.of(Array(), 
"table_name"))
+    spark.sql("REPLACE TABLE testcatatomic.table_name USING foo AS SELECT id 
FROM source")
+    val replacedTable = testCatalog.loadTable(Identifier.of(Array(), 
"table_name"))
+    assert(replacedTable != originalTable, "Table should have been replaced.")
+    assert(replacedTable.name == "testcatatomic.table_name")
+    assert(replacedTable.partitioning.isEmpty)
+    assert(replacedTable.properties == Map("provider" -> "foo").asJava)
+    assert(replacedTable.schema == new StructType()
+      .add("id", LongType, nullable = false))
+    val rdd = 
spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
+    checkAnswer(
+        spark.internalCreateDataFrame(rdd, replacedTable.schema),
+        spark.table("source").select("id"))
+  }
+
+  test("ReplaceTableAsSelect: Non-atomic catalog creates the empty table, but 
leaves the" +
+    " table empty if the write fails.") {
 
 Review comment:
   Hm so I took a closer look at this. It turns out that using 
`Utils.tryWithSafeFinallyAndFailureCallbacks` is risky in all of these code 
paths as it is currently implemented.
   
   That method, when it tries to run the catch block, will first try to set the 
failure reason on the task context via `TaskContext.get().markTaskFailed`. But 
since we're running this try...finally block on the _driver_, there is no such 
task context to get via `TaskContext.get`.
   
   What happens in this case then is that this test passes when it should fail, 
because indeed, the table should be dropped. But the catch block that drops the 
table never gets run, because `TaskContext.get().markTaskFailed` NPEs before 
the catch block can be run.
   
   I think there's a few ways forward:
   1) Don't use the Utils method to do try-catch-finally
   2) Patch the Utils method to check for null on the current task context 
before trying to mark the task failure reason on it.
   
   I'm going with 2) for now, but 1) is very reasonable as well.
   
   Either way, yeah the table should end up being dropped at the end, so this 
test also has to be patched.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to