sperlingxx opened a new pull request #35708:
URL: https://github.com/apache/spark/pull/35708
### What changes were proposed in this pull request?
This PR introduces a new optimizer rule, `CombineScalarSubquery`, which
combines compatible scalar subqueries.

The basic idea of `CombineScalarSubquery` is to combine as many compatible
aggregate plans serving scalar subqueries as possible. Aggregates that share
the same "base plan" are considered compatible. The rule combines every group
of compatible aggregates and creates a unified, fused aggregate for each group,
which can serve multiple scalar subqueries. The "base plan" refers to the top
non-Project descendant of the Aggregate, which means it is either the child of
`Aggregate(...)` or the child of `Aggregate(Project(...))`.
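The notion of a base plan and the resulting compatibility check can be sketched with a toy plan model. `Relation`, `Project` and `Aggregate` below are hypothetical, simplified stand-ins, not Catalyst's classes. Structural equality of frozen dataclasses approximates comparing canonicalized plans. This sketch checks only the base plan; any additional conditions the real rule imposes are not modeled.

```python
from dataclasses import dataclass
from typing import Tuple, Union


# Hypothetical toy plan nodes; simplified stand-ins, not Catalyst's classes.
@dataclass(frozen=True)
class Relation:
    name: str
    output: Tuple[str, ...]


@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]
    child: "Plan"


@dataclass(frozen=True)
class Aggregate:
    aggregates: Tuple[str, ...]
    child: "Plan"


Plan = Union[Relation, Project, Aggregate]


def base_plan(agg: Aggregate) -> Plan:
    """Return the top non-Project descendant of an Aggregate."""
    node = agg.child
    while isinstance(node, Project):
        node = node.child
    return node


def compatible(a: Aggregate, b: Aggregate) -> bool:
    # Frozen dataclasses compare structurally; this approximates comparing
    # canonicalized base plans.
    return base_plan(a) == base_plan(b)


t = Relation("t", ("l", "l2", "i", "i2"))
u = Relation("u", ("l",))
q1 = Aggregate(("min(l2)",), Project(("l2",), t))
q2 = Aggregate(("max(l)",), Project(("l",), t))
q3 = Aggregate(("count(1)",), t)                 # no Project: base plan is t itself
q4 = Aggregate(("max(l)",), Project(("l",), u))  # different base plan
print(compatible(q1, q2), compatible(q1, q3), compatible(q1, q4))  # True True False
```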
To be specific, `CombineScalarSubquery` consists of three steps:
1. Collect combination candidates from scalar subqueries, and assign these
candidates to groups according to their canonicalized base plan.
2. For each group, combine the collected candidates over a unified base plan to
build a fused aggregate.
3. Replace ScalarSubqueries with SharedScalarSubqueries that reference the
fused aggregates.
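The three steps can be sketched over the same kind of toy plan model. This is an illustration under stated assumptions, not the PR's implementation:

- `SharedRef` and `combine` are hypothetical names. `SharedRef` stands in for a SharedScalarSubquery and holds the fused plan plus the ordinal of the output column it reads.
- Each scalar subquery is assumed to produce exactly one aggregate expression.
- Structural equality approximates canonicalization.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union


# Hypothetical toy plan nodes standing in for Catalyst's Relation/Project/Aggregate.
@dataclass(frozen=True)
class Relation:
    name: str
    output: Tuple[str, ...]


@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]
    child: "Plan"


@dataclass(frozen=True)
class Aggregate:
    aggregates: Tuple[str, ...]  # assumed: exactly one per scalar subquery
    child: "Plan"


Plan = Union[Relation, Project, Aggregate]


@dataclass(frozen=True)
class SharedRef:
    """Hypothetical stand-in for a SharedScalarSubquery: fused plan + output ordinal."""
    plan: Aggregate
    ordinal: int


def split(agg: Aggregate) -> Tuple[Optional[Tuple[str, ...]], Plan]:
    """Return (projected columns or None, base plan) for Aggregate or Aggregate(Project)."""
    if isinstance(agg.child, Project):
        return agg.child.columns, agg.child.child
    return None, agg.child


def combine(subqueries: List[Aggregate]) -> List[SharedRef]:
    # Step 1: group candidates by base plan.
    groups: Dict[Plan, List[int]] = defaultdict(list)
    for idx, agg in enumerate(subqueries):
        groups[split(agg)[1]].append(idx)

    refs: Dict[int, SharedRef] = {}
    for base, members in groups.items():
        # Step 2: fuse the group into one Aggregate over the unified base plan,
        # unioning projected columns and de-duplicating aggregate expressions.
        aggs: List[str] = []
        cols: List[str] = []
        keep_all_columns = False
        for idx in members:
            projected, _ = split(subqueries[idx])
            if projected is None:
                keep_all_columns = True  # some member reads the base plan directly
            for c in projected or ():
                if c not in cols:
                    cols.append(c)
            for a in subqueries[idx].aggregates:
                if a not in aggs:
                    aggs.append(a)
        child = base if keep_all_columns else Project(tuple(cols), base)
        fused = Aggregate(tuple(aggs), child)
        # Step 3: replace each subquery with a reference into the fused output.
        for idx in members:
            refs[idx] = SharedRef(fused, aggs.index(subqueries[idx].aggregates[0]))
    return [refs[i] for i in range(len(subqueries))]


t = Relation("t", ("l", "l2", "i", "i2"))
subqueries = [
    Aggregate(("min(l2)",), Project(("l2",), t)),
    Aggregate(("max(l)",), Project(("l",), t)),
    Aggregate(("max(i2)",), Project(("i2",), t)),
    Aggregate(("min(i2)",), Project(("i2",), t)),
]
refs = combine(subqueries)
fused = refs[0].plan
print(fused.aggregates)           # ('min(l2)', 'max(l)', 'max(i2)', 'min(i2)')
print(fused.child.columns)        # ('l2', 'l', 'i2')
print([r.ordinal for r in refs])  # [0, 1, 2, 3]
```

For the four subqueries of the sample query, this produces a single fused `Aggregate [min(l2),max(l),max(i2),min(i2)]` over `Project [l2,l,i2]`, the same shape as the fused aggregate in the sample plan.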
Below is a sample query that illustrates how `CombineScalarSubquery` works as an
optimizer rule:
```sql
SELECT SUM(i) FROM t
WHERE l > (SELECT MIN(l2) FROM t) AND l2 < (SELECT MAX(l) FROM t)
AND i2 <> (SELECT MAX(i2) FROM t) AND i2 <> (SELECT MIN(i2) FROM t)
```
Before `CombineScalarSubquery` is applied, the optimized logical plan
looks like this:
```
Aggregate [sum(i)]
+- Project [i]
   +- Filter (((l > scalar-subquery#1) AND (l2 < scalar-subquery#2)) AND (NOT (i2 = scalar-subquery#3) AND NOT (i2 = scalar-subquery#4)))
      :  :- Aggregate [min(l2)]
      :  :  +- Project [l2]
      :  :     +- Relation [l,l2,i,i2]
      :  :- Aggregate [max(l)]
      :  :  +- Project [l]
      :  :     +- Relation [l,l2,i,i2]
      :  :- Aggregate [max(i2)]
      :  :  +- Project [i2]
      :  :     +- Relation [l,l2,i,i2]
      :  +- Aggregate [min(i2)]
      :     +- Project [i2]
      :        +- Relation [l,l2,i,i2]
      +- Relation [l,l2,i,i2]
```
After `CombineScalarSubquery` is applied:
```
Aggregate [sum(i)]
+- Project [i]
   +- Filter (((l > shared-scalar-subquery#1) AND (l2 < shared-scalar-subquery#2)) AND (NOT (i2 = shared-scalar-subquery#3) AND NOT (i2 = shared-scalar-subquery#4)))
      :  :- Aggregate [min(l2),max(l),max(i2),min(i2)]
      :  :  +- Project [l2,l,i2]
      :  :     +- Relation [l,l2,i,i2]
      :  :- Aggregate [min(l2),max(l),max(i2),min(i2)]
      :  :  +- Project [l2,l,i2]
      :  :     +- Relation [l,l2,i,i2]
      :  :- Aggregate [min(l2),max(l),max(i2),min(i2)]
      :  :  +- Project [l2,l,i2]
      :  :     +- Relation [l,l2,i,i2]
      :  +- Aggregate [min(l2),max(l),max(i2),min(i2)]
      :     +- Project [l2,l,i2]
      :        +- Relation [l,l2,i,i2]
      +- Relation [l,l2,i,i2]
```
In this case, all four subqueries are compatible, so they can share the same
fused subquery plan. When the physical plan is prepared, these fused
aggregates are detected and reused by the rule `ReuseExchangeAndSubquery`.
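The effect on execution can be illustrated with a toy simulation in plain Python. This is not Spark's execution model, and several names are hypothetical assumptions:

- `T` holds made-up sample rows.
- `fused_agg` is a single-pass evaluator for the fused aggregate.
- `shared` keeps a cache keyed by a plan string. It loosely mimics subquery reuse: identical fused plans are computed once and each shared subquery reads its own output column.

The sketch checks that the sample query returns the same result with four separate scans and with one shared scan.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, int]

# Hypothetical sample rows for table t(l, l2, i, i2).
T: List[Row] = [
    {"l": 5, "l2": 1, "i": 10, "i2": 3},
    {"l": 7, "l2": 4, "i": 20, "i2": 5},
    {"l": 2, "l2": 9, "i": 30, "i2": 4},
    {"l": 6, "l2": 3, "i": 40, "i2": 8},
    {"l": 3, "l2": 2, "i": 50, "i2": 6},
]

scans = 0  # full passes over t performed on behalf of subqueries


def scan(fn: Callable[[List[Row]], Tuple[int, ...]]) -> Tuple[int, ...]:
    global scans
    scans += 1
    return fn(T)


def outer_query(s1: int, s2: int, s3: int, s4: int) -> int:
    # SELECT SUM(i) FROM t WHERE l > s1 AND l2 < s2 AND i2 <> s3 AND i2 <> s4
    return sum(r["i"] for r in T
               if r["l"] > s1 and r["l2"] < s2 and r["i2"] != s3 and r["i2"] != s4)


# Unfused: four independent scalar subqueries, one scan each.
separate = (
    scan(lambda t: (min(r["l2"] for r in t),))[0],
    scan(lambda t: (max(r["l"] for r in t),))[0],
    scan(lambda t: (max(r["i2"] for r in t),))[0],
    scan(lambda t: (min(r["i2"] for r in t),))[0],
)
unfused_scans = scans


def fused_agg(t: List[Row]) -> Tuple[int, ...]:
    """min(l2), max(l), max(i2), min(i2) in a single pass (assumes t is non-empty)."""
    it = iter(t)
    first = next(it)
    acc = [first["l2"], first["l"], first["i2"], first["i2"]]
    for r in it:
        acc[0] = min(acc[0], r["l2"])
        acc[1] = max(acc[1], r["l"])
        acc[2] = max(acc[2], r["i2"])
        acc[3] = min(acc[3], r["i2"])
    return tuple(acc)


cache: Dict[str, Tuple[int, ...]] = {}


def shared(plan_key: str, ordinal: int) -> int:
    # Identical fused plans map to one cache entry, so the scan happens once.
    if plan_key not in cache:
        cache[plan_key] = scan(fused_agg)
    return cache[plan_key][ordinal]


scans = 0
FUSED_KEY = "Aggregate[min(l2),max(l),max(i2),min(i2)] <- Project[l2,l,i2] <- t"
fused = tuple(shared(FUSED_KEY, k) for k in range(4))
fused_scans = scans

print(outer_query(*separate), unfused_scans)  # 70 4
print(outer_query(*fused), fused_scans)       # 70 1
```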
### Why are the changes needed?
To accelerate Spark SQL queries that contain many compatible scalar
subqueries, such as [TPC-DS
query9](https://github.com/Agirish/tpcds/blob/master/query9.sql).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This patch is verified with unit tests. Test cases are added to
`SubquerySuite` and `AdaptiveQueryExec` to make sure this patch works correctly
for both the logical plan and the physical plan (with AQE enabled).