mccheah commented on a change in pull request #24798: [SPARK-27724][WIP] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r290535041
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 ##########
 @@ -76,25 +79,94 @@ case class CreateTableAsSelectExec(
 
       throw new TableAlreadyExistsException(ident)
     }
+    catalog match {
+      case txnCatalog: TransactionalTableCatalog =>
+        val stagedTable = txnCatalog.stageCreate(
+            ident, query.schema, partitioning.toArray, properties.asJava)
+        writeToStagedTable(stagedTable, writeOptions, ident)
+      case _ =>
+        Utils.tryWithSafeFinallyAndFailureCallbacks({
+          catalog.createTable(
+              ident, query.schema, partitioning.toArray, properties.asJava) 
match {
+            case table: SupportsWrite =>
+              val batchWrite = table.newWriteBuilder(writeOptions)
+                .withInputDataSchema(query.schema)
+                .withQueryId(UUID.randomUUID().toString)
+                .buildForBatch()
+
+              doWrite(batchWrite)
+
+            case _ =>
+              // table does not support writes
+              throw new SparkException(
+                s"Table implementation does not support writes: 
${ident.quoted}")
+          }
+        })(catchBlock = {
+          catalog.dropTable(ident)
+        })
+    }
+  }
+}
 
-    Utils.tryWithSafeFinallyAndFailureCallbacks({
-      catalog.createTable(ident, query.schema, partitioning.toArray, 
properties.asJava) match {
-        case table: SupportsWrite =>
-          val batchWrite = table.newWriteBuilder(writeOptions)
-            .withInputDataSchema(query.schema)
-            .withQueryId(UUID.randomUUID().toString)
-            .buildForBatch()
+/**
+ * Physical plan node for v2 replace table as select.
+ *
+ * A new table will be created using the schema of the query, and rows from 
the query are appended.
+ * If the table exists, its contents and schema should be replaced with the 
schema and the contents
+ * of the query. If the catalog supports atomic replacement of tables, the 
plugin's transactional
+ * implementation is used so that the plugin can avoid losing previously 
committed data if any
+ * part of the write fails. The non-atomic implementation will drop the table 
and then run
+ * non-atomic CTAS.
+ */
+case class ReplaceTableAsSelectExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: SparkPlan,
+    properties: Map[String, String],
+    writeOptions: CaseInsensitiveStringMap) extends StagedTableWriteExec {
 
-          doWrite(batchWrite)
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper
 
-        case _ =>
-          // table does not support writes
-          throw new SparkException(s"Table implementation does not support 
writes: ${ident.quoted}")
-      }
+  override protected def doExecute(): RDD[InternalRow] = {
+    catalog match {
+      case txnCatalog: TransactionalTableCatalog =>
+        val stagedTable = txnCatalog.stageReplace(
+          ident, query.schema, partitioning.toArray, properties.asJava)
+        writeToStagedTable(stagedTable, writeOptions, ident)
+      case _ =>
+        // Note that this operation is potentially unsafe, but these are the 
strict semantics of
 
 Review comment:
   I think we talked about this and concluded that this is the appropriate 
behavior, but I'm still not sure it is wise to support an inherently unsafe 
and potentially inconsistent operation. It's worth considering whether we 
should throw `UnsupportedOperationException` here instead of falling back to 
the non-atomic path.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to