aokolnychyi commented on pull request #35395:
URL: https://github.com/apache/spark/pull/35395#issuecomment-1062176426
> I'm afraid a unified physical plan execution may fit one very well but is awkward for the other.
@cloud-fan, I am not proposing a unified execution plan. In fact, we should handle delta-based and group-based rewrites in completely different ways, just like you said. Everything I have here is for group-based sources. I had an early prototype for delta-based sources in the old PR #33008; it follows a separate execution path with `WriteDelta` instead of `ReplaceData`.
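For context, here is a rough illustration of the two rewrite shapes for a `DELETE FROM t WHERE p` (the plan trees below are schematic; only `ReplaceData`, `WriteDelta`, and `DataSourceV2ScanRelation` are actual plan node names):
```
// group-based rewrite (this PR): replace the affected groups with their
// remaining rows, i.e. keep the rows for which the condition is not true
//
//   ReplaceData
//   +- Filter (NOT p OR p IS NULL)
//      +- DataSourceV2ScanRelation   (scans only the affected groups)
//
// delta-based rewrite (old prototype in PR #33008): write only the changes,
// identified by row ID metadata columns, instead of rewriting whole groups
//
//   WriteDelta
//   +- Filter (p)
//      +- DataSourceV2ScanRelation   (scan exposes row ID columns)
```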
> 1. Figure out which "groups" are affected w.r.t. the DELETE/UPDATE/MERGE condition. Since the condition may contain subqueries, it's better to let Spark submit the job to evaluate the condition and let the data source report the "groups", so that Spark can figure out which "groups" are affected. The execution path can be: Spark asks the data source to give an initial list of "groups" w.r.t. the condition, then creates a data source scan with the initial "groups". The data source scan has a hidden column to report the "group id", and Spark runs the scan with the post-scan filters to collect the group ids.
> 2. Scan these "groups" to get the new data after DELETE/UPDATE/MERGE, and write the new data to new "groups".
> 3. Commit the transaction to remove the old "groups" and add the new "groups".
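Concretely, the group discovery in the first step amounts to a regular Spark job (a schematic sketch; table `t`, condition `p`, and the hidden `_group_id` column are placeholders):
```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// scan the initial candidate groups with the hidden group id column and the
// full DELETE/UPDATE/MERGE condition, then collect the affected group ids
val affectedGroupIds = spark.table("t")
  .where("p")
  .select($"_group_id")
  .distinct()
  .as[String]
  .collect()
```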
My proposal should support the use case you describe. Here is a quote from
the design doc:
> For example, sources that support replacing individual files may use static filters to prune partitions and files using metadata. This will allow them to find files that may have rows to delete/update. Since the metadata filtering is not precise and rewriting data is expensive, it may make sense to scan the potentially matching files for matches using a separate filtering query to reduce the amount of data to rewrite.
I am proposing to leverage runtime filtering for this. It would require only a new optimizer rule and would work well with the APIs proposed in this PR. We could even include runtime filtering in 3.3, since DS v2 already supports it. I mentioned this in the SPIP doc. Here is what that optimizer rule could look like (it assigns a subquery to find matching groups):
```
object RowLevelCommandDynamicPruning extends Rule[LogicalPlan] with PredicateHelper {

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    // apply special dynamic filtering only for plans that don't support deltas
    case RewrittenRowLevelCommand(
        command: RowLevelCommand,
        DataSourceV2ScanRelation(_, scan: SupportsRuntimeFiltering, _),
        rewritePlan: ReplaceData) if conf.dynamicPartitionPruningEnabled && isCandidate(command) =>

      // use reference equality to find exactly the required scan relations
      val newRewritePlan = rewritePlan transformUp {
        case r: DataSourceV2ScanRelation if r.scan eq scan =>
          val pruningKeys = V2ExpressionUtils.resolveRefs[Attribute](scan.filterAttributes, r)
          val dynamicPruningCond = buildDynamicPruningCondition(r, command, pruningKeys)
          Filter(dynamicPruningCond, r)
      }
      command.withNewRewritePlan(newRewritePlan)
  }
}
```
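The `buildDynamicPruningCondition` helper is not spelled out above. A minimal sketch of how it could be written as a private method of the rule (the body below is my assumption, not code from the PR; a real implementation would plan a dedicated matching-rows scan instead of reusing `relation`, and I assume `condition` is an `Option[Expression]` as in the prototype):
```
// assumed helper: wrap each pruning key in a DynamicPruningSubquery so that
// Spark plans it as a runtime filter over the groups that contain matches
private def buildDynamicPruningCondition(
    relation: DataSourceV2ScanRelation,
    command: RowLevelCommand,
    pruningKeys: Seq[Attribute]): Expression = {
  // a plan that finds the rows matching the DELETE/UPDATE/MERGE condition
  val matchingRowsPlan = Filter(command.condition.getOrElse(TrueLiteral), relation)
  val buildQuery = Project(pruningKeys, matchingRowsPlan)
  pruningKeys.zipWithIndex.map { case (key, index) =>
    DynamicPruningSubquery(key, buildQuery, pruningKeys, index, onlyInBroadcast = false)
  }.reduce(And)
}
```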
Sources like Delta can expose a metadata column `_file` and support dynamic
filtering on it during row-level operations. I think that will be cleaner than
having a notion of a group in the APIs.
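To illustrate, here is a minimal sketch of such a scan (`FileBasedScan` and its file list are made up for illustration; `SupportsRuntimeFiltering` is the actual DS v2 interface):
```
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
import org.apache.spark.sql.sources.{Filter, In}
import org.apache.spark.sql.types.StructType

// hypothetical scan for a file-based source that exposes a `_file` metadata
// column and lets Spark prune files with a runtime filter on it
class FileBasedScan(schema: StructType, @volatile var files: Seq[String])
    extends Scan with SupportsRuntimeFiltering {

  override def readSchema(): StructType = schema

  // advertise `_file` as filterable so an optimizer rule like the one above
  // can plant a runtime filter on it
  override def filterAttributes(): Array[NamedReference] =
    Array(Expressions.column("_file"))

  // called at runtime with the values collected by the planted subquery;
  // prune the file list before planning input partitions
  override def filter(filters: Array[Filter]): Unit = filters.foreach {
    case In("_file", values) =>
      val matching = values.map(_.toString).toSet
      files = files.filter(matching.contains)
    case _ => // ignore filters we cannot use for pruning
  }
}
```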
The tests I added also show how writes can access which "groups" were scanned and should be replaced; we don't have to pass the replaced groups explicitly. For instance, `PartitionBasedOperation`, added for testing, remembers the scan that was used, and the write has access to it, as sketched below. That is how scans can report to writes which groups must be replaced.
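A hypothetical sketch of that shape (in the spirit of the test connector; `FileBasedScan` is the made-up scan from the previous sketch, and `buildScan` and `ReplacingWrite` are assumed helpers):
```
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, RowLevelOperation, Write, WriteBuilder}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// the operation remembers the scan it built, so the write can later ask it
// which groups (here: files) were read and must be replaced on commit
case class PartitionBasedOperation(command: Command) extends RowLevelOperation {

  @volatile private var configuredScan: FileBasedScan = _

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    () => {
      configuredScan = buildScan(options) // assumed helper that creates the scan
      configuredScan
    }

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    new WriteBuilder {
      override def build(): Write = {
        // the write sees exactly what the (runtime-filtered) scan read, so
        // the replaced groups never have to be passed around explicitly
        new ReplacingWrite(info, configuredScan)
      }
    }

  private def buildScan(options: CaseInsensitiveStringMap): FileBasedScan = ???
}

// minimal stand-in: a real Write would override toBatch() and, on commit,
// remove the files recorded in `scan.files` and add the newly written ones
class ReplacingWrite(info: LogicalWriteInfo, scan: FileBasedScan) extends Write
```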
The purpose of `RowLevelOperation` is to coordinate scans and writes.
I think the proposed API should work well for Delta, Iceberg, and even Hive
(where groups are partitions).