beliefer opened a new pull request, #38388: URL: https://github.com/apache/spark/pull/38388
### What changes were proposed in this pull request? Currently, if the creation side of runtime filter could be broadcasted, Spark will not inject a bloom filter or `InSunquery` filter into the application side. I think at first we thought broadcast was cheaper than submit a new job for runtime filter. This behavior is contrary to the join has a shuffle below broadcast hash join described in the design document.  In fact, we just need the creation side is small enough and inject bloom filter which could reuse the broadcast exchange and improve performance. As we know, bloom filter ensures creation side is small enough with the code below. ``` // Skip if the filter creation side is too big if (filterCreationSidePlan.stats.sizeInBytes > conf.runtimeFilterCreationSideThreshold) { return filterApplicationSidePlan } ``` `InSunquery` filter ensures creation side is small enough with the code below. ``` val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan) if (!canBroadcastBySize(aggregate, conf)) { // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold, // i.e., the semi-join will be a shuffled join, which is not worthwhile. return filterApplicationSidePlan } ``` After this PR, we can support below scenes.  **Query Performance Benchmarks: TPC-DS Performance Evaluation** Here are the test cases this PR can trigger the runtime filter. This PR is useful for large queries. The tpcds data size is 3TB. Open AQE(Default) | TPC-DS Query | Default(Seconds) | After(Seconds) | Speedup(Percent) | | ---- | ---- | ---- | ---- | | q14a | 169.103 | 166.837 | 1% | | q14b | 162.32 | 162.40 | 0% | | q23a | 169.103 | 166.837 | 1% | | q23b | 596.88 | 596.65 | 0% | | q24a | 430.36 | 427.85 | 1% | | q24b | 427.37 | 407.36 | 5% | | q50 | 179.97 | 171.96 | 5% | | q64 | 195.88 | 196.83 | 0% | | q67 | 1,244.59 | 1,172.70 | 6% | | q75 | 102.48 | 109.66 | -7% | | q78 | 236.50 | 236.15 | 0% | | q93 | 338.32 | 328.83 | 3% | | q95 | 192.69 | 186.25 | 3% | Close AQE     ### Why are the changes needed? 1. Relax the restrictions of broadcast join on bloom filter, so as the runtime filter applicable to more scenarios. 2. Reuse the broadcast exchange for bloom filter. ### Does this PR introduce _any_ user-facing change? 'No'. Just update the inner implementation. ### How was this patch tested? New tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
