beliefer opened a new pull request, #42223:
URL: https://github.com/apache/spark/pull/42223
### What changes were proposed in this pull request?
Some queries contain multiple scalar subqueries (aggregations without a
GROUP BY clause) connected by joins. The general form of joined aggregates
that can be merged is as follows:
```
<aggregation function> ::=
SUM | AVG | MAX | ...
<aggregation subquery> ::=
SELECT
<aggregation function>(...)[ , <aggregation function>(...)[ , ...]]
FROM [tab | query]
<joined aggregation> ::=
SELECT *
FROM (
<aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
<aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
...
<aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
<aggregation subquery>
)
```
For example,
```
SELECT *
FROM (SELECT
avg(power) avg_power,
count(power) count_power,
count(DISTINCT power) count_distinct_power
FROM data
WHERE country = "USA"
AND (id BETWEEN 1 AND 3
OR city = "Berkeley"
OR name = "Xiao")) B1,
(SELECT
avg(power) avg_power,
count(power) count_power,
count(DISTINCT power) count_distinct_power
FROM data
WHERE country = "China"
AND (id BETWEEN 4 AND 5
OR city = "Hangzhou"
OR name = "Wenchen")) B2
```
We can optimize this SQL to the form shown below:
```
SELECT
avg(power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3 OR
city = "Berkeley" OR name = "Xiao")) AS avg_power,
count(power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3
OR city = "Berkeley" OR name = "Xiao")) AS count_power,
count(DISTINCT power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3
OR city = "Berkeley" OR name = "Xiao")),
avg(power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5 OR
city = "Hangzhou" OR name = "Wenchen")),
count(power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5 OR
city = "Hangzhou" OR name = "Wenchen")),
count(DISTINCT power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5
OR city = "Hangzhou" OR name = "Wenchen"))
FROM data
WHERE
(country = "USA"
AND (id BETWEEN 1 AND 3
OR city = "Berkeley"
OR name = "Xiao")) OR
(country = "China"
AND (id BETWEEN 4 AND 5
OR city = "Hangzhou"
OR name = "Wenchen"))
```
If we merge the filters and aggregates, we scan the data source only once
and eliminate the join, which avoids the shuffle.
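The equivalence behind this rewrite can be checked on a toy dataset. The sketch below is plain Python, not Spark code: the rows, the helper names (`usa`, `china`, `joined_plan`, `merged_plan`), and the data values are all made up for illustration. It compares two separate filtered aggregations, cross joined, against a single pass that applies the merged `WHERE` and a per-aggregate `FILTER`.

```python
# Toy rows standing in for the `data` table (values are invented).
rows = [
    {"id": 1, "country": "USA", "city": "Berkeley", "name": "Ann", "power": 10},
    {"id": 2, "country": "USA", "city": "Austin", "name": "Xiao", "power": 20},
    {"id": 7, "country": "USA", "city": "Boston", "name": "Bob", "power": 30},
    {"id": 3, "country": "USA", "city": "Austin", "name": "Cat", "power": 10},
    {"id": 4, "country": "China", "city": "Beijing", "name": "Dan", "power": 40},
    {"id": 9, "country": "China", "city": "Hangzhou", "name": "Eve", "power": 50},
    {"id": 8, "country": "China", "city": "Shanghai", "name": "Wenchen", "power": 40},
    {"id": 6, "country": "China", "city": "Beijing", "name": "Fay", "power": 60},
    {"id": 5, "country": "Japan", "city": "Tokyo", "name": "Gus", "power": 70},
]


def usa(r):
    # Filter of subquery B1 in the example above.
    return r["country"] == "USA" and (
        1 <= r["id"] <= 3 or r["city"] == "Berkeley" or r["name"] == "Xiao"
    )


def china(r):
    # Filter of subquery B2 in the example above.
    return r["country"] == "China" and (
        4 <= r["id"] <= 5 or r["city"] == "Hangzhou" or r["name"] == "Wenchen"
    )


def aggregate(powers):
    """Return (avg, count, count distinct), ignoring NULLs as SQL aggregates do."""
    vals = [p for p in powers if p is not None]
    avg = sum(vals) / len(vals) if vals else None
    return (avg, len(vals), len(set(vals)))


def joined_plan(rows):
    # Two independent scans, then a cross join of the two single-row results.
    b1 = aggregate(r["power"] for r in rows if usa(r))
    b2 = aggregate(r["power"] for r in rows if china(r))
    return b1 + b2


def merged_plan(rows):
    # A single scan with one accumulator set per original subquery.
    preds = (usa, china)
    totals, counts, distinct = [0, 0], [0, 0], [set(), set()]
    for r in rows:
        if not any(p(r) for p in preds):  # merged WHERE: filter1 OR filter2
            continue
        for i, p in enumerate(preds):
            if p(r) and r["power"] is not None:  # per-aggregate FILTER
                totals[i] += r["power"]
                counts[i] += 1
                distinct[i].add(r["power"])
    out = ()
    for i in range(len(preds)):
        avg = totals[i] / counts[i] if counts[i] else None
        out += (avg, counts[i], len(distinct[i]))
    return out


print(joined_plan(rows) == merged_plan(rows))
```

Because each subquery is an aggregation without grouping, it always yields exactly one row, so the join of the subqueries is a single row as well; that is what lets the merged form drop the join.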
This PR also supports eliminating nested joins; for an example, see:
https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q28.sql
This change improves performance for such queries (see the benchmarks below).
This PR also reuses some functions from `MergeScalarSubqueries`.
This PR also adds some `TreePattern`s to make it easy to check the cost of a
predicate.
### Why are the changes needed?
To improve performance for the case shown above.
### Does this PR introduce _any_ user-facing change?
'No'.
New feature.
### How was this patch tested?
1. new test cases
2. new micro benchmark.
```
Benchmark CombineJoinedAggregates:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------------
filter is not defined, CombineJoinedAggregates: false            730            819          69        28.7          34.8       1.0X
filter is not defined, CombineJoinedAggregates: true             618            632          14        33.9          29.5       1.2X
step is 1000000, CombineJoinedAggregates: false                  572            590          20        36.7          27.3       1.3X
step is 1000000, CombineJoinedAggregates: true)                  769            794          21        27.3          36.6       1.0X
step is 100000, CombineJoinedAggregates: false                   350            370          26        59.9          16.7       2.1X
step is 100000, CombineJoinedAggregates: true)                   231            241          10        90.7          11.0       3.2X
step is 10000, CombineJoinedAggregates: false                    314            340          26        66.8          15.0       2.3X
step is 10000, CombineJoinedAggregates: true)                    171            182           9       122.5           8.2       4.3X
step is 1000, CombineJoinedAggregates: false                     303            337          32        69.3          14.4       2.4X
step is 1000, CombineJoinedAggregates: true)                     162            171           9       129.4           7.7       4.5X
step is 100, CombineJoinedAggregates: false                      300            316          27        70.0          14.3       2.4X
step is 100, CombineJoinedAggregates: true)                      160            169           9       131.3           7.6       4.6X
step is 10, CombineJoinedAggregates: false                       297            325          33        70.6          14.2       2.5X
step is 10, CombineJoinedAggregates: true)                       170            203          36       123.5           8.1       4.3X
step is 1, CombineJoinedAggregates: false                        328            352          17        64.0          15.6       2.2X
step is 1, CombineJoinedAggregates: true)                        140            148           7       149.3           6.7       5.2X
```
```
Benchmark CombineJoinedAggregates:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------------
Tree node number < 1, CombineJoinedAggregates: true)             398            503         109        52.7          19.0       1.0X
Tree node number < 9, CombineJoinedAggregates: true)             394            432          31        53.2          18.8       1.0X
Tree node number < 19, CombineJoinedAggregates: true)            399            427          47        52.6          19.0       1.0X
Tree node number < 29, CombineJoinedAggregates: true)            434            479         100        48.3          20.7       0.9X
Tree node number < 39, CombineJoinedAggregates: true)            480            499          24        43.7          22.9       0.8X
```
3. manual test on TPC-DS
TPC-DS data size: 2TB.
This improvement applies to TPC-DS q28, with no regression in the other
queries.
| TPC-DS Query | Before(Seconds) | After(Seconds) | Speedup(Percent) |
| ---- | ---- | ---- | ---- |
| q28 | 109.665 | 43.938 | 249.59% |
According to the micro benchmark, the merged plan is slower than the original
one when the filter has almost no selectivity (the `step is 1000000` case).
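The figures above can be cross-checked with a little arithmetic. The snippet below is only a sketch over the best times copied from the first micro benchmark table and the q28 timings; the variable names are made up. It computes the before/after ratio per case and lists the cases where enabling `CombineJoinedAggregates` is slower. The q28 "Speedup(Percent)" value appears to be the before/after time ratio expressed as a percentage.

```python
# Best Time(ms) from the first benchmark table: (rule disabled, rule enabled).
best_ms = {
    "filter is not defined": (730, 618),
    "step 1000000": (572, 769),
    "step 100000": (350, 231),
    "step 10000": (314, 171),
    "step 1000": (303, 162),
    "step 100": (300, 160),
    "step 10": (297, 170),
    "step 1": (328, 140),
}

# Ratio > 1 means the merged plan is faster; ratio < 1 means a regression.
ratios = {case: off / on for case, (off, on) in best_ms.items()}
regressions = [case for case, ratio in ratios.items() if ratio < 1.0]

# TPC-DS q28 timings in seconds, taken from the table above.
q28_before, q28_after = 109.665, 43.938
q28_ratio_percent = round(q28_before / q28_after * 100, 2)

for case, ratio in ratios.items():
    print(f"{case}: {ratio:.2f}x")
print("regressions:", regressions)
print("q28 before/after (%):", q28_ratio_percent)
```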