mccheah commented on a change in pull request #24798: [SPARK-27724][WIP] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r290535041
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
##########
@@ -76,25 +79,94 @@ case class CreateTableAsSelectExec(
       throw new TableAlreadyExistsException(ident)
     }
+    catalog match {
+      case txnCatalog: TransactionalTableCatalog =>
+        val stagedTable = txnCatalog.stageCreate(
+          ident, query.schema, partitioning.toArray, properties.asJava)
+        writeToStagedTable(stagedTable, writeOptions, ident)
+      case _ =>
+        Utils.tryWithSafeFinallyAndFailureCallbacks({
+          catalog.createTable(
+              ident, query.schema, partitioning.toArray, properties.asJava) match {
+            case table: SupportsWrite =>
+              val batchWrite = table.newWriteBuilder(writeOptions)
+                .withInputDataSchema(query.schema)
+                .withQueryId(UUID.randomUUID().toString)
+                .buildForBatch()
+
+              doWrite(batchWrite)
+
+            case _ =>
+              // table does not support writes
+              throw new SparkException(
+                s"Table implementation does not support writes: ${ident.quoted}")
+          }
+        })(catchBlock = {
+          catalog.dropTable(ident)
+        })
+    }
+  }
+}
-    Utils.tryWithSafeFinallyAndFailureCallbacks({
-      catalog.createTable(ident, query.schema, partitioning.toArray, properties.asJava) match {
-        case table: SupportsWrite =>
-          val batchWrite = table.newWriteBuilder(writeOptions)
-            .withInputDataSchema(query.schema)
-            .withQueryId(UUID.randomUUID().toString)
-            .buildForBatch()
+/**
+ * Physical plan node for v2 replace table as select.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * If the table exists, its contents and schema should be replaced with the schema and the contents
+ * of the query. If the catalog supports atomic replacement of tables, the plugin's transactional
+ * implementation is used so that the plugin can avoid losing previously committed data if any
+ * part of the write fails. The non-atomic implementation will drop the table and then run
+ * non-atomic CTAS.
+ */
+case class ReplaceTableAsSelectExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: SparkPlan,
+    properties: Map[String, String],
+    writeOptions: CaseInsensitiveStringMap) extends StagedTableWriteExec {
-          doWrite(batchWrite)
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper
-        case _ =>
-          // table does not support writes
-          throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}")
-      }
+  override protected def doExecute(): RDD[InternalRow] = {
+    catalog match {
+      case txnCatalog: TransactionalTableCatalog =>
+        val stagedTable = txnCatalog.stageReplace(
+          ident, query.schema, partitioning.toArray, properties.asJava)
+        writeToStagedTable(stagedTable, writeOptions, ident)
+      case _ =>
+        // Note that this operation is potentially unsafe, but these are the strict semantics of
Review comment:
I think we talked about this, and we concluded that this is the appropriate behavior, but I'm still not sure it is wise to support an inherently unsafe and potentially inconsistent operation. It's worth considering whether we should throw `UnsupportedOperationException` here instead.
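
For clarity, a minimal sketch of that alternative for the non-transactional branch, reusing the names already in scope in `doExecute` (`catalog`, `ident`, `query`, `partitioning`, `properties`, `writeOptions`); the exception message is illustrative only and not part of this PR:

```scala
catalog match {
  case txnCatalog: TransactionalTableCatalog =>
    // Atomic path: stage the replacement and commit only after the write succeeds.
    val stagedTable = txnCatalog.stageReplace(
      ident, query.schema, partitioning.toArray, properties.asJava)
    writeToStagedTable(stagedTable, writeOptions, ident)
  case _ =>
    // Instead of dropping the table and running a non-atomic CTAS, refuse the operation
    // outright when the catalog cannot stage an atomic replacement.
    throw new UnsupportedOperationException(
      s"Catalog does not support atomic REPLACE TABLE AS SELECT; " +
        s"refusing to drop and re-create table ${ident.quoted}")
}
```

The tradeoff is that catalogs without staging support would lose REPLACE TABLE AS SELECT entirely, rather than getting best-effort drop-then-CTAS semantics.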