sunchao commented on a change in pull request #35395:
URL: https://github.com/apache/spark/pull/35395#discussion_r826611125



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, 
Not}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, ReplaceData}
+import org.apache.spark.sql.connector.catalog.{SupportsDelete, 
SupportsRowLevelOperations, TruncatableTable}
+import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
+import org.apache.spark.sql.connector.write.RowLevelOperationTable
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * Assigns a rewrite plan for v2 tables that support rewriting data to handle 
DELETE statements.
+ *
+ * If a table implements [[SupportsDelete]] and 
[[SupportsRowLevelOperations]], this rule assigns
+ * a rewrite plan but the optimizer will check whether this particular DELETE 
statement can be
+ * handled by simply passing delete filters to the connector. If yes, the 
optimizer will then
+ * discard the rewrite plan.
+ */
+object RewriteDeleteFromTable extends RewriteRowLevelCommand {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case d @ DeleteFromTable(aliasedTable, cond, None) if d.resolved =>
+      EliminateSubqueryAliases(aliasedTable) match {
+        case DataSourceV2Relation(_: TruncatableTable, _, _, _, _) if cond == 
TrueLiteral =>
+          // don't assign a rewrite plan as the table supports truncation
+          d
+
+        case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, 
_) =>
+          val table = buildOperationTable(t, DELETE, 
CaseInsensitiveStringMap.empty())

Review comment:
       should we pass `options` from the V2 relation instead of just using 
`empty`?

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperation.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A logical representation of a data source DELETE, UPDATE, or MERGE 
operation that requires
+ * rewriting data.
+ *
+ * @since 3.3.0
+ */
+@Experimental
+public interface RowLevelOperation {
+
+  /**
+   * The SQL row-level operation.
+   */
+  enum Command {
+    DELETE, UPDATE, MERGE
+  }
+
+  /**
+   * Returns the description associated with this row-level operation.
+   */
+  default String description() {
+    return this.getClass().toString();
+  }
+
+  /**
+   * Returns the actual SQL operation being performed.
+   */
+  Command command();
+
+  /**
+   * Returns a {@link ScanBuilder} to configure a {@link Scan} for this 
row-level operation.
+   * <p>
+   * Sources fall into two categories: those that can handle a delta of rows 
and those that need
+   * to replace groups (e.g. partitions, files). Sources that handle deltas 
allow Spark to quickly
+   * discard unchanged rows and have no requirements for input scans. Sources 
that replace groups
+   * of rows can discard deleted rows but need to keep unchanged rows to be 
passed back into
+   * the source. This means that scans for such data sources must produce all 
rows in a group
+   * if any are returned. Some sources will avoid pushing filters into files 
(file granularity),
+   * while others will avoid pruning files within a partition (partition 
granularity).
+   * <p>
+   * For example, if a source can only replace partitions, all rows from a 
partition must
+   * be returned by the scan, even if a filter can narrow the set of changes 
to a single file
+   * in the partition. Similarly, a source that can swap individual files must 
produce all rows
+   * of files where at least one record must be changed, not just the rows 
that must be changed.
+   */
+  ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);

Review comment:
       curious: for non-delta based data sources, will this just share the same 
implementation with `SupportsRead.newScanBuilder`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeMetadataOnlyDeleteFromTable.scala
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, 
SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, 
DeleteFromTableWithFilters, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{SupportsDelete, 
TruncatableTable}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+
+/**
+ * Checks whether a delete using filters is possible and nullifies the rewrite 
plan
+ * if the source can handle this delete without executing the rewrite plan.
+ *
+ * Note this rule must be run after expression optimization but before scan 
planning.
+ */
+object OptimizeMetadataOnlyDeleteFromTable extends Rule[LogicalPlan] with 
PredicateHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case d @ DeleteFromTable(relation: DataSourceV2Relation, cond, Some(_)) =>
+      relation.table match {
+        case table: SupportsDelete if !SubqueryExpression.hasSubquery(cond) =>
+          val predicates = splitConjunctivePredicates(cond)
+          val normalizedPredicates = 
DataSourceStrategy.normalizeExprs(predicates, relation.output)
+          val filters = toDataSourceFilters(normalizedPredicates)
+          val allPredicatesTranslated = normalizedPredicates.size == 
filters.length
+          if (allPredicatesTranslated && table.canDeleteWhere(filters)) {
+            logDebug(s"Switching to delete with filters: 
${filters.mkString("[", ", ", "]")}")
+            DeleteFromTableWithFilters(relation, filters)
+          } else {
+            d
+          }
+
+        case _: TruncatableTable if cond == TrueLiteral =>

Review comment:
       when this will be triggered? I think we don't assign rewrite plan 
in`RewriteDeleteFromTable`.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, 
Not}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, ReplaceData}
+import org.apache.spark.sql.connector.catalog.{SupportsDelete, 
SupportsRowLevelOperations, TruncatableTable}
+import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
+import org.apache.spark.sql.connector.write.RowLevelOperationTable
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * Assigns a rewrite plan for v2 tables that support rewriting data to handle 
DELETE statements.
+ *
+ * If a table implements [[SupportsDelete]] and 
[[SupportsRowLevelOperations]], this rule assigns
+ * a rewrite plan but the optimizer will check whether this particular DELETE 
statement can be
+ * handled by simply passing delete filters to the connector. If yes, the 
optimizer will then
+ * discard the rewrite plan.
+ */
+object RewriteDeleteFromTable extends RewriteRowLevelCommand {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case d @ DeleteFromTable(aliasedTable, cond, None) if d.resolved =>
+      EliminateSubqueryAliases(aliasedTable) match {
+        case DataSourceV2Relation(_: TruncatableTable, _, _, _, _) if cond == 
TrueLiteral =>
+          // don't assign a rewrite plan as the table supports truncation
+          d
+
+        case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, 
_) =>
+          val table = buildOperationTable(t, DELETE, 
CaseInsensitiveStringMap.empty())
+          val rewritePlan = buildReplaceDataPlan(r, table, cond)
+          // keep the original relation in DELETE to also attempt deleting 
with filters
+          DeleteFromTable(r, cond, Some(rewritePlan))
+
+        case DataSourceV2Relation(_: SupportsDelete, _, _, _, _) =>
+          // don't assign a rewrite plan as the table supports deletes only 
with filters
+          d
+
+        case DataSourceV2Relation(t, _, _, _, _) =>
+          throw new AnalysisException(s"Table $t does not support DELETE 
statements")
+
+        case _ =>
+          d
+      }
+  }
+
+  // build a rewrite plan for sources that support replacing groups of data 
(e.g. files, partitions)
+  private def buildReplaceDataPlan(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      cond: Expression): ReplaceData = {
+
+    // resolve all required metadata attrs that may be used for grouping data 
on write
+    // for instance, JDBC data source may cluster data by shard/host before 
writing
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, 
operationTable.operation)
+
+    // construct a read relation and include all required metadata columns
+    val readRelation = buildRelationWithAttrs(relation, operationTable, 
metadataAttrs)
+
+    // construct a plan that contains unmatched rows in matched groups that 
must be carried over
+    // such rows do not match the condition but have to be copied over as the 
source can replace
+    // only groups of rows (e.g. if a source supports replacing files, 
unmatched rows in matched
+    // files must be carried over)
+    // it is safe to negate the condition here as 
RowLevelCommandScanRelationPushDown
+    // handles predicate pushdown for row-level operations in a special way
+    val remainingRowsFilter = Not(EqualNullSafe(cond, TrueLiteral))

Review comment:
       curious why we use `EqualNullSafe` here. What does it mean when `cond` 
evaluates to null?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, 
Not}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, ReplaceData}
+import org.apache.spark.sql.connector.catalog.{SupportsDelete, 
SupportsRowLevelOperations, TruncatableTable}
+import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
+import org.apache.spark.sql.connector.write.RowLevelOperationTable
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * Assigns a rewrite plan for v2 tables that support rewriting data to handle 
DELETE statements.
+ *
+ * If a table implements [[SupportsDelete]] and 
[[SupportsRowLevelOperations]], this rule assigns
+ * a rewrite plan but the optimizer will check whether this particular DELETE 
statement can be
+ * handled by simply passing delete filters to the connector. If yes, the 
optimizer will then
+ * discard the rewrite plan.
+ */
+object RewriteDeleteFromTable extends RewriteRowLevelCommand {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case d @ DeleteFromTable(aliasedTable, cond, None) if d.resolved =>
+      EliminateSubqueryAliases(aliasedTable) match {
+        case DataSourceV2Relation(_: TruncatableTable, _, _, _, _) if cond == 
TrueLiteral =>
+          // don't assign a rewrite plan as the table supports truncation
+          d
+
+        case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, 
_) =>
+          val table = buildOperationTable(t, DELETE, 
CaseInsensitiveStringMap.empty())
+          val rewritePlan = buildReplaceDataPlan(r, table, cond)
+          // keep the original relation in DELETE to also attempt deleting 
with filters
+          DeleteFromTable(r, cond, Some(rewritePlan))
+
+        case DataSourceV2Relation(_: SupportsDelete, _, _, _, _) =>
+          // don't assign a rewrite plan as the table supports deletes only 
with filters
+          d
+
+        case DataSourceV2Relation(t, _, _, _, _) =>
+          throw new AnalysisException(s"Table $t does not support DELETE 
statements")

Review comment:
       `Table ${t.name()} does not support DELETE statements`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
##########
@@ -284,6 +284,22 @@ case class OverwritePartitionsDynamicExec(
     copy(query = newChild)
 }
 
+/**
+ * Physical plan node to replace data in existing tables.
+ */
+case class ReplaceDataExec(
+    query: SparkPlan,
+    refreshCache: () => Unit,
+    write: Write) extends V2ExistingTableWriteExec {
+
+  override lazy val references: AttributeSet = query.outputSet

Review comment:
       nit: wonder if we need these for other case classes too.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, Expression, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.RewrittenRowLevelCommand
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.expressions.filter.{Filter => V2Filter}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
+
+/**
+ * A rule that builds a scan in row-level operations that require rewriting 
data.
+ *
+ * Note this rule must be run before [[V2ScanRelationPushDown]] as row-level 
operations must be
+ * planned in a special way.
+ */
+object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with 
PredicateHelper {
+  import DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    // push down the filter from the command condition instead of the filter 
in the rewrite plan,
+    // which may be negated for sources that only support replacing groups of 
data (e.g. files)
+    case RewrittenRowLevelCommand(command, relation: DataSourceV2Relation, 
rewritePlan) =>
+      val table = relation.table.asRowLevelOperationTable
+      val condition = command.condition
+      val scanBuilder = table.newScanBuilder(relation.options)
+
+      val (pushedFilters, remainingFilters) = pushFilters(condition, 
relation.output, scanBuilder)
+      val pushedFiltersStr = if (pushedFilters.isLeft) {
+        pushedFilters.left.get.mkString(", ")
+      } else {
+        pushedFilters.right.get.mkString(", ")
+      }
+
+      val (scan, output) = PushDownUtils.pruneColumns(scanBuilder, relation, 
relation.output, Nil)
+
+      logInfo(
+        s"""
+           |Pushing operators to ${relation.name}
+           |Pushed filters: ${pushedFiltersStr}
+           |Filters that were not pushed: ${remainingFilters.mkString(",")}

Review comment:
       nit: maybe add a space after ,




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to