cloud-fan edited a comment on pull request #35395:
URL: https://github.com/apache/spark/pull/35395#issuecomment-1061514337


   Sorry for the late review; I've been struggling with the API design for a long time. I understand that we want a unified DS v2 API for delta-based and group-based data sources, but the two have very different execution paths, and I'm afraid a unified physical plan execution may fit one very well but be awkward for the other.
   
   The execution path of group-based data sources (please correct me if my understanding is wrong):
   1. Figure out which "groups" are affected by the DELETE/UPDATE/MERGE condition. Since the condition may contain subqueries, it's better to let Spark submit the job that evaluates the condition and have the data source report the "groups", so that Spark can figure out which "groups" are affected. The execution path can be: Spark asks the data source for an initial list of "groups" matching the condition, then creates a data source scan over those initial "groups". The scan exposes a hidden column reporting the "group id", and Spark runs the scan with the post-scan filters to collect the affected group ids.
   2. Scan these "groups" to compute the new data after the DELETE/UPDATE/MERGE, and write the new data to new "groups".
   3. Commit the transaction to remove the old "groups" and add the new "groups".
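   To make the copy-on-write flow above concrete, here is a minimal, self-contained sketch in plain Java. All names and types here (`GroupBasedDeleteDemo`, a "group" as a named list of integer rows) are simplified stand-ins for illustration, not real Spark DS v2 classes:

   ```java
   import java.util.*;
   import java.util.function.Predicate;
   import java.util.stream.Collectors;

   // Toy model: a table is a set of named "groups", each holding rows (here, ints).
   class GroupBasedDeleteDemo {
       static Map<String, List<Integer>> table = new HashMap<>();

       // Step 1: find groups containing at least one row matching the condition.
       static Set<String> affectedGroups(Predicate<Integer> cond) {
           return table.entrySet().stream()
               .filter(e -> e.getValue().stream().anyMatch(cond))
               .map(Map.Entry::getKey)
               .collect(Collectors.toSet());
       }

       // Steps 2-3: rewrite affected groups, keeping only rows that do NOT match
       // the condition, then atomically swap old groups for new ones (the "commit").
       static void delete(Predicate<Integer> cond) {
           Set<String> affected = affectedGroups(cond);
           Map<String, List<Integer>> newGroups = new HashMap<>();
           for (String g : affected) {
               List<Integer> kept = table.get(g).stream()
                   .filter(cond.negate())
                   .collect(Collectors.toList());
               if (!kept.isEmpty()) newGroups.put(g + "-v2", kept);
           }
           // Commit: remove old groups, add rewritten groups.
           affected.forEach(table::remove);
           table.putAll(newGroups);
       }

       public static void main(String[] args) {
           table.put("g1", List.of(1, 2, 3));
           table.put("g2", List.of(10, 20));
           delete(x -> x < 5);  // g1 is fully rewritten away; g2 is untouched
           System.out.println(table);
       }
   }
   ```

   Note how an unaffected group (`g2`) is never rewritten; that is the whole point of restricting the scan and write to the affected group list.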
   
   To fit this execution path, my API proposal is as follows (naming is not finalized):
   ```
   interface SupportsGroupBasedUpsert extends Table {
     String[] getAffectedGroups(Filter[] condition);
     // This scan should put an extra string column at the end of each row to indicate the group id.
     ScanBuilder newScanBuilder(String[] groups, CaseInsensitiveStringMap options);
     // The created `Write` must extend `SupportReplaceGroups`.
     WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options);
   }
   ...
   interface SupportReplaceGroups extends Write {
     BatchWrite toBatch(String[] groups);
   }
   ```
   Note: the API above assumes a group can always be represented as a string. If there are objections, we can use binary instead and add an API to serialize/deserialize groups, e.g.
   ```
   Group[] getAffectedGroups(Filter[] condition);
   Group fromBinary(byte[] binary);
   // This scan should put an extra binary column at the end of each row to indicate the group id.
   ScanBuilder newScanBuilder(Group[] groups, CaseInsensitiveStringMap options);
   ...
   interface Group extends Serializable {}
   ```
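   As a rough sketch of what the binary variant implies for an implementation, a data source would just need a stable round-trip between a group and its byte representation. The `StringGroup` record below is a hypothetical example, not part of the proposal:

   ```java
   import java.nio.charset.StandardCharsets;

   // Stand-in for the proposed opaque group type.
   interface Group extends java.io.Serializable {
       byte[] toBinary();
   }

   // A trivial Group backed by a UTF-8 string, showing the serialize/deserialize
   // round-trip a data source would implement behind fromBinary().
   record StringGroup(String id) implements Group {
       public byte[] toBinary() {
           return id.getBytes(StandardCharsets.UTF_8);
       }
       static Group fromBinary(byte[] binary) {
           return new StringGroup(new String(binary, StandardCharsets.UTF_8));
       }
   }
   ```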
   We can also follow `ContinuousPartitionReader` if we want more type safety and don't want to use hidden columns, but that would make the API much more complicated.
   
   The entire workflow can be:
   1. The initial logical plan is still `DeleteFromTable`.
   2. An optimizer rule turns `DeleteFromTable` into `GroupBasedDeleteFromTable`, which extracts translatable v2 `Filter`s from the catalyst condition and calls `getAffectedGroups` to get the initial group list, then creates a v2 `Scan` with that initial group list.
   3. `GroupBasedDeleteFromTable` is defined as
   ```
   GroupBasedDeleteFromTable(
     SubqueryExpression(Distinct(Project(the_group_id_col, Filter(cond, V2Scan)))),
     Filter(Not(cond), V2Scan),
     V2Write)
   ```
   4. The planner rule turns `GroupBasedDeleteFromTable` into its corresponding physical plan.
   5. The physical plan gets the final group list from the subquery, creates the `BatchWrite` and writes the data, and finally commits the transaction.
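   The two halves of that plan can be simulated in a few lines of plain Java (again, all names here are simplified stand-ins; the scan output is modeled as rows carrying the hidden group-id column):

   ```java
   import java.util.*;
   import java.util.function.Predicate;
   import java.util.stream.Collectors;

   // Stand-in for a scanned row with the hidden group-id column appended.
   record ScannedRow(int value, String groupId) {}

   class GroupBasedDeletePlanDemo {
       // Simulates the subquery Distinct(Project(the_group_id_col, Filter(cond, V2Scan))):
       // the final list of groups containing at least one row matching the condition.
       static Set<String> finalGroupList(List<ScannedRow> scan, Predicate<Integer> cond) {
           return scan.stream()
               .filter(r -> cond.test(r.value()))
               .map(ScannedRow::groupId)
               .collect(Collectors.toSet());
       }

       // Simulates Filter(Not(cond), V2Scan) restricted to the affected groups:
       // the surviving rows that the V2Write rewrites into replacement groups.
       static List<Integer> rowsToRewrite(List<ScannedRow> scan, Set<String> groups,
                                          Predicate<Integer> cond) {
           return scan.stream()
               .filter(r -> groups.contains(r.groupId()))
               .filter(r -> !cond.test(r.value()))
               .map(ScannedRow::value)
               .collect(Collectors.toList());
       }
   }
   ```

   For example, with rows `(1, g1)`, `(2, g1)`, `(10, g2)` and the condition `value < 2`, the subquery yields the group list `{g1}`, and only the surviving row `2` from `g1` is rewritten; `g2` never enters the write at all.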
   
   We can also update the rule `V2Writes` to add a shuffle/sort for `GroupBasedDeleteFromTable`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


