prakharjain09 opened a new pull request #30426:
URL: https://github.com/apache/spark/pull/30426


   ### What changes were proposed in this pull request?
   This PR reduces the number of physical aggregation nodes by collapsing the 
PARTIAL and FINAL aggregation nodes into a single node when there is no 
Exchange between them. 
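
The reasoning behind the collapse can be illustrated with a small model. The following is a minimal sketch, not Spark code: the function names `partial_agg`, `final_agg`, and `complete_agg` and the row layout `(key, x, y)` are assumptions made for illustration. When no Exchange sits between the two phases, the FINAL phase only ever merges the buffers produced by the PARTIAL phase of the same partition, so the pair should give the same result as one pass over the raw rows.

```python
def partial_agg(rows):
    """Partial phase: build one (sum, max) buffer per grouping key."""
    buffers = {}
    for key, x, y in rows:
        if key in buffers:
            s, m = buffers[key]
            buffers[key] = (s + x, max(m, y))
        else:
            buffers[key] = (x, y)
    return buffers


def final_agg(buffer_sets):
    """Final phase: merge partial buffers that share a grouping key."""
    merged = {}
    for buffers in buffer_sets:
        for key, (s, m) in buffers.items():
            if key in merged:
                ps, pm = merged[key]
                merged[key] = (ps + s, max(pm, m))
            else:
                merged[key] = (s, m)
    return merged


def complete_agg(rows):
    """Single pass over raw rows: sum(x) and max(y) per grouping key."""
    result = {}
    for key, x, y in rows:
        if key in result:
            s, m = result[key]
            result[key] = (s + x, max(m, y))
        else:
            result[key] = (x, y)
    return result


# One partition of join output; key is (t1.col1, t1.col2), then t2.col1, t2.col2.
partition = [((1, 1), 1, 1), ((1, 1), 1, 1), ((2, 2), 2, 2)]

# Without an Exchange, the final phase sees only this partition's buffers.
two_phase = final_agg([partial_agg(partition)])
one_phase = complete_agg(partition)
```

In this toy model `two_phase` and `one_phase` are equal, which is why the intermediate buffer step adds work without changing the answer.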
   
   Example - consider the following query:
   
       SELECT sum(t2.col1), max(t2.col2), t1.col1, t1.col2
       FROM t1, t2
       WHERE t1.col1 = t2.col1
       GROUP BY t1.col1, t1.col2
   
   Current plan:
   
         == Physical Plan ==
         *(5) HashAggregate(keys=[col1#7, col2#8], functions=[sum(cast(col1#18 as bigint)), max(col2#19)], output=[sum(col1)#140L, max(col2)#141, col1#7, col2#8])
         +- *(5) HashAggregate(keys=[col1#7, col2#8], functions=[partial_sum(cast(col1#18 as bigint)), partial_max(col2#19)], output=[col1#7, col2#8, sum#148L, max#149])
            +- *(5) SortMergeJoin [col1#7], [col1#18], Inner
               :- *(2) Sort [col1#7 ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(col1#7, 5), true, [id=#644]
               :     +- *(1) Project [value#2 AS col1#7, (value#2 % 10) AS col2#8]
               :        +- *(1) SerializeFromObject [input[0, int, false] AS value#2]
               :           +- Scan[obj#1]
               +- *(4) Sort [col1#18 ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(col1#18, 5), true, [id=#653]
                     +- *(3) Project [value#13 AS col1#18, (value#13 % 10) AS col2#19]
                        +- *(3) SerializeFromObject [input[0, int, false] AS value#13]
                           +- Scan[obj#12]
   
   
   The above plan can be optimized to the following:
   
         == Physical Plan ==
         *(5) HashAggregate(keys=[col1#7, col2#8], functions=[sum(cast(col1#18 as bigint)), max(col2#19)], output=[sum(col1)#157L, max(col2)#158, col1#7, col2#8])
         +- *(5) SortMergeJoin [col1#7], [col1#18], Inner
            :- *(2) Sort [col1#7 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(col1#7, 5), true, [id=#727]
            :     +- *(1) Project [value#2 AS col1#7, (value#2 % 10) AS col2#8]
            :        +- *(1) SerializeFromObject [input[0, int, false] AS value#2]
            :           +- Scan[obj#1]
            +- *(4) Sort [col1#18 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(col1#18, 5), true, [id=#736]
                  +- *(3) Project [value#13 AS col1#18, (value#13 % 10) AS col2#19]
                     +- *(3) SerializeFromObject [input[0, int, false] AS value#13]
                        +- Scan[obj#12]
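
The shape of the rewrite shown in the two plans can be sketched as a tree transformation. This is a toy model under stated assumptions, not the rule added by this PR: the `Node` class, the `collapse_aggregates` function, and the mode strings `"partial"`, `"final"`, and `"complete"` are hypothetical names chosen for illustration. The sketch collapses a final aggregate whose direct child is a partial aggregate on the same keys, and leaves the plan alone when an Exchange separates them.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Node:
    """Hypothetical stand-in for a physical plan node."""
    name: str                    # e.g. "HashAggregate", "Exchange"
    mode: Optional[str] = None   # aggregate mode, None for other nodes
    keys: tuple = ()             # grouping keys for aggregates
    children: tuple = ()


def collapse_aggregates(plan: Node) -> Node:
    """Merge a final aggregate with a directly adjacent partial aggregate."""
    # Rewrite the children first so nested aggregates are handled bottom-up.
    children = tuple(collapse_aggregates(c) for c in plan.children)
    plan = Node(plan.name, plan.mode, plan.keys, children)

    if plan.name == "HashAggregate" and plan.mode == "final" and len(children) == 1:
        child = children[0]
        # Only collapse when the partial aggregate is the direct child,
        # i.e. no Exchange in between, and the grouping keys match.
        if (child.name == "HashAggregate" and child.mode == "partial"
                and child.keys == plan.keys):
            return Node("HashAggregate", "complete", plan.keys, child.children)
    return plan


keys = ("col1", "col2")
join = Node("SortMergeJoin")

# Partial directly under final, as in the "Current plan" above.
adjacent = Node("HashAggregate", "final", keys,
                (Node("HashAggregate", "partial", keys, (join,)),))

# Partial and final separated by an Exchange: must stay as two nodes.
separated = Node("HashAggregate", "final", keys,
                 (Node("Exchange", children=(
                     Node("HashAggregate", "partial", keys, (Node("Scan"),)),)),))

collapsed = collapse_aggregates(adjacent)
untouched = collapse_aggregates(separated)
```

Here `collapsed` is a single aggregate sitting directly on the join, mirroring the optimized plan, while `untouched` is equal to `separated`.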
   
   ### Why are the changes needed?
   This change removes the redundant aggregation node, which should improve 
performance.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Added unit tests.

