cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL]
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304388426
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
##########
@@ -89,15 +92,140 @@ case class CreateTableAsSelectExec(
case _ =>
// table does not support writes
- throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}")
+ throw new SparkException(
+ s"Table implementation does not support writes: ${ident.quoted}")
}
+ })(catchBlock = {
+ catalog.dropTable(ident)
+ })
+ }
+}
+
+/**
+ * Physical plan node for v2 create table as select, when the catalog is determined to support
+ * staging table creation.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * The CTAS operation is atomic. The creation of the table is staged and the commit of the write
+ * should bundle the commitment of the metadata and the table contents in a single unit. If the
+ * write fails, the table is instructed to roll back all staged changes.
+ */
+case class AtomicCreateTableAsSelectExec(
+ catalog: StagingTableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: SparkPlan,
+ properties: Map[String, String],
+ writeOptions: CaseInsensitiveStringMap,
+ ifNotExists: Boolean) extends AtomicTableWriteExec {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ if (catalog.tableExists(ident)) {
+ if (ifNotExists) {
+ return sparkContext.parallelize(Seq.empty, 1)
+ }
+
+ throw new TableAlreadyExistsException(ident)
+ }
+ val stagedTable = catalog.stageCreate(
+ ident, query.schema, partitioning.toArray, properties.asJava)
+ writeToStagedTable(stagedTable, writeOptions, ident)
+ }
+}
+
+/**
+ * Physical plan node for v2 replace table as select when the catalog does not support staging
+ * table replacement.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * If the table exists, its contents and schema should be replaced with the schema and the contents
+ * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic
Review comment:
IMO the non-atomic version is allowed to have undefined behavior when a
failure happens midway. But if no failure happens, it should behave the
same as the atomic version.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]