cloud-fan commented on code in PR #35395:
URL: https://github.com/apache/spark/pull/35395#discussion_r838690453


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeMetadataOnlyDeleteFromTable.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, DeleteFromTableWithFilters, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{SupportsDelete, TruncatableTable}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+
+/**
+ * Checks whether a delete using filters is possible and nullifies the rewrite plan
+ * if the source can handle this delete without executing the rewrite plan.
+ *
+ * Note this rule must be run after expression optimization but before scan planning.
+ */
+object OptimizeMetadataOnlyDeleteFromTable extends Rule[LogicalPlan] with PredicateHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case d @ DeleteFromTable(relation: DataSourceV2Relation, cond, Some(_)) =>

Review Comment:
   I'm thinking about it a bit more. I think it's clearer if we resolve group-based DELETE in two steps:
   1. create a plan to get affected groups
   2. create a plan to get the undeleted rows
   
   These two steps can share one `ScanBuilder` instance.
   
   First, we should have an analyzer rule to rewrite `DeleteFromTable`, similar to what this PR does, but simpler: we just replace `DeleteFromTable` with a new plan
   ```
   case class GroupBasedReplaceData(
       table: NamedRelation,
       condition: Expression,
       affectedGroups: LogicalPlan,
       query: LogicalPlan,
       write: Option[Write] = None) extends V2WriteCommand with SupportsSubquery
   ```
   - The first two arguments come directly from `DeleteFromTable`, so that we can go back to `DeleteFromTable` later.
   - The `affectedGroups` is just a `Filter` with the DELETE condition, on a DS v2 relation with the `RowLevelOperationTable` introduced in this PR.
   - The `query` is almost the same as `affectedGroups`, except that its `Filter` condition is the negated DELETE condition.
   - The `affectedGroups` and `query` share the same `RowLevelOperationTable` instance, and `RowLevelOperationTable` should create its `ScanBuilder` only once, e.g.
   ```
   private lazy val scanBuilder = table.asReadable.newScanBuilder(CaseInsensitiveStringMap.empty())
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = scanBuilder
   ```
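   To make the single-instance invariant concrete, here is a minimal, self-contained sketch. The `ScanBuilder` and `RowLevelOperationTable` below are simplified stand-ins for illustration, not the real Spark interfaces:

```scala
// Simplified stand-in for Spark's ScanBuilder: records pushed predicates.
class ScanBuilder {
  private var pushed: List[String] = Nil
  def pushPredicate(p: String): ScanBuilder = { pushed = p :: pushed; this }
  def pushedPredicates: List[String] = pushed
}

// Simplified stand-in for RowLevelOperationTable: the builder is created
// lazily exactly once, and every newScanBuilder call returns that same
// instance, so the affectedGroups plan and the query plan configure one scan.
class RowLevelOperationTable {
  private lazy val scanBuilder = new ScanBuilder
  def newScanBuilder(): ScanBuilder = scanBuilder
}

val table = new RowLevelOperationTable
val fromAffectedGroups = table.newScanBuilder()
val fromQuery = table.newScanBuilder()

// A predicate pushed while planning affectedGroups is visible to query's scan.
fromAffectedGroups.pushPredicate("id < 10")
```

   With this shape, whichever plan is pushed down first configures the scan that both plans ultimately read from.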
   
   Second, we apply a new optimizer rule right before `V2ScanRelationPushDown`. This new rule does predicate pushdown for `affectedGroups`, to configure `RowLevelOperationTable.scanBuilder`. In the future, when we add APIs to create a dedicated v2 `Scan` for fetching affected groups, this rule can also evaluate that query, collect the affected groups, and add an IN predicate to `query`. This rule can also convert the plan back to `DeleteFromTable` when possible.
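   A hedged, self-contained sketch of what that rule could look like. All types here (`LogicalPlan`, `Filter`, `GroupBasedReplaceData`, and the `pushedFilter` field) are simplified stand-ins invented for illustration, not Spark's actual classes:

```scala
// Toy plan hierarchy standing in for Catalyst's.
sealed trait LogicalPlan
case class Relation(name: String) extends LogicalPlan
case class Filter(condition: String, child: LogicalPlan) extends LogicalPlan
case class GroupBasedReplaceData(
    condition: String,
    affectedGroups: LogicalPlan,
    query: LogicalPlan,
    pushedFilter: Option[String] = None) extends LogicalPlan

// The "optimizer rule": when the command has not been configured yet, record
// the DELETE condition as pushed for the affected-groups scan, mimicking
// configuring the shared ScanBuilder on RowLevelOperationTable.
def pushAffectedGroupsFilter(plan: LogicalPlan): LogicalPlan = plan match {
  case g @ GroupBasedReplaceData(cond, Filter(c, _), _, None) if c == cond =>
    g.copy(pushedFilter = Some(cond))
  case other => other
}

val rel = Relation("t")
val cmd = GroupBasedReplaceData(
  condition = "id < 10",
  affectedGroups = Filter("id < 10", rel),    // scan of the affected groups
  query = Filter("NOT (id < 10)", rel))       // the rows to keep

val optimized = pushAffectedGroupsFilter(cmd)
```

   In the real rule the "record the pushed filter" step would instead call the pushdown machinery against the shared `ScanBuilder`.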
   
   Next, `V2ScanRelationPushDown` will just work and apply operator pushdown to `GroupBasedReplaceData.query`. Note that predicate pushdown is done again here, which is useful: the first predicate pushdown lets the data source determine which groups to replace, while the second lets it prune at runtime (like Parquet row group pruning).
   
   Finally, we evaluate `GroupBasedReplaceDataExec`, similar to what this PR does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

