mccheah commented on a change in pull request #24798: [SPARK-27724][SQL]
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304711894
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
##########
@@ -196,7 +201,105 @@ private class InMemoryTable(
}
}
-private class BufferedRows extends WriterCommitMessage with InputPartition
with Serializable {
+object TestInMemoryTableCatalog {
+ val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
+ val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate"
+
+ def maybeSimulateFailedTableCreation(tableProperties: util.Map[String,
String]): Unit = {
+ if ("true".equalsIgnoreCase(
+
tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY)))
{
+ throw new IllegalStateException("Manual create table failure.")
+ }
+ }
+
+ def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap):
Unit = {
+ if (tableOptions.getBoolean(
+ TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) {
+ throw new IllegalStateException("Manual write to table failure.")
+ }
+ }
+}
+
+class TestStagingInMemoryCatalog
+ extends TestInMemoryTableCatalog with StagingTableCatalog {
+
+ override def stageCreate(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ newStagedTable(ident, schema, partitions, properties, replaceIfExists =
false)
+ }
+
+ override def stageReplace(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ newStagedTable(ident, schema, partitions, properties, replaceIfExists =
true)
+ }
+
+ private def newStagedTable(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String],
+ replaceIfExists: Boolean): StagedTable = {
+ import CatalogV2Implicits.IdentifierHelper
+ if (partitions.nonEmpty) {
+ throw new UnsupportedOperationException(
+ s"Catalog $name: Partitioned tables are not supported")
+ }
+
+ TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
+
+ new TestStagedTable(
+ ident,
+ new InMemoryTable(s"$name.${ident.quoted}", schema, properties),
+ replaceIfExists)
+ }
+
+ private class TestStagedTable(
+ ident: Identifier,
+ delegateTable: InMemoryTable,
+ replaceIfExists: Boolean)
+ extends StagedTable with SupportsWrite with SupportsRead {
+
+ override def commitStagedChanges(): Unit = {
+ if (droppedTables.contains(ident)) {
Review comment:
That doesn't work because the implementation of `stageCreate` never actually
puts the table in the `tables` map, so the table's absence from the `tables`
map does not necessarily mean that the table was dropped.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]