beliefer opened a new pull request, #38799:
URL: https://github.com/apache/spark/pull/38799
### What changes were proposed in this pull request?
Introduce a new node, `WindowGroupLimit`, that filters out unnecessary rows
based on the rank computed on a partial dataset.
It supports the following pattern:
```
SELECT (... (row_number|rank|dense_rank)()
OVER (
PARTITION BY ...
ORDER BY ... ) AS rn)
WHERE rn (==|<|<=) k
AND other conditions
```
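As a concrete instance of this query shape, here is a minimal sketch. It runs on SQLite through Python's `sqlite3` module only because that is self-contained; it is not Spark and says nothing about how `WindowGroupLimit` is planned. It assumes the bundled SQLite is 3.25 or newer (window-function support). The `sales` table and its columns are hypothetical.

```python
import sqlite3

# Hypothetical table, used only to illustrate the query shape.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (store TEXT, item TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?, ?)",
    [
        ("s1", "a", 30), ("s1", "b", 10), ("s1", "c", 20),
        ("s2", "d", 5), ("s2", "e", 50), ("s2", "f", 0),
    ],
)

# Rank-like window function in a subquery, then a filter `rn <= k`
# (here k = 2) combined with another condition.
query = """
SELECT store, item, amount, rn
FROM (
  SELECT store, item, amount,
         row_number() OVER (PARTITION BY store ORDER BY amount DESC) AS rn
  FROM sales
) AS ranked
WHERE rn <= 2
  AND amount > 0
ORDER BY store, rn
"""
top_rows = conn.execute(query).fetchall()
for row in top_rows:
    print(row)
```

Only the two highest-`amount` rows per `store` survive the filter, so every other row is work that a limit applied earlier could avoid.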
For these three rank-like functions (row_number|rank|dense_rank), the rank
of a key computed on a partial dataset is always <= its final rank computed on
the whole dataset, so rows with partial rank > k can be safely discarded anywhere.
This PR also takes over some functions from
https://github.com/apache/spark/pull/34367.
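The partial-rank argument above can be checked with a small plain-Python model. This is a sketch under stated assumptions, not the Spark implementation: rows are `(key, value)` tuples, each inner list stands in for one pre-shuffle partition, and the helper names (`rank_rows`, `top_k`, `top_k_with_group_limit`) are hypothetical.

```python
from collections import defaultdict

def rank_rows(rows, func):
    """Yield (rank, key, value), ranking values in ascending order per key."""
    groups = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)
    for key, values in groups.items():
        row_number = rank = dense_rank = 0
        previous = object()  # sentinel that never equals a real value
        for value in sorted(values):
            row_number += 1
            if value != previous:
                rank = row_number  # rank jumps past ties
                dense_rank += 1    # dense_rank leaves no gaps
                previous = value
            chosen = {"row_number": row_number, "rank": rank,
                      "dense_rank": dense_rank}[func]
            yield chosen, key, value

def top_k(rows, k, func):
    """Rows whose rank within their key is <= k."""
    return sorted((key, value) for r, key, value in rank_rows(rows, func) if r <= k)

def top_k_with_group_limit(chunks, k, func):
    """Filter each chunk by its partial rank first, then rank the survivors."""
    survivors = []
    for chunk in chunks:
        survivors.extend(top_k(chunk, k, func))  # local limit before the "shuffle"
    return top_k(survivors, k, func), len(survivors)

# Two hypothetical partitions of the same dataset.
chunks = [
    [("a", 5), ("a", 1), ("a", 9), ("b", 2), ("b", 2)],
    [("a", 3), ("a", 1), ("a", 7), ("b", 4), ("b", 1)],
]
all_rows = [row for chunk in chunks for row in chunk]

for func in ("row_number", "rank", "dense_rank"):
    limited, shuffled = top_k_with_group_limit(chunks, 2, func)
    print(func, limited == top_k(all_rows, 2, func), shuffled, len(all_rows))
```

Because a row with final rank <= k can never have partial rank > k, the early filter drops only rows that the final filter would drop anyway, while fewer rows need to be shuffled.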
### Why are the changes needed?
1. Reduce the shuffle write.
2. Solve the skewed-window problem; a practical case was optimized from 2.5h to
26min.
3. Improve performance on TPC-DS.
**Micro Benchmark**
TPC-DS data size: 2TB.
The improvement applies to TPC-DS q67, with no regression in the other test
cases.
### Does this PR introduce _any_ user-facing change?
'No'.
This PR only updates the internal implementation and adds a new config.
### How was this patch tested?
1. new test suites
2. new micro benchmark
```
[info] Benchmark Top-K:                                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info] -----------------------------------------------------------------------------------------------------------------------------------------------
[info] ROW_NUMBER (PARTITION: , WindowGroupLimit: false)                        13036          15052         969         1.6         621.6       1.0X
[info] ROW_NUMBER (PARTITION: , WindowGroupLimit: true)                          4269           4650         303         4.9         203.6       3.1X
[info] ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: false)          24159          25238         919         0.9        1152.0       0.5X
[info] ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: true)            6466           6594         104         3.2         308.3       2.0X
[info] RANK (PARTITION: , WindowGroupLimit: false)                              11291          11691         252         1.9         538.4       1.2X
[info] RANK (PARTITION: , WindowGroupLimit: true)                                3376           3709         218         6.2         161.0       3.9X
[info] RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)                24778          24927          69         0.8        1181.5       0.5X
[info] RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)                  6531           6613          68         3.2         311.4       2.0X
[info] DENSE_RANK (PARTITION: , WindowGroupLimit: false)                        11468          11730         142         1.8         546.8       1.1X
[info] DENSE_RANK (PARTITION: , WindowGroupLimit: true)                          3459           3658         201         6.1         164.9       3.8X
[info] DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)          24809          24961         173         0.8        1183.0       0.5X
[info] DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)            6512           6579          44         3.2         310.5       2.0X
```
3. manual test on TPC-DS
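
To help read the Top-K micro benchmark numbers, the following sketch recomputes the Relative column and the per-case on/off speedup from the Best Time(ms) values reported in this description. That Relative is the first row's best time divided by each row's best time is an inference from the numbers, not something stated here; the dictionary layout and names are illustrative.

```python
# Best Time(ms) copied from the benchmark output: (WindowGroupLimit false, true).
best_ms = {
    ("ROW_NUMBER", "no partition"): (13036, 4269),
    ("ROW_NUMBER", "PARTITION BY b"): (24159, 6466),
    ("RANK", "no partition"): (11291, 3376),
    ("RANK", "PARTITION BY b"): (24778, 6531),
    ("DENSE_RANK", "no partition"): (11468, 3459),
    ("DENSE_RANK", "PARTITION BY b"): (24809, 6512),
}

# Relative column as reported, in the same (false, true) order.
reported_relative = {
    ("ROW_NUMBER", "no partition"): ("1.0X", "3.1X"),
    ("ROW_NUMBER", "PARTITION BY b"): ("0.5X", "2.0X"),
    ("RANK", "no partition"): ("1.2X", "3.9X"),
    ("RANK", "PARTITION BY b"): ("0.5X", "2.0X"),
    ("DENSE_RANK", "no partition"): ("1.1X", "3.8X"),
    ("DENSE_RANK", "PARTITION BY b"): ("0.5X", "2.0X"),
}

# Assumed baseline: the first benchmark row (ROW_NUMBER, no partition, limit off).
baseline_ms = best_ms[("ROW_NUMBER", "no partition")][0]

def relative(ms):
    """Format baseline / ms the way the Relative column is printed."""
    return f"{baseline_ms / ms:.1f}X"

recomputed = {case: (relative(off), relative(on)) for case, (off, on) in best_ms.items()}

# Speedup of each case against its own WindowGroupLimit=false run.
speedup = {case: off / on for case, (off, on) in best_ms.items()}

for case, factor in speedup.items():
    print(case, recomputed[case], f"{factor:.2f}x")
```

Compared against its own baseline, each case runs about 3x to 3.8x faster with `WindowGroupLimit` enabled; the 0.5X entries only reflect that the partitioned queries are slower than the unpartitioned first row.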