mccheah commented on a change in pull request #24798: [SPARK-27724][SQL]
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r291781039
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
##########
@@ -170,6 +173,85 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
}
+ test("ReplaceTableAsSelect: basic v2 implementation using atomic catalog.") {
+ spark.sql("CREATE TABLE testcatatomic.table_name USING foo AS SELECT id, data FROM source")
+ val testCatalog = spark.catalog("testcatatomic").asTableCatalog
+ val originalTable = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ spark.sql("REPLACE TABLE testcatatomic.table_name USING foo AS SELECT id FROM source")
+ val replacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ assert(replacedTable != originalTable, "Table should have been replaced.")
+ assert(replacedTable.name == "testcatatomic.table_name")
+ assert(replacedTable.partitioning.isEmpty)
+ assert(replacedTable.properties == Map("provider" -> "foo").asJava)
+ assert(replacedTable.schema == new StructType()
+ .add("id", LongType, nullable = false))
+ val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
+ checkAnswer(
+ spark.internalCreateDataFrame(rdd, replacedTable.schema),
+ spark.table("source").select("id"))
+ }
+
+ test("ReplaceTableAsSelect: Non-atomic catalog creates the empty table, but leaves the" +
+ " table empty if the write fails.") {
Review comment:
Hm, so I took a closer look at this. It turns out that using
`Utils.tryWithSafeFinallyAndFailureCallbacks` is risky in all of these code
paths as it is currently implemented.
When that method handles a failure, it first tries to set the failure reason
on the task context via `TaskContext.get().markTaskFailed`, and only then runs
the catch block. But since we're running this try...finally block on the
_driver_, there is no task context for `TaskContext.get` to return.
The result is that this test passes when it should fail, because the table
should in fact be dropped. The catch block that drops the table never runs,
because `TaskContext.get().markTaskFailed` NPEs before the catch block can
run.
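To make the failure mode concrete, here is a minimal, self-contained sketch. It does not use Spark: `StubTaskContext`, `StubTaskContextHolder`, and `FailureCallbackDemo` are hypothetical stand-ins, and the helper is a simplified model of the pattern described above, not the actual `Utils` code.

```scala
// Hypothetical stand-in for a task context; NOT Spark's real TaskContext.
final class StubTaskContext {
  @volatile var failure: Option[Throwable] = None
  def markTaskFailed(t: Throwable): Unit = failure = Some(t)
}

object StubTaskContextHolder {
  // Nothing ever sets this thread-local, so get() returns null.
  // That models the driver, where no task is running.
  private val current = new ThreadLocal[StubTaskContext]
  def get(): StubTaskContext = current.get()
}

object FailureCallbackDemo {
  // Simplified model of a try/catch helper that marks the task as failed
  // before running the caller's cleanup. Anything thrown while handling the
  // failure is attached to the original error as a suppressed exception.
  def tryWithFailureCallback[T](block: => T)(catchBlock: => Unit): T = {
    try block
    catch {
      case original: Throwable =>
        try {
          StubTaskContextHolder.get().markTaskFailed(original) // NPE: get() is null
          catchBlock                                           // skipped after the NPE
        } catch {
          case t: Throwable => if (t ne original) original.addSuppressed(t)
        }
        throw original
    }
  }

  // Runs a failing "write" with no task context and reports whether the
  // cleanup callback (standing in for dropping the table) actually ran.
  def tableDroppedAfterFailedWrite(): Boolean = {
    var dropped = false
    try {
      tryWithFailureCallback[Unit] {
        throw new IllegalStateException("write failed")
      } {
        dropped = true
      }
    } catch {
      case _: IllegalStateException => // the original failure still propagates
    }
    dropped
  }
}
```

In this sketch `FailureCallbackDemo.tableDroppedAfterFailedWrite()` returns `false`: the original exception still reaches the caller, but the cleanup is silently skipped.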
I think there are two ways forward:
1) Don't use the Utils method to do try-catch-finally
2) Patch the Utils method to check for null on the current task context
before trying to mark the task failure reason on it.
I'm going with 2) for now, but 1) is very reasonable as well.
Either way, the table should end up being dropped at the end, so this
test also has to be patched.
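As a rough illustration of option 2), the sketch below adds the null check before marking the failure. It is self-contained and does not use Spark: `MockTaskContext`, `MockTaskContextHolder`, and `NullSafeFailureCallback` are hypothetical names, and the helper is a simplified model, not the actual patch to `Utils`.

```scala
// Hypothetical stand-in for a task context; NOT Spark's real TaskContext.
final class MockTaskContext {
  @volatile var failure: Option[Throwable] = None
  def markTaskFailed(t: Throwable): Unit = failure = Some(t)
}

object MockTaskContextHolder {
  // Nothing ever sets this thread-local, so get() returns null.
  // That models the driver, where no task is running.
  private val current = new ThreadLocal[MockTaskContext]
  def get(): MockTaskContext = current.get()
}

object NullSafeFailureCallback {
  // Same shape as a helper that marks the task failed before cleanup, but it
  // only touches the context when one exists, so cleanup also runs on the driver.
  def tryWithFailureCallback[T](block: => T)(catchBlock: => Unit): T = {
    try block
    catch {
      case original: Throwable =>
        try {
          val ctx = MockTaskContextHolder.get()
          if (ctx != null) ctx.markTaskFailed(original) // skipped when there is no task
          catchBlock                                    // now reached on the driver too
        } catch {
          case t: Throwable => if (t ne original) original.addSuppressed(t)
        }
        throw original
    }
  }

  // Runs a failing "write" with no task context and reports whether the
  // cleanup callback (standing in for dropping the table) actually ran.
  def tableDroppedAfterFailedWrite(): Boolean = {
    var dropped = false
    try {
      tryWithFailureCallback[Unit] {
        throw new IllegalStateException("write failed")
      } {
        dropped = true
      }
    } catch {
      case _: IllegalStateException => // the original failure still propagates
    }
    dropped
  }
}
```

With the null check in place, `NullSafeFailureCallback.tableDroppedAfterFailedWrite()` returns `true`, which matches the expectation that the table ends up dropped after a failed write.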
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services