beliefer opened a new pull request, #38799:
URL: https://github.com/apache/spark/pull/38799

   ### What changes were proposed in this pull request?
   Introduce a new node, `WindowGroupLimit`, that filters out unnecessary rows based on the rank computed on a partial dataset.
   
   It supports the following pattern:
   ```
   SELECT ... FROM (
       SELECT ..., (row_number|rank|dense_rank)()
           OVER (
               PARTITION BY ...
               ORDER BY ...) AS rn
       FROM ...)
   WHERE rn (==|<|<=) k
       AND other conditions
   ```
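To make the pattern concrete, here is a minimal sketch that runs the same query shape through SQLite (Python's built-in `sqlite3` module; window functions need SQLite 3.25 or newer) as a stand-in for Spark SQL. The `sales` table, its data and the helper `query_top_k` are hypothetical. It shows that `k` bounds the rank value, not the number of rows: with ties, `rank` and `dense_rank` can return more than `k` rows per partition.

```python
import sqlite3

# Hypothetical table: each category is one window partition; amounts have ties.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (category TEXT, item TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?, ?)",
    [
        ("a", "x1", 50), ("a", "x2", 40), ("a", "x3", 40), ("a", "x4", 10),
        ("b", "y1", 70), ("b", "y2", 60), ("b", "y3", 60), ("b", "y4", 60),
    ],
)


def query_top_k(func: str, k: int) -> list:
    # Same shape as the pattern: a rank-like window function in a subquery,
    # filtered on the rank column by the outer WHERE clause.
    # `func` is spliced into the SQL text, so only pass trusted function names.
    sql = f"""
        SELECT category, item, rn FROM (
            SELECT category, item,
                   {func}() OVER (PARTITION BY category ORDER BY amount DESC) AS rn
            FROM sales
        )
        WHERE rn <= ?
        ORDER BY category, rn, item
    """
    return conn.execute(sql, (k,)).fetchall()


for func in ("row_number", "rank", "dense_rank"):
    print(func, len(query_top_k(func, 3)), "rows")
```

For k = 3 this returns 6, 7 and 8 rows respectively: `row_number` keeps exactly k rows per partition, while ties let `rank` and `dense_rank` keep more.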
   
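   For these three rank-like functions (`row_number`, `rank`, `dense_rank`), the rank of a key computed on a partial dataset is always <= its final rank computed on the whole dataset, because each of them is one plus the number of rows (for `dense_rank`, distinct ordering values) that sort before the key, and a subset of the data can only contain fewer of them. So rows whose partial rank is > k can be safely discarded at any stage, for example before the shuffle.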
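A minimal Python sketch of that argument, assuming a single window partition whose rows are hypothetical `(order_key, row_id)` tuples split into chunks (standing in for the partial datasets seen before a shuffle). Each rank function is written as one plus a count of the rows (or distinct keys) that sort before the row, ties for `row_number` are broken by `row_id`, and none of the names come from Spark's actual `WindowGroupLimit` code. The sketch discards rows whose partial rank exceeds k within each chunk, ranks the survivors again, and compares the top-k result with ranking the whole dataset.

```python
from typing import Callable, List, Tuple

# Hypothetical row shape for a single window partition: (order_key, row_id).
Row = Tuple[int, int]
RankFn = Callable[[Row, List[Row]], int]


# Each function returns the 1-based rank of `row` among `rows`, ordered by
# ascending order_key, as "1 + how many rows (or keys) sort before it".
def row_number(row: Row, rows: List[Row]) -> int:
    # Ties on order_key are broken by row_id, so the order is total.
    return 1 + sum(1 for r in rows if r < row)


def rank(row: Row, rows: List[Row]) -> int:
    return 1 + sum(1 for r in rows if r[0] < row[0])


def dense_rank(row: Row, rows: List[Row]) -> int:
    return 1 + len({r[0] for r in rows if r[0] < row[0]})


def top_k(rows: List[Row], fn: RankFn, k: int) -> List[Row]:
    # Reference result: rank over the whole dataset, keep rank <= k.
    return sorted(r for r in rows if fn(r, rows) <= k)


def partial_filter(chunks: List[List[Row]], fn: RankFn, k: int) -> List[Row]:
    # Rank each chunk on its own and drop rows whose partial rank is > k.
    return [r for chunk in chunks for r in chunk if fn(r, chunk) <= k]


def top_k_with_group_limit(chunks: List[List[Row]], fn: RankFn, k: int) -> List[Row]:
    # Rank only the survivors of the partial filter, then apply the real filter.
    return top_k(partial_filter(chunks, fn, k), fn, k)


rows = [(key, i) for i, key in enumerate([5, 1, 3, 3, 2, 1, 4, 3, 2, 5, 1, 4])]
chunks = [rows[0:4], rows[4:8], rows[8:12]]
for fn in (row_number, rank, dense_rank):
    kept = partial_filter(chunks, fn, 2)
    same = top_k_with_group_limit(chunks, fn, 2) == top_k(rows, fn, 2)
    print(f"{fn.__name__}: {len(kept)} of {len(rows)} rows kept, same top-k: {same}")
```

With these inputs, 6, 7 and 7 of the 12 rows survive the partial filter for `row_number`, `rank` and `dense_rank` respectively, and the top-2 result is unchanged in every case.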
   
   This PR also takes over some of the functionality from https://github.com/apache/spark/pull/34367.
   
   
   ### Why are the changes needed?
   
   1. reduce the shuffle write.
   2. solve the skewed-window problem: a practical case was optimized from 2.5 hours to 26 minutes.
   3. improve performance, including on TPC-DS.
   
   **Micro Benchmark**
   TPC-DS data size: 2TB.
   This improvement takes effect on TPC-DS q67, with no regression on the other test cases.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   This only updates the internal implementation and adds a new config.
   
   
   ### How was this patch tested?
   
   1. new test suites
   2. new micro benchmark
   ```
   [info] Benchmark Top-K:                                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   [info] -----------------------------------------------------------------------------------------------------------------------------------------------
   [info] ROW_NUMBER (PARTITION: , WindowGroupLimit: false)                        13036          15052         969          1.6         621.6       1.0X
   [info] ROW_NUMBER (PARTITION: , WindowGroupLimit: true)                          4269           4650         303          4.9         203.6       3.1X
   [info] ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: false)          24159          25238         919          0.9        1152.0       0.5X
   [info] ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: true)            6466           6594         104          3.2         308.3       2.0X
   [info] RANK (PARTITION: , WindowGroupLimit: false)                              11291          11691         252          1.9         538.4       1.2X
   [info] RANK (PARTITION: , WindowGroupLimit: true)                                3376           3709         218          6.2         161.0       3.9X
   [info] RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)                24778          24927          69          0.8        1181.5       0.5X
   [info] RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)                  6531           6613          68          3.2         311.4       2.0X
   [info] DENSE_RANK (PARTITION: , WindowGroupLimit: false)                        11468          11730         142          1.8         546.8       1.1X
   [info] DENSE_RANK (PARTITION: , WindowGroupLimit: true)                          3459           3658         201          6.1         164.9       3.8X
   [info] DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)          24809          24961         173          0.8        1183.0       0.5X
   [info] DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)            6512           6579          44          3.2         310.5       2.0X
   ```
   3. manual test on TPC-DS
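The Relative column in the benchmark output appears to be the first case's Best Time divided by each case's Best Time (for example 13036 / 4269 ≈ 3.05, shown as 3.1X). The Python sketch below checks that reading against every row, then derives the speedup of `WindowGroupLimit: true` over `false` for each function and partitioning. The timings are copied from the output above; the `results` dict layout is just an illustrative choice, and the per-pair speedup is a derived figure, not one reported in the PR.

```python
# (Best Time in ms, reported Relative) per case, copied from the benchmark output.
# Key: (function, uses PARTITION BY b, WindowGroupLimit enabled)
results = {
    ("ROW_NUMBER", False, False): (13036, 1.0), ("ROW_NUMBER", False, True): (4269, 3.1),
    ("ROW_NUMBER", True, False): (24159, 0.5), ("ROW_NUMBER", True, True): (6466, 2.0),
    ("RANK", False, False): (11291, 1.2), ("RANK", False, True): (3376, 3.9),
    ("RANK", True, False): (24778, 0.5), ("RANK", True, True): (6531, 2.0),
    ("DENSE_RANK", False, False): (11468, 1.1), ("DENSE_RANK", False, True): (3459, 3.8),
    ("DENSE_RANK", True, False): (24809, 0.5), ("DENSE_RANK", True, True): (6512, 2.0),
}

# Assumed definition: Relative = baseline (first case) best time / this case's best time.
baseline_ms = results[("ROW_NUMBER", False, False)][0]
relative = {case: baseline_ms / best for case, (best, _) in results.items()}
matches = all(round(relative[c], 1) == rep for c, (_, rep) in results.items())

# Derived: speedup from enabling WindowGroupLimit for the same function and partitioning.
speedup = {
    (fn, part): results[(fn, part, False)][0] / results[(fn, part, True)][0]
    for (fn, part, _) in results
}

print("Relative column reproduced:", matches)
for (fn, part), s in speedup.items():
    label = "PARTITION BY b" if part else "no PARTITION BY"
    print(f"{fn:<10} {label:<15} WindowGroupLimit speedup: {s:.1f}x")
```

Under that reading, enabling the limit speeds up each case by roughly 3.1x to 3.8x in this benchmark.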
   

