wangyum opened a new pull request #36005:
URL: https://github.com/apache/spark/pull/36005


   ### What changes were proposed in this pull request?
   
   This PR adds a new rule that pushes partial aggregation down through a join when the join cannot be planned as a broadcast hash join. For example:
   ```
   set spark.sql.autoBroadcastJoinThreshold=-1;
   CREATE TABLE t1 using parquet AS SELECT id AS a, id AS b, id AS c FROM range(10);
   CREATE TABLE t2 using parquet AS SELECT id AS x, id AS y FROM range(8);
   SELECT c, y FROM t1 JOIN t2 ON t1.a = t2.x GROUP BY c, y;
   ```
   
   Before this PR:
   ```
   == Optimized Logical Plan ==
   Aggregate [c#19L, y#21L], [c#19L, y#21L], false
   +- Project [c#19L, y#21L]
      +- Join Inner, (a#17L = x#20L)
         :- Project [a#17L, c#19L]
         :  +- Filter isnotnull(a#17L)
         :     +- Relation default.t1[a#17L,b#18L,c#19L] parquet
         +- Filter isnotnull(x#20L)
            +- Relation default.t2[x#20L,y#21L] parquet
   ```
   
   After this PR:
   ```
   == Optimized Logical Plan ==
   Aggregate [c#19L, y#21L], [c#19L, y#21L], false
   +- Project [c#19L, y#21L]
      +- Join Inner, (a#17L = x#20L)
         :- Aggregate [c#19L, a#17L], [c#19L, a#17L], true
         :  +- Project [a#17L, c#19L]
         :     +- Filter isnotnull(a#17L)
         :        +- Relation default.t1[a#17L,b#18L,c#19L] parquet
         +- Aggregate [y#21L, x#20L], [y#21L, x#20L], true
            +- Filter isnotnull(x#20L)
               +- Relation default.t2[x#20L,y#21L] parquet
   ```
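
The plan change above can be approximated by hand in SQL: each join input is grouped on its join key plus the columns the final `GROUP BY` needs, and the outer aggregate is kept. The sketch below is not Spark output. It uses Python's built-in `sqlite3` module, and the duplicated rows, the aliases `l` and `r`, and the variable names are assumptions made for illustration. It checks that both query shapes return the same rows and counts how many rows each shape feeds into the join.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Same column layout as t1/t2 in the PR example, but with duplicated rows
# (an assumption for this sketch) so that pre-aggregation has work to do.
cur.execute("CREATE TABLE t1 (a INTEGER, b INTEGER, c INTEGER)")
cur.execute("CREATE TABLE t2 (x INTEGER, y INTEGER)")
cur.executemany("INSERT INTO t1 VALUES (?, ?, ?)",
                [(i % 10, i, i % 10) for i in range(100)])
cur.executemany("INSERT INTO t2 VALUES (?, ?)",
                [(i % 8, i % 8) for i in range(80)])

# Query from the PR description.
original = "SELECT c, y FROM t1 JOIN t2 ON t1.a = t2.x GROUP BY c, y"

# Hand-written equivalent of the "after" plan: group each side below the join.
left_pre = "SELECT a, c FROM t1 WHERE a IS NOT NULL GROUP BY c, a"
right_pre = "SELECT x, y FROM t2 WHERE x IS NOT NULL GROUP BY y, x"
pushed = (
    f"SELECT c, y FROM ({left_pre}) AS l "
    f"JOIN ({right_pre}) AS r ON l.a = r.x GROUP BY c, y"
)

original_rows = sorted(cur.execute(original).fetchall())
pushed_rows = sorted(cur.execute(pushed).fetchall())

def count(query):
    """Return the number of rows produced by a query."""
    return cur.execute(f"SELECT COUNT(*) FROM ({query})").fetchone()[0]

# Rows entering the join without and with the pushed-down aggregates.
rows_without = (count("SELECT a, c FROM t1"), count("SELECT x, y FROM t2"))
rows_with = (count(left_pre), count(right_pre))

print("same result:", original_rows == pushed_rows)
print("join input rows without pushdown:", rows_without)
print("join input rows with pushdown:", rows_with)
```

With the original `range(10)` and `range(8)` data every row is already distinct, so the pre-aggregation would remove nothing; the benefit depends on how many duplicate grouping keys each join input contains.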
   
   ### Why are the changes needed?
   
   1. Reduce shuffle data to improve query performance.
   2. Many databases have similar rules:
       Teradata: https://docs.teradata.com/r/Teradata-VantageTM-SQL-Request-and-Transaction-Processing/March-2019/Join-Planning-and-Optimization/Partial-GROUP-BY-Block-Optimization
       Calcite: https://issues.apache.org/jira/browse/CALCITE-366
       Hive: https://issues.apache.org/jira/browse/HIVE-10785
       Trino: https://github.com/trinodb/trino/blob/375/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/PushPartialAggregationThroughJoin.java
       Presto: https://github.com/prestodb/presto/blob/0.271/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughJoin.java
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Unit tests and TPC-DS benchmark tests.
   
   Query | Before this PR (seconds) | After this PR (seconds)
   -- | -- | --
   q37 | 31 | 14
   q38 | 60 | 28
   q54 | 10 | 12
   q82 | 52 | 25
   q87 | 46 | 41
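
To make the table easier to compare, the per-query speedup can be computed as the "before" time divided by the "after" time. This is only a sketch over the five rows reported above; the variable names are illustrative, and a ratio below 1 means the query ran slower after the change.

```python
# (before, after) runtimes in seconds, copied from the table above.
timings = {
    "q37": (31, 14),
    "q38": (60, 28),
    "q54": (10, 12),
    "q82": (52, 25),
    "q87": (46, 41),
}

# Speedup = before / after; values below 1.0 indicate a slowdown.
speedups = {query: before / after for query, (before, after) in timings.items()}

total_before = sum(before for before, _ in timings.values())
total_after = sum(after for _, after in timings.values())

for query, ratio in speedups.items():
    label = "faster" if ratio > 1 else "slower"
    print(f"{query}: {ratio:.2f}x ({label})")
print(f"total: {total_before}s -> {total_after}s")
```

Four of the five listed queries run faster after the change, while q54 goes from 10 to 12 seconds.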
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


