prakharjain09 opened a new pull request #30426:
URL: https://github.com/apache/spark/pull/30426
### What changes were proposed in this pull request?
This PR aims to reduce the number of physical aggregation nodes by
collapsing the PARTIAL and FINAL aggregation nodes into a single node when
there is no Exchange between them.
For example, consider the following query:
SELECT sum(t2.col1), max(t2.col2), t1.col1, t1.col2
FROM t1, t2
WHERE t1.col1 = t2.col1
GROUP BY t1.col1, t1.col2
Current plan:
== Physical Plan ==
*(5) HashAggregate(keys=[col1#7, col2#8], functions=[sum(cast(col1#18 as bigint)), max(col2#19)], output=[sum(col1)#140L, max(col2)#141, col1#7, col2#8])
+- *(5) HashAggregate(keys=[col1#7, col2#8], functions=[partial_sum(cast(col1#18 as bigint)), partial_max(col2#19)], output=[col1#7, col2#8, sum#148L, max#149])
   +- *(5) SortMergeJoin [col1#7], [col1#18], Inner
      :- *(2) Sort [col1#7 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(col1#7, 5), true, [id=#644]
      :     +- *(1) Project [value#2 AS col1#7, (value#2 % 10) AS col2#8]
      :        +- *(1) SerializeFromObject [input[0, int, false] AS value#2]
      :           +- Scan[obj#1]
      +- *(4) Sort [col1#18 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(col1#18, 5), true, [id=#653]
            +- *(3) Project [value#13 AS col1#18, (value#13 % 10) AS col2#19]
               +- *(3) SerializeFromObject [input[0, int, false] AS value#13]
                  +- Scan[obj#12]
The above plan can be optimized to the following:
== Physical Plan ==
*(5) HashAggregate(keys=[col1#7, col2#8], functions=[sum(cast(col1#18 as bigint)), max(col2#19)], output=[sum(col1)#157L, max(col2)#158, col1#7, col2#8])
+- *(5) SortMergeJoin [col1#7], [col1#18], Inner
   :- *(2) Sort [col1#7 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col1#7, 5), true, [id=#727]
   :     +- *(1) Project [value#2 AS col1#7, (value#2 % 10) AS col2#8]
   :        +- *(1) SerializeFromObject [input[0, int, false] AS value#2]
   :           +- Scan[obj#1]
   +- *(4) Sort [col1#18 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col1#18, 5), true, [id=#736]
         +- *(3) Project [value#13 AS col1#18, (value#13 % 10) AS col2#19]
            +- *(3) SerializeFromObject [input[0, int, false] AS value#13]
               +- Scan[obj#12]
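
The rewrite illustrated by the two plans above can be sketched on a toy plan tree. This is only an illustration, not the code in this PR: the `Node` class, the `collapse_aggregates` function and the `"complete"` mode label are hypothetical names chosen for the sketch, and the real rule operates on Spark's physical plan classes and has to handle more cases.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Node:
    """Toy physical plan node (hypothetical; not Spark's SparkPlan)."""
    op: str                            # e.g. "HashAggregate", "Exchange"
    children: Tuple["Node", ...] = ()
    mode: Optional[str] = None         # "partial", "final" or "complete"
    keys: Tuple[str, ...] = ()         # grouping keys of an aggregate


def collapse_aggregates(plan: Node) -> Node:
    """Bottom-up rewrite: Final(Partial(child)) becomes Complete(child)."""
    children = tuple(collapse_aggregates(c) for c in plan.children)
    plan = Node(plan.op, children, plan.mode, plan.keys)
    if plan.op == "HashAggregate" and plan.mode == "final" and len(children) == 1:
        child = children[0]
        # Collapse only when the partial node is the direct child,
        # i.e. there is no Exchange (or anything else) in between.
        if (child.op == "HashAggregate" and child.mode == "partial"
                and child.keys == plan.keys):
            return Node("HashAggregate", child.children, "complete", plan.keys)
    return plan


def count_ops(plan: Node, op: str) -> int:
    """Count the nodes with the given operator name in the tree."""
    return int(plan.op == op) + sum(count_ops(c, op) for c in plan.children)


# Shape of the example plan: Final -> Partial -> SortMergeJoin -> 2 x Exchange
join = Node("SortMergeJoin", (Node("Exchange", (Node("Scan"),)),
                              Node("Exchange", (Node("Scan"),))))
keys = ("col1", "col2")
plan = Node("HashAggregate",
            (Node("HashAggregate", (join,), "partial", keys),),
            "final", keys)
collapsed = collapse_aggregates(plan)
print(count_ops(plan, "HashAggregate"), count_ops(collapsed, "HashAggregate"))
```

A plan in which an Exchange sits between the partial and the final node is returned unchanged by this sketch, which mirrors the condition stated in the description.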
### Why are the changes needed?
This change removes the redundant aggregation node, which should improve
performance.
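
As a rough illustration of why the extra node is redundant, the sketch below aggregates one partition's rows in two phases (partial, then final) and in a single pass, and checks that both give the same `sum` and `max` per group. The function names and the sample rows are made up for this example; it does not model Spark's aggregation buffers or code generation.

```python
def partial_agg(rows):
    """Partial phase: per-key (running sum, running max) for one partition."""
    buf = {}
    for key, a, b in rows:
        s, m = buf.get(key, (0, b))
        buf[key] = (s + a, max(m, b))
    return buf


def final_agg(partials):
    """Final phase: merge the partial buffers that reach this partition."""
    out = {}
    for buf in partials:
        for key, (s, m) in buf.items():
            if key in out:
                ps, pm = out[key]
                out[key] = (ps + s, max(pm, m))
            else:
                out[key] = (s, m)
    return out


def single_pass_agg(rows):
    """Single phase: group the rows, then compute sum and max directly."""
    groups = {}
    for key, a, b in rows:
        groups.setdefault(key, []).append((a, b))
    return {k: (sum(a for a, _ in v), max(b for _, b in v))
            for k, v in groups.items()}


# Rows of one partition: ((t1.col1, t1.col2), t2.col1, t2.col2), made-up values.
rows = [((1, 1), 3, 10), ((1, 1), 4, 7), ((2, 2), 5, 2)]

# With no Exchange in between, the final phase only ever sees the single
# partial buffer produced in the same partition, so merging adds nothing.
two_phase = final_agg([partial_agg(rows)])
one_phase = single_pass_agg(rows)
print(two_phase == one_phase)  # True
```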
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests.