beliefer opened a new pull request, #42223:
URL: https://github.com/apache/spark/pull/42223

   ### What changes were proposed in this pull request?
   Some queries contain multiple scalar subqueries (aggregations without a GROUP BY clause) connected by joins. The general form of joined aggregates that can be merged is as follows:
   ```
   <aggregation function> ::=
     SUM | AVG | MAX | ...
   
   <aggregation subquery> ::=
     SELECT
       <aggregation function>(...)[ , <aggregation function>(...)[ , ...]]
     FROM [tab | query]
   
   <joined aggregation> ::=
     SELECT *
     FROM (
       <aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
       <aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
         ...
       <aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
       <aggregation subquery>
     )
   ```
     
   
   For example,
   ```
   SELECT *
   FROM (SELECT
     avg(power) avg_power,
     count(power) count_power,
     count(DISTINCT power) count_distinct_power
   FROM data
   WHERE country = "USA"
     AND (id BETWEEN 1 AND 3
     OR city = "Berkeley"
     OR name = "Xiao")) B1,
     (SELECT
       avg(power) avg_power,
       count(power) count_power,
       count(DISTINCT power) count_distinct_power
     FROM data
     WHERE country = "China"
       AND (id BETWEEN 4 AND 5
       OR city = "Hangzhou"
       OR name = "Wenchen")) B2
   ```
   We can optimize this SQL to the form shown below:
   ```
   SELECT
     avg(power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3 OR city = "Berkeley" OR name = "Xiao")) avg_power,
     count(power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3 OR city = "Berkeley" OR name = "Xiao")) count_power,
     count(DISTINCT power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3 OR city = "Berkeley" OR name = "Xiao")),
     avg(power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5 OR city = "Hangzhou" OR name = "Wenchen")),
     count(power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5 OR city = "Hangzhou" OR name = "Wenchen")),
     count(DISTINCT power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5 OR city = "Hangzhou" OR name = "Wenchen"))
   FROM data
   WHERE
   (country = "USA"
     AND (id BETWEEN 1 AND 3
     OR city = "Berkeley"
     OR name = "Xiao")) OR
   (country = "China"
       AND (id BETWEEN 4 AND 5
       OR city = "Hangzhou"
       OR name = "Wenchen"))
   ```
   If we can merge the filters and aggregates, we scan the data source only once and eliminate the join, which avoids a shuffle.
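
To make the equivalence concrete, here is a minimal sketch in plain Python (not Spark code) that mimics both plans on a tiny in-memory table. The helper names `joined_aggregates`, `merged_aggregates`, `usa_filter`, `china_filter` and the sample rows are assumptions made up for illustration; only the filter conditions come from the example query above.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]
Predicate = Callable[[Row], bool]
Stats = Tuple[Optional[float], int, int]  # (avg, count, count distinct)


def usa_filter(r: Row) -> bool:
    # WHERE clause of subquery B1 in the example.
    return r["country"] == "USA" and (
        1 <= r["id"] <= 3 or r["city"] == "Berkeley" or r["name"] == "Xiao")


def china_filter(r: Row) -> bool:
    # WHERE clause of subquery B2 in the example.
    return r["country"] == "China" and (
        4 <= r["id"] <= 5 or r["city"] == "Hangzhou" or r["name"] == "Wenchen")


def joined_aggregates(rows: List[Row], filters: List[Predicate]) -> List[Stats]:
    """Original plan: one full scan per subquery; the join only pairs the results."""
    out = []
    for f in filters:
        powers = [r["power"] for r in rows if f(r) and r["power"] is not None]
        avg = sum(powers) / len(powers) if powers else None
        out.append((avg, len(powers), len(set(powers))))
    return out


def merged_aggregates(rows: List[Row], filters: List[Predicate]) -> List[Stats]:
    """Merged plan: a single scan; each aggregate only sees rows passing its FILTER."""
    n = len(filters)
    sums, counts = [0] * n, [0] * n
    distinct = [set() for _ in range(n)]
    for r in rows:
        hits = [f(r) for f in filters]
        if not any(hits):  # merged WHERE: the OR of all subquery filters
            continue
        if r["power"] is None:  # avg and count ignore NULL inputs
            continue
        for i, hit in enumerate(hits):
            if hit:
                sums[i] += r["power"]
                counts[i] += 1
                distinct[i].add(r["power"])
    return [(sums[i] / counts[i] if counts[i] else None, counts[i], len(distinct[i]))
            for i in range(n)]


# Hypothetical sample rows, not taken from the PR.
DATA = [
    {"id": 1, "country": "USA", "city": "Austin", "name": "A", "power": 10},
    {"id": 2, "country": "USA", "city": "Berkeley", "name": "B", "power": 20},
    {"id": 9, "country": "USA", "city": "Berkeley", "name": "C", "power": 20},
    {"id": 7, "country": "USA", "city": "Austin", "name": "D", "power": 99},
    {"id": 4, "country": "China", "city": "Beijing", "name": "E", "power": 5},
    {"id": 8, "country": "China", "city": "Hangzhou", "name": "F", "power": 7},
    {"id": 6, "country": "China", "city": "Beijing", "name": "Wenchen", "power": None},
]
FILTERS = [usa_filter, china_filter]

print(joined_aggregates(DATA, FILTERS) == merged_aggregates(DATA, FILTERS))  # prints True
```

Note that the merged version evaluates every filter on every row, which is consistent with the benchmark observation that merging pays off less when the filters reject few rows.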
   
   This PR also supports eliminating nested joins; for an example, see TPC-DS q28: https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q28.sql
   
   This change improves performance whenever the filters are selective enough, as the benchmarks under "How was this patch tested?" show.
   
   This PR also reuses some functions from `MergeScalarSubqueries`.
   
   This PR also adds some `TreePattern`s to make it easy to check the cost of a predicate.
   
   ### Why are the changes needed?
   To improve performance for the case shown above.
   
   
   ### Does this PR introduce _any_ user-facing change?
   'No'.
   New feature.
   
   
   ### How was this patch tested?
   1. new test cases
   2. new micro benchmark.
   
   ```
   Benchmark CombineJoinedAggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   --------------------------------------------------------------------------------------------------------------------------------------
   filter is not defined, CombineJoinedAggregates: false             730            819          69         28.7          34.8       1.0X
   filter is not defined, CombineJoinedAggregates: true              618            632          14         33.9          29.5       1.2X
   step is 1000000, CombineJoinedAggregates: false                   572            590          20         36.7          27.3       1.3X
   step is 1000000, CombineJoinedAggregates: true                    769            794          21         27.3          36.6       1.0X
   step is 100000, CombineJoinedAggregates: false                    350            370          26         59.9          16.7       2.1X
   step is 100000, CombineJoinedAggregates: true                     231            241          10         90.7          11.0       3.2X
   step is 10000, CombineJoinedAggregates: false                     314            340          26         66.8          15.0       2.3X
   step is 10000, CombineJoinedAggregates: true                      171            182           9        122.5           8.2       4.3X
   step is 1000, CombineJoinedAggregates: false                      303            337          32         69.3          14.4       2.4X
   step is 1000, CombineJoinedAggregates: true                       162            171           9        129.4           7.7       4.5X
   step is 100, CombineJoinedAggregates: false                       300            316          27         70.0          14.3       2.4X
   step is 100, CombineJoinedAggregates: true                        160            169           9        131.3           7.6       4.6X
   step is 10, CombineJoinedAggregates: false                        297            325          33         70.6          14.2       2.5X
   step is 10, CombineJoinedAggregates: true                         170            203          36        123.5           8.1       4.3X
   step is 1, CombineJoinedAggregates: false                         328            352          17         64.0          15.6       2.2X
   step is 1, CombineJoinedAggregates: true                          140            148           7        149.3           6.7       5.2X
   ```
   ```
   Benchmark CombineJoinedAggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   --------------------------------------------------------------------------------------------------------------------------------------
   Tree node number < 1, CombineJoinedAggregates: true               398            503         109         52.7          19.0       1.0X
   Tree node number < 9, CombineJoinedAggregates: true               394            432          31         53.2          18.8       1.0X
   Tree node number < 19, CombineJoinedAggregates: true              399            427          47         52.6          19.0       1.0X
   Tree node number < 29, CombineJoinedAggregates: true              434            479         100         48.3          20.7       0.9X
   Tree node number < 39, CombineJoinedAggregates: true              480            499          24         43.7          22.9       0.8X
   ```
   
   3. manual test on TPC-DS
   TPC-DS data size: 2TB.
   This improvement applies to TPC-DS q28 and shows no regression for the other test cases.
   
   | TPC-DS Query   | Before(Seconds)  | After(Seconds)  | Speedup(Percent)  |
   |  ----  | ----  | ----  | ----  |
   | q28 | 109.665 | 43.938 | 249.59% |
   
   According to the micro benchmark, the merged plan is slower than the original plan when the filter has almost no selectivity (see the `step is 1000000` rows, where the best time goes from 572 ms to 769 ms).
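
For readers checking the q28 numbers, the Speedup figure in the table above appears to be the ratio of the before time to the after time, written as a percentage. The short sketch below reproduces it and also shows the same result as a share of run time saved; the function names are hypothetical and used only for this illustration.

```python
def speedup_percent(before_s: float, after_s: float) -> float:
    """Old run time divided by new run time, expressed as a percentage."""
    return before_s / after_s * 100


def time_saved_percent(before_s: float, after_s: float) -> float:
    """Share of the original run time that the change removes."""
    return (before_s - after_s) / before_s * 100


# q28 timings in seconds, taken from the table above.
before, after = 109.665, 43.938
print(f"speedup:    {speedup_percent(before, after):.2f}%")
print(f"time saved: {time_saved_percent(before, after):.2f}%")
```

In other words, q28 runs roughly 2.5 times as fast, which corresponds to removing about 60% of its run time.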
   

