aokolnychyi commented on pull request #35395:
URL: https://github.com/apache/spark/pull/35395#issuecomment-1069573681


   @cloud-fan, thanks for thinking this through with me! I genuinely want to 
make sure this API works for every data source.
   
   The two scenarios you mention seem correct to me. I agree data sources may 
or may not want Spark to run an extra query to filter out groups. If a data 
source is OK with simple static filter pushdown, Spark should not trigger 
runtime filtering.
   
   > Now let's look at the proposed API. I think it works pretty well for the 
simplified process, and I believe you have a plan to support the non-simple 
process by adding extension APIs. Just to make sure it will be backward 
compatible and we are on the same page
   
   I propose to handle the non-simple case by just adding an optimizer rule 
that would work with the APIs added in this PR. No extra classes are necessary. 
I think we can achieve that by reusing the same `Scan` object in the main and 
runtime filtering queries. Let me show how that will work.
   
   For example, data sources implement `RowLevelOperation`, which gives them access to a scan builder.
   
   ```
   ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
   ```
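
   As a concrete illustration (a rough sketch, not code from this PR; `RowLevelScanPlanning`, `buildOperationScan`, and `staticFilters` are made-up names, while `ScanBuilder`, `SupportsPushDownFilters`, and `CaseInsensitiveStringMap` are existing DSv2 classes), Spark could consume this builder roughly like so, pushing the static predicate described below and building the `Scan` exactly once:
   
   ```
   import org.apache.spark.sql.connector.read.Scan;
   import org.apache.spark.sql.connector.read.ScanBuilder;
   import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
   import org.apache.spark.sql.sources.Filter;
   import org.apache.spark.sql.util.CaseInsensitiveStringMap;
   
   // Sketch only: build the row-level operation scan once, pushing the static
   // predicate when the connector supports it. RowLevelOperation is the interface
   // proposed in this PR; everything else is existing DSv2 API.
   final class RowLevelScanPlanning {
     static Scan buildOperationScan(RowLevelOperation operation, Filter[] staticFilters) {
       ScanBuilder builder = operation.newScanBuilder(CaseInsensitiveStringMap.empty());
       if (builder instanceof SupportsPushDownFilters) {
         // Unhandled filters are ignored in this sketch.
         ((SupportsPushDownFilters) builder).pushFilters(staticFilters);
       }
       // The returned Scan is shared by the main row-level operation query and,
       // if it implements SupportsRuntimeFiltering, by the filtering subquery.
       return builder.build();
     }
   }
   ```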
   
   Spark will pass a static predicate to the scan builder, which is sufficient for the simple use case. The more complicated use case requires Spark to do runtime filtering on top of this. If the returned `Scan` implements `SupportsRuntimeFiltering`, Spark can execute that extra query via its usual runtime filtering mechanism. The idea is to reuse, in the filtering subquery, the same `Scan` object we built for the row-level operation. That way, we avoid planning the same job twice, and runtime filtering becomes an entirely optional step that data sources opt into by returning a `Scan` that can be filtered by group ID. We may not even need a separate scan builder for the filtering subquery.
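
   To show what that opt-in could look like on the connector side, here is a minimal sketch (the class, its `_group_id` attribute, and the pruning logic are made up for illustration; `SupportsRuntimeFiltering` with `filterAttributes` and `filter` is the existing DSv2 interface):
   
   ```
   import org.apache.spark.sql.connector.expressions.Expressions;
   import org.apache.spark.sql.connector.expressions.NamedReference;
   import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
   import org.apache.spark.sql.sources.Filter;
   import org.apache.spark.sql.types.StructType;
   
   // Illustrative only: a scan that exposes a group ID attribute so Spark can
   // prune untouched groups at runtime. SupportsRuntimeFiltering extends Scan,
   // which provides throwing defaults for toBatch/toMicroBatchStream, so those
   // are omitted here.
   class GroupFilterableScan implements SupportsRuntimeFiltering {
     private final StructType schema;
   
     GroupFilterableScan(StructType schema) {
       this.schema = schema;
     }
   
     @Override
     public StructType readSchema() {
       return schema;
     }
   
     @Override
     public NamedReference[] filterAttributes() {
       // Advertise the attribute Spark may build runtime filters on.
       return new NamedReference[] { Expressions.column("_group_id") };
     }
   
     @Override
     public void filter(Filter[] filters) {
       // Drop groups/files whose IDs cannot match, before planning input partitions.
     }
   }
   ```
   
   In the row-level operation case, the filter attribute would be whatever identifies a rewrite group (for example, a file or partition ID), which is what "filterable by group ID" above refers to.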
   
   Does this make sense?
   
   > row-level operation does not even call out the data writing behavior. 
Shall we consider other names like SupportsDataUpdating, SupportsUPSERT, et.?
   
   I went back and forth on the name and settled on `SupportsRowLevelOperations` because it describes all three SQL operations: DELETE, UPDATE, and MERGE. I am open to alternatives, though. I think `SupportsDataUpdating` focuses mostly on UPDATE and is not very descriptive for DELETE and MERGE. Also, UPSERT has a slightly different meaning than DELETE, UPDATE, and MERGE, so `SupportsUPSERT` could be misinterpreted. I'd love for Spark to support UPSERT, but that is beyond the scope of my proposal.
   
   > Why do we put the replace plan as a parameter of DeleteFromTable? Can we 
simply replace DeleteFromTable with the new replace plan in the rule?
   
   It is done so that we can attempt a metadata-only delete via `SupportsDelete`. Suppose the DELETE condition has predicates only on partition columns. Hive, Delta, and Iceberg can handle such a delete with a metadata operation, without rewriting any data. The rewrite plan is assigned in the analyzer, but we only check whether a metadata-only delete is possible in the optimizer. Until then, we don't know whether the rewrite plan will actually be executed.
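
   For reference, here is a minimal sketch of the metadata-only path using the existing `SupportsDelete` API (the `PartitionedTable` class and the partition-column check are illustrative; `canDeleteWhere` and `deleteWhere` are the real methods):
   
   ```
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.Set;
   import org.apache.spark.sql.connector.catalog.SupportsDelete;
   import org.apache.spark.sql.connector.catalog.TableCapability;
   import org.apache.spark.sql.sources.Filter;
   import org.apache.spark.sql.types.StructType;
   
   // Illustrative only: a table that accepts a metadata-only delete when every
   // pushed filter references just partition columns.
   class PartitionedTable implements SupportsDelete {
     private final String name;
     private final StructType schema;
     private final Set<String> partitionColumns;
   
     PartitionedTable(String name, StructType schema, Set<String> partitionColumns) {
       this.name = name;
       this.schema = schema;
       this.partitionColumns = partitionColumns;
     }
   
     @Override public String name() { return name; }
     @Override public StructType schema() { return schema; }
     @Override public Set<TableCapability> capabilities() { return Collections.emptySet(); }
   
     @Override
     public boolean canDeleteWhere(Filter[] filters) {
       // Claim the delete only if it can be satisfied by dropping whole partitions.
       return Arrays.stream(filters)
           .allMatch(f -> partitionColumns.containsAll(Arrays.asList(f.references())));
     }
   
     @Override
     public void deleteWhere(Filter[] filters) {
       // Drop matching partitions in metadata; no data files are rewritten.
     }
   }
   ```
   
   The intent is that when `canDeleteWhere` returns false (or the condition cannot be translated into source filters), the optimizer falls back to the rewrite plan carried by `DeleteFromTable`.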
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


