rdblue commented on a change in pull request #23606: [SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overwrite.
URL: https://github.com/apache/spark/pull/23606#discussion_r256640986
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
##########
@@ -41,18 +46,114 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite,
query: LogicalPlan)
override def output: Seq[Attribute] = Nil
}
+case class AppendDataExec(
+ table: SupportsBatchWrite,
+ writeOptions: DataSourceOptions,
+ query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val batchWrite = newWriteBuilder() match {
+ case builder: SupportsSaveMode =>
+ builder.mode(SaveMode.Append).buildForBatch()
+
+ case builder =>
+ builder.buildForBatch()
+ }
+ doWrite(batchWrite)
+ }
+}
+
+case class OverwriteByExpressionExec(
+ table: SupportsBatchWrite,
+ filters: Array[Filter],
+ writeOptions: DataSourceOptions,
+ query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+
+ private def isTruncate(filters: Array[Filter]): Boolean = {
+ filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
+ }
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val batchWrite = newWriteBuilder() match {
+ case builder: SupportsTruncate if isTruncate(filters) =>
+ builder.truncate().buildForBatch()
+
+ case builder: SupportsOverwrite =>
+ builder.overwrite(filters).buildForBatch()
+
+ case builder: SupportsSaveMode =>
+ builder.mode(SaveMode.Overwrite).buildForBatch()
+
+ case _ =>
+ throw new SparkException(s"Table does not support dynamic partition
overwrite: $table")
+ }
+
+ doWrite(batchWrite)
+ }
+}
+
+case class OverwritePartitionsDynamicExec(
+ table: SupportsBatchWrite,
+ writeOptions: DataSourceOptions,
+ query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val batchWrite = newWriteBuilder() match {
+ case builder: SupportsDynamicOverwrite =>
+ builder.overwriteDynamicPartitions().buildForBatch()
+
+ case builder: SupportsSaveMode =>
+ builder.mode(SaveMode.Overwrite).buildForBatch()
+
+ case _ =>
+ throw new SparkException(s"Table does not support dynamic partition
overwrite: $table")
+ }
+
+ doWrite(batchWrite)
+ }
+}
+
+case class WriteToDataSourceV2Exec(
+ batchWrite: BatchWrite,
+ query: SparkPlan
+ ) extends V2TableWriteExec {
+
+ import DataSourceV2Implicits._
+
+ def writeOptions: DataSourceOptions = Map.empty[String, String].toDataSourceOptions
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ doWrite(batchWrite)
+ }
+}
+
+/**
+ * Helper for physical plans that build batch writes.
+ */
+trait BatchWriteHelper {
+ def table: SupportsBatchWrite
+ def query: SparkPlan
+ def writeOptions: DataSourceOptions
+
+ def newWriteBuilder(): WriteBuilder = {
+ table.newWriteBuilder(writeOptions)
+ .withInputDataSchema(query.schema)
+ .withQueryId(UUID.randomUUID().toString)
+ }
+}
+
/**
- * The physical plan for writing data into data source v2.
+ * The base physical plan for writing data into data source v2.
*/
-case class WriteToDataSourceV2Exec(batchWrite: BatchWrite, query: SparkPlan)
- extends UnaryExecNode {
+trait V2TableWriteExec extends UnaryExecNode {
Review comment:
No, v2 exec nodes do not support metrics. I've implemented this in our
branch and I can submit a PR to add them, but this PR doesn't change the write
logic at all. I think metrics should be added in a follow-up.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]