aokolnychyi commented on pull request #35395: URL: https://github.com/apache/spark/pull/35395#issuecomment-1069573681
@cloud-fan, thanks for thinking this through with me! I genuinely want to make sure this API works for every data source.

The two scenarios you mention seem correct to me. I agree that data sources may or may not want Spark to run an extra query to filter out groups. If a data source is OK with simple static filter pushdown, Spark should not trigger runtime filtering.

> Now let's look at the proposed API. I think it works pretty well for the simplified process, and I believe you have a plan to support the non-simple process by adding extension APIs. Just to make sure it will be backward compatible and we are on the same page

I propose to handle the non-simple case by just adding an optimizer rule that works with the APIs added in this PR. No extra classes are necessary. I think we can achieve that by reusing the same `Scan` object in the main and runtime filtering queries. Let me show how that will work.

For example, data sources implement `RowLevelOperation`, which gives access to a scan builder.

```
ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
```

Spark will pass a static predicate to the scan builder, which is sufficient for the simple use case. The more complicated use case requires Spark to do runtime filtering on top of this. That's why, if the `Scan` implements `SupportsRuntimeFiltering`, Spark can execute that extra query via its usual runtime filtering mechanism. The idea is to reuse the same `Scan` object we built for the row-level operation in the filtering subquery. That way, we avoid planning the same job twice, and runtime filtering becomes a completely optional step that data sources opt into by returning a `Scan` that can be filtered by group ID. We may not need a separate scan builder for the filtering subquery. Does this make sense? (Rough connector-side sketches of both paths follow at the end of this comment.)

> row-level operation does not even call out the data writing behavior. Shall we consider other names like SupportsDataUpdating, SupportsUPSERT, etc.?

I went back and forth on the name and then settled on `SupportsRowLevelOperations` as it describes all three SQL operations: DELETE, UPDATE, and MERGE. I am open to alternatives, though. I think `SupportsDataUpdating` focuses mostly on UPDATE and is not very descriptive for DELETE and MERGE. Also, UPSERT has a slightly different meaning compared to DELETE, UPDATE, and MERGE, so `SupportsUPSERT` could be misinterpreted. I'd love for Spark to support UPSERT, but it is beyond the scope of this proposal.

> Why do we put the replace plan as a parameter of DeleteFromTable? Can we simply replace DeleteFromTable with the new replace plan in the rule?

It is done so that we can attempt a metadata-only delete via `SupportsDelete`. Suppose our DELETE condition has predicates only on partition columns. Hive, Delta, and Iceberg will be able to handle such a delete with a metadata operation, without rewriting data (see the last sketch below). The rewrite plan is assigned in the analyzer, but we check whether a metadata delete is possible only in the optimizer. Until then, we don't know whether the rewrite plan will be executed.
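To make the simple path a bit more concrete, here is a minimal sketch of what a connector could return from `newScanBuilder`, assuming the `RowLevelOperation` interface from this PR. `GroupBasedDeleteScanBuilder`, `GroupScan`, and the group-pruning details are hypothetical connector code, not part of the proposed API.

```
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Hypothetical scan builder returned by RowLevelOperation#newScanBuilder.
// Spark pushes the static DELETE/UPDATE/MERGE condition into it, which is all
// a source needs if static filter pushdown alone is enough to prune groups.
class GroupBasedDeleteScanBuilder implements ScanBuilder, SupportsPushDownFilters {
  private Filter[] pushed = new Filter[0];

  GroupBasedDeleteScanBuilder(CaseInsensitiveStringMap options) {
    // connector-specific setup (table metadata, snapshot, ...)
  }

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    this.pushed = filters;  // used to skip groups via partition values / column stats
    return filters;         // Spark still evaluates the condition on individual rows
  }

  @Override
  public Filter[] pushedFilters() {
    return pushed;
  }

  @Override
  public Scan build() {
    // GroupScan (next sketch) lists the groups that may contain matching rows
    return new GroupScan(pushed);
  }
}
```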
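For the non-simple path, the same `Scan` can opt into runtime filtering via the existing `SupportsRuntimeFiltering` mixin, roughly like this (again a sketch: `GroupScan`, the `_group_id` column, and the group bookkeeping are made up for illustration):

```
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.In;
import org.apache.spark.sql.types.StructType;

// Hypothetical Scan for a group-based source. Because it implements
// SupportsRuntimeFiltering, Spark may run the extra filtering subquery and
// call filter() with the matching group IDs. A source that wants only static
// pushdown simply does not implement the mixin, and no runtime query is issued.
class GroupScan implements Scan, SupportsRuntimeFiltering {
  private List<String> candidateGroups;  // groups left after static pruning

  GroupScan(Filter[] staticFilters) {
    this.candidateGroups = planGroups(staticFilters);
  }

  @Override
  public NamedReference[] filterAttributes() {
    // advertise the group ID column so Spark knows what to filter at runtime
    return new NamedReference[] { Expressions.column("_group_id") };
  }

  @Override
  public void filter(Filter[] filters) {
    // keep only the groups whose IDs came back from the filtering subquery
    for (Filter f : filters) {
      if (f instanceof In && ((In) f).attribute().equals("_group_id")) {
        List<Object> matchingIds = Arrays.asList(((In) f).values());
        candidateGroups = candidateGroups.stream()
            .filter(matchingIds::contains)
            .collect(Collectors.toList());
      }
    }
  }

  @Override
  public StructType readSchema() {
    return new StructType();  // placeholder: row ID + data columns in practice
  }

  private static List<String> planGroups(Filter[] filters) {
    return Collections.emptyList();  // connector-specific group planning omitted
  }
}
```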
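And to illustrate the metadata-only delete path that motivates keeping the rewrite plan as a parameter of `DeleteFromTable`, here is a sketch against the existing `SupportsDelete` mixin (the table class and the partition-column check are hypothetical):

```
import org.apache.spark.sql.connector.catalog.SupportsDelete;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.sources.Filter;

// Hypothetical group-based table. If every pushed filter references only
// partition columns, the delete can be handled as a metadata operation and the
// rewrite plan attached to DeleteFromTable is never executed; otherwise the
// optimizer falls back to the row-level rewrite.
abstract class PartitionedTable implements Table, SupportsDelete {

  @Override
  public boolean canDeleteWhere(Filter[] filters) {
    for (Filter filter : filters) {
      for (String ref : filter.references()) {
        if (!isPartitionColumn(ref)) {
          return false;  // needs a row-level rewrite
        }
      }
    }
    return true;  // purely partition-scoped delete
  }

  @Override
  public void deleteWhere(Filter[] filters) {
    // connector-specific metadata operation, e.g. dropping whole partitions
  }

  abstract boolean isPartitionColumn(String name);
}
```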
