Flyangz opened a new pull request #34953:
URL: https://github.com/apache/spark/pull/34953
### What changes were proposed in this pull request?
Adjust the grouping rules of `distinctAggGroups` in
`RewriteDistinctAggregates.groupDistinctAggExpr` so that some distinct
aggregates can be grouped together. The conditions involved in them (e.g.
`CaseWhen`, `If`) are stored in an 'if_vector' so they do not need to be
expanded. The 'if_vector' and 'filter_vector' introduced here reduce the number
of columns in the `Expand`. In addition, children of distinct aggregate
functions that have the same data type share the same projected column.
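To illustrate the bit-vector idea, here is a minimal Python sketch. It is not Spark code, and `encode`, `has_bit` and `count_distinct_with_vector` are hypothetical names. It assumes each condition on a shared distinct column is assigned one bit. The per-row bits are OR-ed together per distinct value, which mirrors `bit_or` in the inner aggregate, and a value is counted for condition i only if bit i ended up set. For brevity it keeps all conditions in one vector, whereas the PR keeps IF conditions and FILTER predicates in separate 'if_vector' and 'filter_vector' columns.

```python
from collections import defaultdict

def encode(row, conds):
    """Pack the truth values of conds into one integer: bit i <-> conds[i]."""
    vec = 0
    for bit, cond in enumerate(conds):
        if cond(row):
            vec |= 1 << bit
    return vec

def has_bit(vec, bit):
    return ((vec >> bit) & 1) == 1

def count_distinct_with_vector(rows, col, conds):
    """COUNT(DISTINCT IF(cond_i, col, NULL)) for every cond_i in one pass."""
    # Inner aggregate: group by the distinct value and OR the per-row vectors
    # together (the role of bit_or(if_vector_1) in the plan).
    merged = defaultdict(int)
    for row in rows:
        if row[col] is not None:  # COUNT(DISTINCT ...) ignores NULLs
            merged[row[col]] |= encode(row, conds)
    # Outer aggregate: a value counts for cond_i iff bit i is set.
    return [sum(1 for vec in merged.values() if has_bit(vec, i))
            for i in range(len(conds))]

rows = [
    {"id": 1, "cat1": "a", "value": 3},
    {"id": 2, "cat1": "a", "value": 7},
    {"id": 3, "cat1": "b", "value": 4},
    {"id": 4, "cat1": "c", "value": 9},
]
conds = [lambda r: r["value"] > 5, lambda r: r["id"] > 1]
print(count_distinct_with_vector(rows, "cat1", conds))  # [2, 3]
```

Both conditions are answered from a single pass over one column. Duplicate values such as the two `"a"` rows collapse into one entry whose vector remembers every condition that was ever true for that value.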
Here is an example comparing the original expand rewriting with the new one,
which uses 'merged column' and 'bit vector' (in SQL):
```sql
SELECT
COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_filter_cnt_dist,
COUNT(DISTINCT cat2) FILTER (WHERE id > 2) as cat2_filter_cnt_dist,
COUNT(DISTINCT IF(value > 5, cat1, null)) as cat1_if_cnt_dist,
COUNT(DISTINCT id) as id_cnt_dist,
SUM(DISTINCT value) as id_sum_dist
FROM data
GROUP BY key
```
The current rule rewrites the above SQL into the following (pseudo) logical
plan:
```
Aggregate(
  key = ['key]
  functions = [
    count('cat1) FILTER (WHERE (('gid = 1) AND 'max(id > 1))),
    count('(IF((value > 5), cat1, null))) FILTER (WHERE ('gid = 5)),
    count('cat2) FILTER (WHERE (('gid = 3) AND 'max(id > 2))),
    count('id) FILTER (WHERE ('gid = 2)),
    sum('value) FILTER (WHERE ('gid = 4))
  ]
  output = ['key, 'cat1_filter_cnt_dist, 'cat2_filter_cnt_dist, 'cat1_if_cnt_dist,
            'id_cnt_dist, 'id_sum_dist])
  Aggregate(
    key = ['key, 'cat1, 'value, 'cat2, '(IF((value > 5), cat1, null)), 'id, 'gid]
    functions = [max('id > 1), max('id > 2)]
    output = ['key, 'cat1, 'value, 'cat2, '(IF((value > 5), cat1, null)), 'id, 'gid,
              'max(id > 1), 'max(id > 2)])
    Expand(
      projections = [
        ('key, 'cat1, null, null, null, null, 1, ('id > 1), null),
        ('key, null, null, null, null, 'id, 2, null, null),
        ('key, null, null, 'cat2, null, null, 3, null, ('id > 2)),
        ('key, null, 'value, null, null, null, 4, null, null),
        ('key, null, null, null, if (('value > 5)) 'cat1 else null, null, 5, null, null)
      ]
      output = ['key, 'cat1, 'value, 'cat2, '(IF((value > 5), cat1, null)), 'id,
                'gid, '(id > 1), '(id > 2)])
      LocalTableScan [...]
```
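To make the cost of the current rewrite concrete, here is a hypothetical Python simulation of the plan above on four hand-made rows. It is not Spark code; `current_rewrite`, `sql_max` and the sample data are illustrative, and `None` stands in for SQL `NULL`. Each input row is expanded five times, once per `gid`, before the inner `Aggregate` deduplicates.

```python
from collections import defaultdict

def sql_max(a, b):
    """MAX aggregate step that ignores NULL (None)."""
    if a is None:
        return b
    if b is None:
        return a
    return max(a, b)

def current_rewrite(rows):
    """Simulate the plan above; returns ({key: outputs}, Expand row count)."""
    # Expand: five projections per input row, one per gid. Each projection is
    # reduced to (key, gid, distinct value, FILTER flag); its other
    # distinct-value columns would be NULL.
    expanded = []
    for r in rows:
        expanded.append((r["key"], 1, r["cat1"], r["id"] > 1))
        expanded.append((r["key"], 2, r["id"], None))
        expanded.append((r["key"], 3, r["cat2"], r["id"] > 2))
        expanded.append((r["key"], 4, r["value"], None))
        expanded.append((r["key"], 5, r["cat1"] if r["value"] > 5 else None, None))
    # Inner Aggregate: group by (key, gid, value) and take max() of the flag.
    inner = {}
    for key, gid, val, flag in expanded:
        inner[(key, gid, val)] = sql_max(inner.get((key, gid, val)), flag)
    # Outer Aggregate: one count/sum per gid, with the FILTERs from the plan.
    out = defaultdict(lambda: [0, 0, 0, 0, 0])
    for (key, gid, val), flag in inner.items():
        if val is None:  # count() and sum() skip NULL inputs
            continue
        acc = out[key]
        if gid == 1 and flag:
            acc[0] += 1      # cat1_filter_cnt_dist
        elif gid == 3 and flag:
            acc[1] += 1      # cat2_filter_cnt_dist
        elif gid == 5:
            acc[2] += 1      # cat1_if_cnt_dist
        elif gid == 2:
            acc[3] += 1      # id_cnt_dist
        elif gid == 4:
            acc[4] += val    # id_sum_dist
    return {k: tuple(v) for k, v in out.items()}, len(expanded)

rows = [
    {"key": "k", "id": 1, "cat1": "a", "cat2": "x", "value": 3},
    {"key": "k", "id": 2, "cat1": "a", "cat2": "y", "value": 7},
    {"key": "k", "id": 3, "cat1": "b", "cat2": "x", "value": 3},
    {"key": "k", "id": 4, "cat1": "c", "cat2": "y", "value": 9},
]
result, n = current_rewrite(rows)
print(result, n)  # {'k': (3, 2, 2, 4, 19)} 20
```

The `Expand` emits 20 rows for 4 input rows, and every one of them carries all five distinct-value columns plus the two filter flags.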
After applying the 'merged column' and 'bit vector' tricks, the logical plan
becomes:
```
Aggregate(
  key = ['key]
  functions = [
    count('merged_string_1) FILTER (WHERE (('gid = 1) AND NOT (('filter_vector_1 & 1) = 0))),
    count('merged_string_1) FILTER (WHERE ('gid = 1)),
    count(if (NOT (('if_vector_1 & 1) = 0)) 'merged_string_1 else null) FILTER (WHERE ('gid = 1)),
    count('merged_string_1) FILTER (WHERE (('gid = 2) AND NOT (('filter_vector_1 & 1) = 0))),
    count('merged_integer_1) FILTER (WHERE ('gid = 3)),
    sum('merged_integer_1) FILTER (WHERE ('gid = 4))
  ]
  output = ['key, 'cat1_filter_cnt_dist, 'cat2_filter_cnt_dist, 'cat1_if_cnt_dist,
            'id_cnt_dist, 'id_sum_dist])
  Aggregate(
    key = ['key, 'merged_string_1, 'merged_integer_1, 'gid]
    functions = [bit_or('if_vector_1), bit_or('filter_vector_1)]
    output = ['key, 'merged_string_1, 'merged_integer_1, 'gid,
              'bit_or(if_vector_1), 'bit_or(filter_vector_1)])
    Expand(
      projections = [
        ('key, 'cat1, null, 1, if ('value > 5) 1 else 0, if ('id > 1) 1 else 0),
        ('key, 'cat2, null, 2, null, if ('id > 2) 1 else 0),
        ('key, null, 'id, 3, null, null),
        ('key, null, 'value, 4, null, null)
      ]
      output = ['key, 'merged_string_1, 'merged_integer_1, 'gid,
                'if_vector_1, 'filter_vector_1])
      LocalTableScan [...]
```
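A matching hypothetical Python simulation of the merged plan uses `bit_or` over the vectors in the inner `Aggregate` and `& 1` tests in the outer one. On a small hand-made sample (illustrative data; `None` stands in for SQL `NULL`), it shows the `Expand` shrinking to four projections per input row while the results stay the same. The sketch computes only the five output columns of the example query; `merged_rewrite`, `bit_or` and `has_bit` are hypothetical helpers, not Spark APIs.

```python
from collections import defaultdict

def bit_or(a, b):
    """bit_or aggregate step that ignores NULL (None)."""
    if a is None:
        return b
    if b is None:
        return a
    return a | b

def has_bit(vec, bit):
    """NOT ((vec & (1 << bit)) = 0); a NULL vector never passes the FILTER."""
    return vec is not None and (vec & (1 << bit)) != 0

def merged_rewrite(rows):
    """Simulate the merged-column / bit-vector plan above."""
    # Expand: four projections per input row:
    # (key, merged_string_1, merged_integer_1, gid, if_vector_1, filter_vector_1)
    expanded = []
    for r in rows:
        expanded.append((r["key"], r["cat1"], None, 1,
                         1 if r["value"] > 5 else 0, 1 if r["id"] > 1 else 0))
        expanded.append((r["key"], r["cat2"], None, 2,
                         None, 1 if r["id"] > 2 else 0))
        expanded.append((r["key"], None, r["id"], 3, None, None))
        expanded.append((r["key"], None, r["value"], 4, None, None))
    # Inner Aggregate: group by (key, merged_string_1, merged_integer_1, gid)
    # and bit_or both vectors.
    inner = {}
    for key, s, i, gid, if_vec, filter_vec in expanded:
        prev_if, prev_filter = inner.get((key, s, i, gid), (None, None))
        inner[(key, s, i, gid)] = (bit_or(prev_if, if_vec),
                                   bit_or(prev_filter, filter_vec))
    # Outer Aggregate: the five outputs of the example query.
    out = defaultdict(lambda: [0, 0, 0, 0, 0])
    for (key, s, i, gid), (if_vec, filter_vec) in inner.items():
        acc = out[key]
        if gid == 1 and s is not None:
            acc[0] += int(has_bit(filter_vec, 0))  # cat1_filter_cnt_dist
            acc[2] += int(has_bit(if_vec, 0))      # cat1_if_cnt_dist
        elif gid == 2 and s is not None:
            acc[1] += int(has_bit(filter_vec, 0))  # cat2_filter_cnt_dist
        elif gid == 3 and i is not None:
            acc[3] += 1                            # id_cnt_dist
        elif gid == 4 and i is not None:
            acc[4] += i                            # id_sum_dist
    return {k: tuple(v) for k, v in out.items()}, len(expanded)

rows = [
    {"key": "k", "id": 1, "cat1": "a", "cat2": "x", "value": 3},
    {"key": "k", "id": 2, "cat1": "a", "cat2": "y", "value": 7},
    {"key": "k", "id": 3, "cat1": "b", "cat2": "x", "value": 3},
    {"key": "k", "id": 4, "cat1": "c", "cat2": "y", "value": 9},
]
result, n = merged_rewrite(rows)
print(result, n)  # {'k': (3, 2, 2, 4, 19)} 16
```

On this sample the `Expand` emits 16 rows instead of the 20 that five projections per row would produce, and each row carries two value columns and two small integer vectors instead of five value columns and two flags.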
### Why are the changes needed?
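It can save a large amount of memory and improve performance when several
distinct aggregates differ only in the condition applied to the same column.
The current rule gives each such expression its own group in the `Expand` (as
with the `IF` expression at `gid = 5` in the example above), so every input row
is replicated once per distinct expression. With this change, the expressions
share one projection, and their conditions are tracked as bits in the
'if_vector'. A typical case: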
```sql
SELECT
count(distinct case when cond1 then col1 end),
count(distinct case when cond2 then col1 end),
...
FROM data
```
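A back-of-the-envelope Python model of the saving for this query shape counts how many rows the `Expand` emits per input row and how many columns each emitted row carries. It is a hypothetical model rather than a measurement. It ignores grouping keys and assumes one 64-bit 'if_vector' column per 64 conditions in the merged case; `expand_shape` is an illustrative name.

```python
def expand_shape(n_exprs, merged):
    """Hypothetical model of the Expand for n_exprs aggregates of the form
    count(distinct case when cond_i then col1 end), without grouping keys.

    Returns (rows emitted per input row, columns per emitted row).
    """
    if merged:
        # One shared projection: col1, gid and ceil(n / 64) if_vector columns.
        return 1, 2 + (n_exprs + 63) // 64
    # One projection (gid) per expression; each expression has its own column.
    return n_exprs, n_exprs + 1

for n in (2, 10, 100):
    print(n, expand_shape(n, merged=False), expand_shape(n, merged=True))
# 2 (2, 3) (1, 3)
# 10 (10, 11) (1, 3)
# 100 (100, 101) (1, 4)
```

Under this model the unmerged cost grows roughly with the square of the number of expressions (rows times columns), while the merged cost stays nearly constant.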
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests, plus a new unit test in `DataFrameAggregateSuite` for the case
'Vector Size larger than 64'.
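The test name suggests that a single vector column is not always enough. One plausible layout is assumed here; it is suggested only by the `_1` suffix on `if_vector_1` and `filter_vector_1`. Under this layout, condition i lives in vector column i // 64 + 1 at bit i % 64. The Python sketch below uses hypothetical names and Python integers in place of 64-bit longs.

```python
WORD_BITS = 64  # assumed width of one vector column (a 64-bit long)

def bit_location(cond_index):
    """Hypothetical layout: (1-based vector column, bit within that column)."""
    return cond_index // WORD_BITS + 1, cond_index % WORD_BITS

def encode_vectors(flags):
    """Pack a list of booleans into ceil(len(flags) / 64) integer words."""
    words = [0] * ((len(flags) + WORD_BITS - 1) // WORD_BITS)
    for i, flag in enumerate(flags):
        if flag:
            column, bit = bit_location(i)
            words[column - 1] |= 1 << bit
    return words

def is_set(words, cond_index):
    column, bit = bit_location(cond_index)
    return ((words[column - 1] >> bit) & 1) == 1

flags = [i in (0, 63, 64, 99) for i in range(100)]
words = encode_vectors(flags)
print(len(words))  # 2
```

With a signed 64-bit long, bit 63 would be the sign bit, which is harmless because the vectors are only combined with `|` and tested with `&`.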
I have also written some SQL locally to test the correctness of the distinct
calculation, but it is difficult to cover most cases that way. Spark's existing
test suites are probably more comprehensive, so I did not add those queries to
the code.
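A randomized differential check can give broader coverage than hand-written SQL. It generates random rows and conditions, then compares the direct `COUNT(DISTINCT IF(cond, col1, NULL))` semantics with a bit-vector evaluation. The Python sketch below models only the core counting logic, not Spark's rule. All names are hypothetical, and it draws up to 70 conditions so that bits beyond 64 are exercised.

```python
import random
from collections import defaultdict

def naive_counts(rows, conds):
    """Direct semantics: COUNT(DISTINCT IF(cond, col1, NULL)) per condition."""
    return [len({r["col1"] for r in rows if cond(r) and r["col1"] is not None})
            for cond in conds]

def vector_counts(rows, conds):
    """Bit-vector evaluation: OR per-row condition bits per distinct col1 value."""
    merged = defaultdict(int)
    for r in rows:
        if r["col1"] is None:
            continue
        vec = 0
        for bit, cond in enumerate(conds):
            if cond(r):
                vec |= 1 << bit
        merged[r["col1"]] |= vec
    return [sum(1 for vec in merged.values() if (vec >> i) & 1)
            for i in range(len(conds))]

def differential_check(trials=200, seed=0):
    """Compare both evaluations on random inputs; return the number of trials."""
    rng = random.Random(seed)
    for _ in range(trials):
        rows = [{"col1": rng.choice(["a", "b", "c", None]), "x": rng.randint(0, 9)}
                for _ in range(rng.randint(0, 20))]
        # Up to 70 conditions, so bits above 63 are set for some rows.
        conds = [lambda r, t=t: r["x"] > t % 10 for t in range(rng.randint(1, 70))]
        expected = naive_counts(rows, conds)
        actual = vector_counts(rows, conds)
        assert expected == actual, (rows, expected, actual)
    return trials

print(differential_check())  # 200
```

The same pattern could drive real queries instead of these Python models, comparing the optimized plan against a reference evaluation of the same SQL.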