cloud-fan commented on code in PR #35395:
URL: https://github.com/apache/spark/pull/35395#discussion_r838690453
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeMetadataOnlyDeleteFromTable.scala:
##########
```
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, DeleteFromTableWithFilters, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{SupportsDelete, TruncatableTable}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+
+/**
+ * Checks whether a delete using filters is possible and nullifies the rewrite plan
+ * if the source can handle this delete without executing the rewrite plan.
+ *
+ * Note this rule must be run after expression optimization but before scan planning.
+ */
+object OptimizeMetadataOnlyDeleteFromTable extends Rule[LogicalPlan] with PredicateHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case d @ DeleteFromTable(relation: DataSourceV2Relation, cond, Some(_)) =>
```

Review Comment:
   I'm thinking about it a bit more. I think it's clearer if we resolve group-based DELETE in two steps:
   1. create a plan to get the affected groups
   2. create a plan to get the undeleted rows

   These two steps can share one `ScanBuilder` instance.

   First, we should have an analyzer rule to rewrite `DeleteFromTable`, similar to what this PR does, but simpler: we should just replace `DeleteFromTable` with a new plan
   ```
   case class GroupBasedReplaceData(
       table: NamedRelation,
       condition: Expression,
       affectedGroups: LogicalPlan,
       query: LogicalPlan,
       write: Option[Write] = None) extends V2WriteCommand with SupportsSubquery {
   }
   ```
   - The first 2 arguments come directly from `DeleteFromTable`, so that we can go back to `DeleteFromTable` later.
   - The `affectedGroups` is just a `Filter` with the DELETE condition, on a DS v2 relation with the `RowLevelOperationTable` that is introduced in this PR.
   - The `query` is almost the same as `affectedGroups`, except that its `Filter` condition is the negated DELETE condition.
   - The `affectedGroups` and `query` share the same `RowLevelOperationTable` instance, and `RowLevelOperationTable` should create the `ScanBuilder` only once, e.g.
   ```
   private lazy val scanBuilder = table.asReadable.newScanBuilder(CaseInsensitiveStringMap.empty())

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = scanBuilder
   ```

   Second, we apply a new optimizer rule right before `V2ScanRelationPushDown`. This new rule does predicate pushdown for the `affectedGroups`, to configure `RowLevelOperationTable.scanBuilder`.
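   The shared-`ScanBuilder` idea can be sketched with simplified stand-in types (the names `ScanBuilder`, `RowLevelOperationTable`, and `newScanBuilder` echo the Spark classes, but this is a hypothetical model, not the real DS v2 interfaces):

   ```scala
   // Minimal stand-in for a scan builder that remembers pushed predicates.
   class ScanBuilder {
     private var pushed: Option[String] = None
     def pushPredicate(p: String): Unit = pushed = Some(p)
     def pushedPredicate: Option[String] = pushed
   }

   class RowLevelOperationTable {
     // Created lazily and cached: every caller gets the same instance, so a
     // predicate pushed while planning `affectedGroups` is visible when the
     // row-replacement `query` is planned.
     private lazy val scanBuilder = new ScanBuilder
     def newScanBuilder(): ScanBuilder = scanBuilder
   }

   val table = new RowLevelOperationTable
   val forGroups = table.newScanBuilder() // used by affectedGroups
   val forQuery  = table.newScanBuilder() // used by query
   forGroups.pushPredicate("id < 10")

   // Both plans observe the same builder, and hence the same pushed state.
   assert(forQuery eq forGroups)
   assert(forQuery.pushedPredicate.contains("id < 10"))
   ```

   The `lazy val` is what guarantees the two logical plans end up configuring and reading one shared scan.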
   In the future, when we add APIs to create a dedicated v2 `Scan` for getting the affected groups, this rule can also evaluate that plan, collect the affected groups, and add an IN predicate to `query`. This rule can also go back to `DeleteFromTable` when possible.

   Next, `V2ScanRelationPushDown` will just work and apply operator pushdown to `GroupBasedReplaceData.query`. Note that predicate pushdown is done a second time here, which is useful: the first pushdown lets the data source determine which groups to replace, while the second lets it do pruning at runtime (like Parquet row-group pruning).

   Finally, we evaluate `GroupBasedReplaceDataExec`, similar to what this PR does.
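   A rough model of that first, group-selecting pushdown (simplified stand-in types, not Spark's Catalyst API; the rule name is hypothetical): before `V2ScanRelationPushDown` runs, the new rule finds the `Filter` over `affectedGroups` and pushes the DELETE condition into the table's shared scan builder.

   ```scala
   // Stand-in plan nodes modeling the proposal above.
   sealed trait Plan
   case class Relation(table: SharedTable) extends Plan
   case class Filter(condition: String, child: Plan) extends Plan
   case class GroupBasedReplaceData(
       condition: String,
       affectedGroups: Plan,
       query: Plan) extends Plan

   class SharedBuilder { var pushedFilters: List[String] = Nil }
   class SharedTable { lazy val scanBuilder = new SharedBuilder }

   // Hypothetical rule: push the DELETE condition from `affectedGroups`
   // into the shared builder; `query` is left for the regular pushdown pass.
   object PushDeleteConditionToGroups {
     def apply(plan: Plan): Plan = plan match {
       case r @ GroupBasedReplaceData(_, Filter(cond, Relation(table)), _) =>
         table.scanBuilder.pushedFilters ::= cond // first pushdown: pick groups
         r
       case other => other
     }
   }

   val table = new SharedTable
   val plan = GroupBasedReplaceData(
     condition = "id < 10",
     affectedGroups = Filter("id < 10", Relation(table)),
     query = Filter("NOT (id < 10)", Relation(table)))
   PushDeleteConditionToGroups(plan)
   assert(table.scanBuilder.pushedFilters == List("id < 10"))
   ```

   Because `query` points at the same `SharedTable`, the later, second pushdown over `query` sees a builder already configured with the group-selecting predicate.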
