cloud-fan commented on pull request #35395:
URL: https://github.com/apache/spark/pull/35395#issuecomment-1061514337
Sorry for the late review; I've been struggling with the API design for a
long time. I understand that we want a unified DS v2 API for delta-based
and group-based data sources, but the two have very different execution
paths, and I'm afraid a unified physical plan execution may fit one very
well but be awkward for the other.
The execution path of group-based data sources (please correct me if my
understanding is wrong):
1. Figure out which "groups" are affected w.r.t. the DELETE/UPDATE/MERGE
condition. Since the condition may contain subqueries, it's better to let Spark
submit the job that evaluates the condition and let the data source report the
"groups", so that Spark can figure out which "groups" are affected. The
execution path can be: Spark asks the data source for an initial list of
"groups" w.r.t. the condition, then creates a data source scan over those
initial "groups". The scan exposes a hidden "group id" column, and Spark runs
the scan with the post-scan filters to collect the group ids.
2. Scan these "groups" to get the new data after DELETE/UPDATE/MERGE, and
write new data to new "groups".
3. Commit the transaction to remove old "groups" and add new "groups".
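The copy-on-write behavior of steps 2-3 can be sketched with plain Java collections. The group/row model below is purely illustrative (not a Spark API): a "group" is an immutable file-like unit, so deleting any row from a group means rewriting the whole group.

```java
import java.util.*;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;

// Sketch of a group-based (copy-on-write) DELETE. All names here are
// illustrative stand-ins, not Spark APIs.
class GroupBasedDeleteSketch {
  // Table state: group id -> rows in that group.
  static Map<String, List<Integer>> delete(Map<String, List<Integer>> table,
                                           IntPredicate cond) {
    Map<String, List<Integer>> committed = new HashMap<>();
    int newGroupId = 0;
    for (Map.Entry<String, List<Integer>> group : table.entrySet()) {
      if (group.getValue().stream().noneMatch(cond::test)) {
        // Step 1: unaffected groups are kept untouched.
        committed.put(group.getKey(), group.getValue());
      } else {
        // Step 2: scan the affected group and keep only the surviving rows.
        List<Integer> kept = group.getValue().stream()
            .filter(v -> !cond.test(v)).collect(Collectors.toList());
        // Step 3: the commit drops the old group and adds the rewritten one.
        if (!kept.isEmpty()) {
          committed.put("new-" + (newGroupId++), kept);
        }
      }
    }
    return committed;
  }
}
```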
To fit this execution path, my API proposal is as follows (naming is not
finalized):
```
interface SupportsGroupBasedUpsert extends Table {
  String[] getAffectedGroups(Filter[] condition);

  // This scan should put an extra string column at the end of each row to
  // indicate the group id.
  ScanBuilder newScanBuilder(String[] groups, CaseInsensitiveStringMap options);

  // The created `Write` must extend `SupportReplaceGroups`.
  WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options);
}
...
interface SupportReplaceGroups extends Write {
  BatchWrite toBatch(String[] groups);
}
```
Note: the API above assumes a group can always be represented as a string.
If there are objections, we can use binary instead and add an API to
serialize/deserialize groups, e.g.:
```
Group[] getAffectedGroups(Filter[] condition);
Group fromBinary(byte[] binary);

// This scan should put an extra binary column at the end of each row to
// indicate the group id.
ScanBuilder newScanBuilder(Group[] groups, CaseInsensitiveStringMap options);
...
interface Group extends Serializable {}
```
We could also follow `ContinuousPartitionReader` if we want more type safety
and don't want to use hidden columns, but that would make the API much more
complicated.
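To make the intended call sequence concrete, here is a self-contained sketch of the driver-side flow. The Spark connector types (`Filter`, `ScanBuilder`, `CaseInsensitiveStringMap`) are replaced with simplified stand-ins so the sketch compiles on its own; the scan is assumed to have already applied the post-scan filters.

```java
import java.util.*;

// Stand-ins for the Spark connector types; names and shapes are illustrative.
class UpsertFlowSketch {
  interface GroupBasedTable {
    // May over-approximate, e.g. based on per-group min/max statistics.
    String[] getAffectedGroups(String condition);
    // Returns the rows matching the condition; each row carries the hidden
    // group-id column as its last field.
    List<String[]> scan(String[] groups);
  }

  // Driver-side flow: initial group list -> scan exposing the hidden
  // group-id column -> exact set of affected groups to rewrite.
  static Set<String> finalAffectedGroups(GroupBasedTable table, String cond) {
    String[] initial = table.getAffectedGroups(cond);
    Set<String> finalGroups = new TreeSet<>();
    for (String[] row : table.scan(initial)) {
      finalGroups.add(row[row.length - 1]);
    }
    return finalGroups;
  }
}
```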
The entire workflow would be:
1. The initial logical plan is still `DeleteFromTable`.
2. An optimizer rule turns `DeleteFromTable` into
`GroupBasedDeleteFromTable`: it extracts translatable v2 `Filter`s from the
Catalyst condition, calls `getAffectedGroups` to get the initial group list,
and then creates a v2 `Scan` with that initial group list.
3. `GroupBasedDeleteFromTable` is defined as
```
GroupBasedDeleteFromTable(
  SubqueryExpression(Distinct(Project(the_group_id_col, Filter(cond, V2Scan)))),
  Filter(Not(cond), V2Scan),
  V2Write)
```
4. The planner rule turns `GroupBasedDeleteFromTable` into its corresponding
physical plan.
5. The physical plan gets the final group list from the subquery, creates the
`BatchWrite`, writes the data, and finally commits the transaction.
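The runtime semantics of the rewritten plan can be sketched as follows, using a hypothetical in-memory row model: the stream pipelines stand in for the subquery and the `Filter(Not(cond), V2Scan)` branch of the plan above.

```java
import java.util.*;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;

// Sketch of what the rewritten plan computes: the subquery collects the
// distinct group ids of rows matching cond, and the main query re-reads only
// those groups, keeping the rows where NOT(cond) holds. Illustrative only.
class DeleteWorkflowSketch {
  record Row(String groupId, int value) {}

  static List<Row> rowsToRewrite(List<Row> v2Scan, IntPredicate cond) {
    // SubqueryExpression(Distinct(Project(the_group_id_col, Filter(cond, V2Scan))))
    Set<String> affectedGroups = v2Scan.stream()
        .filter(r -> cond.test(r.value()))
        .map(Row::groupId)
        .collect(Collectors.toSet());
    // Filter(Not(cond), V2Scan), restricted to the affected groups: these are
    // the surviving rows the V2Write copies into the replacement groups.
    return v2Scan.stream()
        .filter(r -> affectedGroups.contains(r.groupId()) && !cond.test(r.value()))
        .collect(Collectors.toList());
  }
}
```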
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]