rdblue commented on a change in pull request #29066: URL: https://github.com/apache/spark/pull/29066#discussion_r498578514
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util.UUID
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, Write, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
+
+/**
+ * A rule that constructs [[Write]]s.
+ */
+object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
+
+  import DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    case AppendData(relation: DataSourceV2Relation, query, options, _) =>
+      val writeBuilder = newWriteBuilder(relation.table, query, options)
+      val write = writeBuilder.build()
+      V2BatchWriteCommand(write, query)
+
+    case OverwriteByExpression(relation: DataSourceV2Relation, deleteExpr, query, options, _) =>
+      // fail if any filter cannot be converted. correctness depends on removing all matching data.
+      val filters = splitConjunctivePredicates(deleteExpr).map {
+        filter => DataSourceStrategy.translateFilter(deleteExpr,
+          supportNestedPredicatePushdown = true).getOrElse(
+          throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
+      }.toArray
+
+      val table = relation.table
+      val writeBuilder = newWriteBuilder(table, query, options)
+      val write = writeBuilder match {
+        case builder: SupportsTruncate if isTruncate(filters) =>
+          builder.truncate().build()
+        case builder: SupportsOverwrite =>
+          builder.overwrite(filters).build()
+        case _ =>
+          throw new SparkException(s"Table does not support overwrite by expression: $table")
+      }
+
+      V2BatchWriteCommand(write, query)

Review comment:
I can see why it made sense to move back to a generic `WriteToDataSourceV2`, but I think we do want write-specific exec nodes, even if the information isn't used.

This change would cause all v2 writes to show up in the Spark UI as `WriteToDataSourceV2`, rather than the current `AppendData` or `OverwritePartitionsDynamic`. It also makes it so the physical plan doesn't show important information, like the delete expression for `OverwriteByExpression`.

That information _could_ be recovered from the source's `Write` using getters, or maybe it would be part of the write's `toString` output, but I wouldn't want to rely on either of those.
The Spark plan should keep track of this information for good `EXPLAIN` output.
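
To make the concern concrete, here is a minimal toy sketch of the difference between a generic write node and write-specific nodes. Everything in it is a hypothetical stand-in written for illustration (`ExplainSketch`, `OpaqueWrite`, `describe`, and the simplified node classes); it does not use or reproduce Spark's actual plan classes, and the delete expression is modeled as a plain string.

```scala
// Toy model only: hypothetical stand-ins, not Spark's classes or APIs.
object ExplainSketch {

  // Stand-in for a connector-provided Write. Its toString belongs to the
  // connector, so the plan cannot count on it to carry useful details.
  final class OpaqueWrite {
    override def toString: String = "OpaqueWrite@connector"
  }

  sealed trait WriteExec {
    def write: OpaqueWrite
    // One-line description, as it might appear in plan or UI output.
    def describe: String
  }

  // Generic node: every kind of write renders identically.
  final case class GenericWriteExec(write: OpaqueWrite) extends WriteExec {
    def describe: String = "WriteToDataSourceV2"
  }

  // Write-specific nodes: the plan itself records what kind of write this is.
  final case class AppendDataExec(write: OpaqueWrite) extends WriteExec {
    def describe: String = "AppendData"
  }

  // The delete expression is kept on the node, so the description does not
  // depend on getters or toString of the connector's Write.
  final case class OverwriteByExpressionExec(
      write: OpaqueWrite,
      deleteExpr: String) extends WriteExec {
    def describe: String = s"OverwriteByExpression [$deleteExpr]"
  }
}
```

In this sketch, `GenericWriteExec(w).describe` yields the same string for an append and an overwrite, while `OverwriteByExpressionExec(w, "day = '2020-10-01'").describe` still names the operation and its delete expression regardless of what the connector's `Write` exposes.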
