beliefer opened a new pull request, #38388:
URL: https://github.com/apache/spark/pull/38388

   ### What changes were proposed in this pull request?
   Currently, if the creation side of runtime filter could be broadcasted, 
Spark will not inject a bloom filter or `InSunquery` filter into the 
application side. I think at first we thought broadcast was cheaper than submit 
a new job for runtime filter. This behavior is contrary to the join has a 
shuffle below broadcast hash join described in the design document.
   
![image](https://user-images.githubusercontent.com/8486025/194822331-7a36b018-dfd9-48ce-a867-7c30a6914791.png)
   
   In fact, we just need the creation side is small enough and inject bloom 
filter which could reuse the broadcast exchange and improve performance.
   
   As we know, bloom filter ensures creation side is small enough with the code 
below.
   ```
       // Skip if the filter creation side is too big
       if (filterCreationSidePlan.stats.sizeInBytes > 
conf.runtimeFilterCreationSideThreshold) {
         return filterApplicationSidePlan
       }
   ```
   `InSunquery` filter ensures creation side is small enough with the code 
below.
   ```
       val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan)
       if (!canBroadcastBySize(aggregate, conf)) {
         // Skip the InSubquery filter if the size of `aggregate` is beyond 
broadcast join threshold,
         // i.e., the semi-join will be a shuffled join, which is not 
worthwhile.
         return filterApplicationSidePlan
       }
   ```
   
   After this PR, we can support below scenes.
   
![image](https://user-images.githubusercontent.com/8486025/212868325-154d21ee-cfc1-4e6c-a27a-a4fd3908e029.png)
   
   
   **Query Performance Benchmarks: TPC-DS Performance Evaluation**
   Here are the test cases this PR can trigger the runtime filter. This PR is 
useful for large queries.
   The tpcds data size is 3TB.
   Open AQE(Default)
   | TPC-DS Query   | Default(Seconds)  | After(Seconds)  | Speedup(Percent)  |
   |  ----  | ----  | ----  | ----  |
   | q14a | 169.103 | 166.837 | 1% |
   | q14b | 162.32 | 162.40 | 0% |
   | q23a | 169.103 | 166.837 | 1% |
   | q23b | 596.88 | 596.65 | 0% |
   | q24a | 430.36 | 427.85 | 1% |
   | q24b | 427.37 | 407.36 | 5% |
   | q50 | 179.97 | 171.96 | 5% |
   | q64 | 195.88 | 196.83 | 0% |
   | q67 | 1,244.59 | 1,172.70 | 6% |
   | q75 | 102.48 | 109.66 | -7% |
   | q78 | 236.50 | 236.15 | 0% |
   | q93 | 338.32 | 328.83 | 3% |
   | q95 | 192.69 | 186.25 | 3% |
   
   Close AQE
   
![image](https://user-images.githubusercontent.com/8486025/198534786-e3078406-586d-4eb6-a4a8-8faea8b12a59.png)
   
![image](https://user-images.githubusercontent.com/8486025/198534915-2bd8dff8-3f9a-434e-8b13-ed23ef392de9.png)
   
![image](https://user-images.githubusercontent.com/8486025/198535019-4fcd4ef8-ca08-4024-90be-ab634d1a1eca.png)
   
![image](https://user-images.githubusercontent.com/8486025/198535089-d7b09066-3582-4bfe-977c-26088ffe425a.png)
   
   
   
   ### Why are the changes needed?
   
   1. Relax the restrictions of broadcast join on bloom filter, so as the 
runtime filter applicable to more scenarios.
   2. Reuse the broadcast exchange for bloom filter.
   
   
   ### Does this PR introduce _any_ user-facing change?
   'No'.
   Just update the inner implementation.
   
   
   ### How was this patch tested?
   New tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to