This is an automated email from the ASF dual-hosted git repository. beliefer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 1d8df4f6b99b [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter 1d8df4f6b99b is described below commit 1d8df4f6b99b836f4267b888e81d67c75b4dfdcd Author: Jiaan Geng <belie...@163.com> AuthorDate: Wed Nov 8 19:43:33 2023 +0800 [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter ### What changes were proposed in this pull request? Before https://github.com/apache/spark/pull/39170, Spark only supported inserting a runtime filter for the application side of a shuffle join on a single layer. Considering it's not worthwhile to insert more runtime filters if the column already has a runtime filter, Spark restricts this at https://github.com/apache/spark/blob/7057952f6bc2c5cf97dd408effd1b18bee1cb8f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala#L346 For example `select * from bf1 join bf2 on bf1.c1 = bf2.c2 and bf1.c1 = bf2.b2 where bf2.a2 = 62` This SQL has two join conditions. Two runtime filters would be inserted on `bf1.c1` without the restriction mentioned above. At that time, it was reasonable. After https://github.com/apache/spark/pull/39170, Spark supports inserting a runtime filter for one side of any shuffle join on multiple layers. But the restriction on multi-layer runtime filters mentioned above looks outdated. For example `select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5` Assume bf2 is the build side and a runtime filter is inserted for bf1. We can't insert the same runtime filter for bf3 because there is already a runtime filter on `bf1.c1`. The behavior is different from the original and is unexpected. The change in this PR doesn't affect the restriction mentioned above. ### Why are the changes needed? Release restrictions on multi-layer runtime filter. Expand optimization surface. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? 
Test cases updated. Micro benchmark for q9 in TPC-H. **TPC-H 100** Query | Master(ms) | PR(ms) | Difference(ms) | Percent -- | -- | -- | -- | -- q9 | 26491 | 20725 | 5766| 27.82% ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes #43449 from beliefer/SPARK-45606. Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Jiaan Geng <belie...@163.com> --- .../catalyst/optimizer/InjectRuntimeFilter.scala | 33 ++++++++++------------ .../spark/sql/InjectRuntimeFilterSuite.scala | 8 ++---- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 5f5508d6b22c..9c150f1f3308 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -247,15 +247,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J } } - private def hasBloomFilter( - left: LogicalPlan, - right: LogicalPlan, - leftKey: Expression, - rightKey: Expression): Boolean = { - findBloomFilterWithKey(left, leftKey) || findBloomFilterWithKey(right, rightKey) - } - - private def findBloomFilterWithKey(plan: LogicalPlan, key: Expression): Boolean = { + private def hasBloomFilter(plan: LogicalPlan, key: Expression): Boolean = { plan.exists { case Filter(condition, _) => splitConjunctivePredicates(condition).exists { @@ -277,28 +269,33 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J leftKeys.lazyZip(rightKeys).foreach((l, r) => { // Check if: // 1. There is already a DPP filter on the key - // 2. There is already a bloom filter on the key - // 3. The keys are simple cheap expressions + // 2. 
The keys are simple cheap expressions if (filterCounter < numFilterThreshold && !hasDynamicPruningSubquery(left, right, l, r) && - !hasBloomFilter(newLeft, newRight, l, r) && isSimpleExpression(l) && isSimpleExpression(r)) { val oldLeft = newLeft val oldRight = newRight - // Check if the current join is a shuffle join or a broadcast join that - // has a shuffle below it + // Check if: + // 1. The current join type supports prune the left side with runtime filter + // 2. The current join is a shuffle join or a broadcast join that + // has a shuffle below it + // 3. There is no bloom filter on the left key yet val hasShuffle = isProbablyShuffleJoin(left, right, hint) - if (canPruneLeft(joinType) && (hasShuffle || probablyHasShuffle(left))) { + if (canPruneLeft(joinType) && (hasShuffle || probablyHasShuffle(left)) && + !hasBloomFilter(newLeft, l)) { extractBeneficialFilterCreatePlan(left, right, l, r).foreach { case (filterCreationSideKey, filterCreationSidePlan) => newLeft = injectFilter(l, newLeft, filterCreationSideKey, filterCreationSidePlan) } } // Did we actually inject on the left? If not, try on the right - // Check if the current join is a shuffle join or a broadcast join that - // has a shuffle below it + // Check if: + // 1. The current join type supports prune the right side with runtime filter + // 2. The current join is a shuffle join or a broadcast join that + // has a shuffle below it + // 3. 
There is no bloom filter on the right key yet if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) && - (hasShuffle || probablyHasShuffle(right))) { + (hasShuffle || probablyHasShuffle(right)) && !hasBloomFilter(newRight, r)) { extractBeneficialFilterCreatePlan(right, left, r, l).foreach { case (filterCreationSideKey, filterCreationSidePlan) => newRight = injectFilter( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala index 2e57975ee6d1..fc1524be1317 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala @@ -335,14 +335,12 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp "bf1.c1 = bf2.c2 where bf2.a2 = 5) as a join bf3 on bf3.c3 = a.c1", 2) assertRewroteWithBloomFilter("select * from (select * from bf1 right join bf2 on " + "bf1.c1 = bf2.c2 where bf2.a2 = 5) as a join bf3 on bf3.c3 = a.c1", 2) - // Can't leverage the transitivity of join keys due to runtime filters already exists. - // bf2 as creation side and inject runtime filter for bf1. 
assertRewroteWithBloomFilter("select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " + - "and bf3.c3 = bf1.c1 where bf2.a2 = 5") + "and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2) assertRewroteWithBloomFilter("select * from bf1 left outer join bf2 join bf3 on " + - "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5") + "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2) assertRewroteWithBloomFilter("select * from bf1 right outer join bf2 join bf3 on " + - "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5") + "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2) } withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org