peter-toth opened a new pull request, #53019:
URL: https://github.com/apache/spark/pull/53019
### What changes were proposed in this pull request?
This PR renames the `MergeScalarSubqueries` rule to `MergeSubplans` and extends
its plan merging capabilities to non-grouping aggregate subplans, which are very
similar to scalar subqueries in that they return a single-row result.
Consider the following query, which joins two non-grouping aggregates:
```
Join Inner
:- Aggregate [min(a) AS min(a)]
:  +- Relation [a, b, c]
+- Aggregate [sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
   +- Relation [a, b, c]
```
With the improved rule, the plan is optimized to:
```
WithCTE
:- CTERelationDef 0
:  +- Project [named_struct(min(a), min(a), sum(b), sum(b), avg(c), avg(c)) AS mergedValue]
:     +- Aggregate [min(a) AS min(a), sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
:        +- Relation [a, b, c]
+- Join Inner
   :- Project [scalar-subquery [].min(a) AS min(a)]
   :  :  +- CTERelationRef 0
   :  +- OneRowRelation
   +- Project [scalar-subquery [].sum(b) AS sum(b), scalar-subquery [].avg(c) AS avg(c)]
      :  :- CTERelationRef 0
      :  +- CTERelationRef 0
      +- OneRowRelation
```
This way, `Relation` is scanned only once.
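To make the merging step more concrete, here is a minimal Scala sketch of combining two non-grouping aggregates over the same relation. It is a toy model under stated assumptions, not Catalyst code: `Relation`, `AggExpr`, `Aggregate`, `MergeResult` and `ToyMergeSubplans.tryMerge` are hypothetical names, aggregate expressions are plain strings, and aliases are assumed to be unique across the two inputs. It mirrors the first example: the merged aggregate computes `min(a)`, `sum(b)` and `avg(c)` in one pass, and each original aggregate reads its values back by alias.

```scala
// Toy model only: these types are hypothetical and are NOT Spark's Catalyst classes.
final case class Relation(name: String, columns: Seq[String])
final case class AggExpr(expr: String, alias: String)
// A non-grouping aggregate has an empty `grouping` list and returns exactly one row.
final case class Aggregate(grouping: Seq[String], aggs: Seq[AggExpr], child: Relation)
// The merged aggregate plus the aliases each input reads from its single output row.
final case class MergeResult(merged: Aggregate, leftRefs: Seq[String], rightRefs: Seq[String])

object ToyMergeSubplans {
  // Assumption: aliases are unique across both inputs; alias clashes are not handled.
  def tryMerge(l: Aggregate, r: Aggregate): Option[MergeResult] =
    if (l.grouping.nonEmpty || r.grouping.nonEmpty || l.child != r.child) None
    else {
      // Keep every left expression and add only the right expressions not already computed.
      val extra = r.aggs.filterNot(a => l.aggs.exists(_.expr == a.expr))
      val merged = Aggregate(Nil, l.aggs ++ extra, l.child)
      // Every input expression is present in `merged`, so `find` always succeeds here.
      def aliasOf(a: AggExpr): String = merged.aggs.find(_.expr == a.expr).get.alias
      Some(MergeResult(merged, l.aggs.map(aliasOf), r.aggs.map(aliasOf)))
    }
}

// Toy version of the first example: two aggregates over the same relation `t`.
val t = Relation("t", Seq("a", "b", "c"))
val minAgg = Aggregate(Nil, Seq(AggExpr("min(a)", "min(a)")), t)
val sumAvgAgg = Aggregate(
  Nil,
  Seq(AggExpr("sum(b)", "sum(b)"), AggExpr("avg(cast(c as double))", "avg(c)")),
  t)
val result = ToyMergeSubplans.tryMerge(minAgg, sumAvgAgg)
```

The sketch stops at the merged aggregate; in the optimized plan shown above, the rule additionally wraps the merged values into a `named_struct` inside a CTE and reads them back through `scalar-subquery` references.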
Please note that the above plan, where the two aggregates are part of an
"inner join group", could be rewritten as a single aggregate without
introducing a CTE and without keeping the join. However, there are more complex
cases where the proposed CTE-based approach is the only viable option, such as
when the aggregates reside in different parts of the plan, possibly even in
different subquery expressions.
For example, the following query:
```
Join Inner
:- Project [scalar-subquery [] AS scalarsubquery()]
:  :  +- Aggregate [min(a) AS min(a)]
:  :     +- Relation [a, b, c]
:  +- OneRowRelation
+- Aggregate [sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
   +- Relation [a, b, c]
```
can be optimized to:
```
WithCTE
:- CTERelationDef 0, true
:  +- Project [named_struct(min(a), min(a), sum(b), sum(b), avg(c), avg(c)) AS mergedValue]
:     +- Aggregate [min(a) AS min(a), sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
:        +- Relation [a, b, c]
+- Join Inner
   :- Project [scalar-subquery [].min(a) AS scalarsubquery()]
   :  :  +- CTERelationRef 0
   :  +- OneRowRelation
   +- Project [scalar-subquery [].sum(b) AS sum(b), scalar-subquery [].avg(c) AS avg(c)]
      :  :- CTERelationRef 0
      :  +- CTERelationRef 0
      +- OneRowRelation
```
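The following Scala sketch illustrates why a shared CTE helps when the aggregates sit in different parts of the plan, as in the example above, where one aggregate is inside a scalar subquery expression and the other is a join child. Again, this is a hypothetical toy model: `Plan`, `Scan`, `Agg`, `Join`, `ScalarSubqueries`, `CteRef`, `WithCte` and `ToyCteMerge` are made-up names, not Catalyst classes, and the only merge condition checked is that all candidate aggregates read the same scan. It counts scans before and after the rewrite.

```scala
// Toy plan tree; every name here is hypothetical and unrelated to Catalyst's classes.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Agg(aggs: Seq[String], child: Plan) extends Plan // non-grouping aggregate
final case class Join(left: Plan, right: Plan) extends Plan
// A one-row projection whose values come from scalar subqueries.
final case class ScalarSubqueries(subqueries: Seq[Plan]) extends Plan
// Reads the named fields of the single row produced by CTE `id`.
final case class CteRef(id: Int, fields: Seq[String]) extends Plan
final case class WithCte(id: Int, definition: Plan, body: Plan) extends Plan

object ToyCteMerge {
  def countScans(p: Plan): Int = p match {
    case Scan(_)                => 1
    case Agg(_, child)          => countScans(child)
    case Join(l, r)             => countScans(l) + countScans(r)
    case ScalarSubqueries(subs) => subs.map(countScans).sum
    case CteRef(_, _)           => 0
    case WithCte(_, d, body)    => countScans(d) + countScans(body)
  }

  // Finds aggregates directly over a scan, whether in a join or in a subquery expression.
  private def collectAggs(p: Plan): Seq[Agg] = p match {
    case a @ Agg(_, Scan(_))    => Seq(a)
    case Agg(_, child)          => collectAggs(child)
    case Join(l, r)             => collectAggs(l) ++ collectAggs(r)
    case ScalarSubqueries(subs) => subs.flatMap(collectAggs)
    case _                      => Nil
  }

  // Replaces each aggregate in place with a reference to one shared CTE definition
  // that computes all aggregate expressions in a single pass over the scan.
  def mergeIntoCte(p: Plan): Plan = {
    val aggs = collectAggs(p)
    val scans = aggs.map(_.child).distinct
    if (aggs.size < 2 || scans.size != 1) p
    else {
      val definition = Agg(aggs.flatMap(_.aggs).distinct, scans.head)
      def rewrite(q: Plan): Plan = q match {
        case Agg(fields, Scan(_))   => CteRef(0, fields)
        case Agg(fields, child)     => Agg(fields, rewrite(child))
        case Join(l, r)             => Join(rewrite(l), rewrite(r))
        case ScalarSubqueries(subs) => ScalarSubqueries(subs.map(rewrite))
        case other                  => other
      }
      WithCte(0, definition, rewrite(p))
    }
  }
}

import ToyCteMerge._
// Toy version of the second example: one aggregate in a subquery, one as a join child.
val original: Plan = Join(
  ScalarSubqueries(Seq(Agg(Seq("min(a)"), Scan("t")))),
  Agg(Seq("sum(b)", "avg(cast(c as double))"), Scan("t")))
val optimized: Plan = mergeIntoCte(original)
```

Because each aggregate is replaced where it stands by a reference to the shared definition, this toy rewrite does not require the aggregates to be siblings under one join, which is the property the CTE-based approach relies on.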
### Why are the changes needed?
To extend the plan merging logic and thereby further reduce redundant IO.
Please also note that TPCDS q28 and q88 contain non-grouping aggregates, but
this PR can't handle them yet. Those queries will improve once
[SPARK-40193](https://issues.apache.org/jira/browse/SPARK-40193) /
https://github.com/apache/spark/pull/37630 lands in Spark.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Existing and new UTs.
### Was this patch authored or co-authored using generative AI tooling?
No.
--
This is an automated message from the Apache Git Service.