mccheah commented on a change in pull request #24798: [SPARK-27724][SQL]
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r305102742
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
##########
@@ -196,7 +201,105 @@ private class InMemoryTable(
}
}
-private class BufferedRows extends WriterCommitMessage with InputPartition
with Serializable {
+/**
+ * Helpers that throw on demand so tests can simulate failed table creation
+ * and failed table writes.
+ */
+object TestInMemoryTableCatalog {
+ /** Write-option key; when true, maybeSimulateFailedTableWrite throws. */
+ val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
+ /** Table-property key; when "true" (any case), maybeSimulateFailedTableCreation throws. */
+ val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate"
+
+ /**
+ * Fails table creation if the properties ask for a simulated failure.
+ *
+ * A missing key is safe: `get` returns null, and `"true".equalsIgnoreCase(null)`
+ * is false, so no exception is thrown.
+ *
+ * @param tableProperties properties supplied for the table being created
+ * @throws IllegalStateException if SIMULATE_FAILED_CREATE_PROPERTY is "true",
+ *                               compared case-insensitively
+ */
+ def maybeSimulateFailedTableCreation(tableProperties: util.Map[String,
String]): Unit = {
+ if ("true".equalsIgnoreCase(
+
tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY)))
{
+ throw new IllegalStateException("Manual create table failure.")
+ }
+ }
+
+ /**
+ * Fails a table write if the options ask for a simulated failure.
+ *
+ * @param tableOptions write options; an absent SIMULATE_FAILED_WRITE_OPTION
+ *                     is treated as false (the default passed to getBoolean)
+ * @throws IllegalStateException if SIMULATE_FAILED_WRITE_OPTION is true
+ */
+ def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap):
Unit = {
+ if (tableOptions.getBoolean(
+ TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) {
+ throw new IllegalStateException("Manual write to table failure.")
+ }
+ }
+}
+
+class TestStagingInMemoryCatalog
+ extends TestInMemoryTableCatalog with StagingTableCatalog {
+
+ override def stageCreate(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ newStagedTable(ident, schema, partitions, properties, replaceIfExists =
false)
+ }
+
+ override def stageReplace(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ newStagedTable(ident, schema, partitions, properties, replaceIfExists =
true)
+ }
+
+ private def newStagedTable(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String],
+ replaceIfExists: Boolean): StagedTable = {
+ import CatalogV2Implicits.IdentifierHelper
+ if (partitions.nonEmpty) {
+ throw new UnsupportedOperationException(
+ s"Catalog $name: Partitioned tables are not supported")
+ }
+
+ TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
+
+ new TestStagedTable(
+ ident,
+ new InMemoryTable(s"$name.${ident.quoted}", schema, properties),
+ replaceIfExists)
+ }
+
+ private class TestStagedTable(
+ ident: Identifier,
+ delegateTable: InMemoryTable,
+ replaceIfExists: Boolean)
+ extends StagedTable with SupportsWrite with SupportsRead {
+
+ override def commitStagedChanges(): Unit = {
+ if (droppedTables.contains(ident)) {
Review comment:
I thought about this a bit more and chatted with @rdblue, and I realized the
source of the confusion: the staging catalog API doesn't support passing the
`orCreate` flag through to the catalog at all. I think we need to pass this
information along; otherwise, the catalog won't know that the user wanted
`CREATE OR REPLACE` semantics.
I'm inclined to add an extra method to `StagingTableCatalog`,
`stageCreateOrReplace`, alongside the existing `stageCreate` and `stageReplace`
methods. The behavior of `commitStagedChanges` would then depend on whether the
table was staged via `stageCreateOrReplace`, `stageReplace`, or `stageCreate`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]