rdblue commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r305479389
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 ##########
 @@ -89,15 +92,145 @@ case class CreateTableAsSelectExec(
 
         case _ =>
           // table does not support writes
-          throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}")
+          throw new SparkException(
+            s"Table implementation does not support writes: ${ident.quoted}")
       }
+    })(catchBlock = {
+      catalog.dropTable(ident)
+    })
+  }
+}
+
+/**
+ * Physical plan node for v2 create table as select, when the catalog is determined to support
+ * staging table creation.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * The CTAS operation is atomic. The creation of the table is staged and the commit of the write
+ * should bundle the commitment of the metadata and the table contents in a single unit. If the
+ * write fails, the table is instructed to roll back all staged changes.
+ */
+case class AtomicCreateTableAsSelectExec(
+    catalog: StagingTableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: SparkPlan,
+    properties: Map[String, String],
+    writeOptions: CaseInsensitiveStringMap,
+    ifNotExists: Boolean) extends AtomicTableWriteExec {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    if (catalog.tableExists(ident)) {
+      if (ifNotExists) {
+        return sparkContext.parallelize(Seq.empty, 1)
+      }
+
+      throw new TableAlreadyExistsException(ident)
+    }
+    val stagedTable = catalog.stageCreate(
+      ident, query.schema, partitioning.toArray, properties.asJava)
+    writeToStagedTable(stagedTable, writeOptions, ident)
+  }
+}
+
+/**
+ * Physical plan node for v2 replace table as select when the catalog does not support staging
+ * table replacement.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * If the table exists, its contents and schema should be replaced with the schema and the contents
+ * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic
+ * CTAS. For an atomic implementation for catalogs with the appropriate support, see
+ * AtomicReplaceTableAsSelectExec.
+ */
+case class ReplaceTableAsSelectExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: SparkPlan,
+    properties: Map[String, String],
+    writeOptions: CaseInsensitiveStringMap,
+    orCreate: Boolean) extends AtomicTableWriteExec {
+
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    // Note that this operation is potentially unsafe, but these are the strict semantics of
+    // RTAS if the catalog does not support atomic operations.
+    //
+    // There are numerous cases where the table will be dropped and left irrecoverable:
+    //
+    // 1. Creating the new table fails,
+    // 2. Writing to the new table fails,
+    // 3. The table returned by catalog.createTable doesn't support writing.
+    if (catalog.tableExists(ident)) {
+      catalog.dropTable(ident)
+    } else if (!orCreate) {
+      throw new CannotReplaceMissingTableException(ident)
+    }
+    val createdTable = catalog.createTable(
+      ident, query.schema, partitioning.toArray, properties.asJava)
+    Utils.tryWithSafeFinallyAndFailureCallbacks({
+      createdTable match {
+        case table: SupportsWrite =>
+          val batchWrite = table.newWriteBuilder(writeOptions)
+            .withInputDataSchema(query.schema)
+            .withQueryId(UUID.randomUUID().toString)
+            .buildForBatch()
+
+          doWrite(batchWrite)
 
+        case _ =>
+          // table does not support writes
+          throw new SparkException(
+            s"Table implementation does not support writes: ${ident.quoted}")
+      }
     })(catchBlock = {
       catalog.dropTable(ident)
     })
   }
 }
 
+/**
+ * Physical plan node for v2 replace table as select when the catalog supports staging
+ * table replacement.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * If the table exists, its contents and schema should be replaced with the schema and the contents
+ * of the query. This implementation is atomic. The table replacement is staged, and the commit
+ * operation at the end should perform the replacement of the table's metadata and contents. If the
+ * write fails, the table is instructed to roll back staged changes and any previously written table
+ * is left untouched.
+ */
+case class AtomicReplaceTableAsSelectExec(
+    catalog: StagingTableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: SparkPlan,
+    properties: Map[String, String],
+    writeOptions: CaseInsensitiveStringMap,
+    orCreate: Boolean) extends AtomicTableWriteExec {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val stagedTable = if (catalog.tableExists(ident)) {
 
 Review comment:
   I disagree that there is no need to check whether the table exists. We had a similar discussion on CREATE TABLE. Spark should check existence to ensure that the error is consistently thrown. If the table does not exist and `orCreate` is false, then Spark should throw an exception and not rely on the source to do it.
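   
   For example, the following should fail the same way no matter which catalog is in use (a hypothetical test sketch; it assumes a SparkSession `spark` with a v2 catalog registered, and the catalog, table, and provider names are made up):
   
   ```scala
   import org.scalatest.Assertions.intercept
   
   // REPLACE TABLE on a missing table, without OR CREATE, should throw from
   // Spark itself rather than from the underlying source.
   intercept[CannotReplaceMissingTableException] {
     spark.sql("REPLACE TABLE testcat.missing_table USING foo AS SELECT 1 AS i")
   }
   ```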
   
   That said, I think it would be simpler to update the logic a little:
   
   ```scala
      if (orCreate) {
        catalog.stageCreateOrReplace(
          ident, query.schema, partitioning.toArray, properties.asJava)
      } else if (catalog.tableExists(ident)) {
        catalog.stageReplace(
          ident, query.schema, partitioning.toArray, properties.asJava)
      } else {
        throw new CannotReplaceMissingTableException(ident)
      }
   ```
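   
   Put together, `doExecute` could then read roughly as follows (a sketch only, reusing the `writeToStagedTable` helper shown earlier in the diff and the same class fields):
   
   ```scala
   override protected def doExecute(): RDD[InternalRow] = {
     // Stage the replacement up front so that committing the metadata and the
     // data happens as a single unit; rollback on failure is handled inside
     // writeToStagedTable.
     val stagedTable = if (orCreate) {
       catalog.stageCreateOrReplace(
         ident, query.schema, partitioning.toArray, properties.asJava)
     } else if (catalog.tableExists(ident)) {
       catalog.stageReplace(
         ident, query.schema, partitioning.toArray, properties.asJava)
     } else {
       // Throw from Spark so the error is consistent across catalogs.
       throw new CannotReplaceMissingTableException(ident)
     }
     writeToStagedTable(stagedTable, writeOptions, ident)
   }
   ```
   This also keeps a single `stageCreateOrReplace` call on the OR CREATE path instead of branching on existence twice.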
