wangyum opened a new pull request, #36552:
URL: https://github.com/apache/spark/pull/36552

   ### What changes were proposed in this pull request?
   
   1. Add a new optimizer rule (PushPartialAggregationThroughJoin) that pushes partial aggregation through joins. It supports the following cases:
      - Push partial sum, count, avg, min, max, first and last down through an inner join.
      - Partially deduplicate the children of the join if the aggregation itself is group-only (it has no aggregate functions).
      - Partially deduplicate the right side of a left semi/anti join.
       
      For example:
      ```sql
      CREATE TABLE t1(a int, b int, c int) using parquet;
      CREATE TABLE t2(x int, y int, z int) using parquet;
      
      EXPLAIN EXTENDED SELECT b, SUM(c) FROM t1 INNER JOIN t2 ON t1.a = t2.x GROUP BY b;
      == Optimized Logical Plan ==
      Aggregate [b#1], [b#1, sum((_pushed_sum_c#12L * cnt#15L)) AS sum(c)#7L]
      +- Project [_pushed_sum_c#12L, b#1, cnt#15L]
         +- Join Inner, (a#0 = x#3)
            :- PartialAggregate [a#0, b#1], [sum(c#2) AS _pushed_sum_c#12L, a#0, b#1]
            :  +- Project [b#1, c#2, a#0]
            :     +- Filter isnotnull(a#0)
            :        +- Relation default.t1[a#0,b#1,c#2] parquet
            +- PartialAggregate [x#3], [count(1) AS cnt#15L, x#3]
               +- Project [x#3]
                  +- Filter isnotnull(x#3)
                     +- Relation default.t2[x#3,y#4,z#5] parquet
      
      EXPLAIN EXTENDED SELECT DISTINCT b, y FROM t1 INNER JOIN t2 ON t1.a = t2.x;
      == Optimized Logical Plan ==
      Aggregate [b#1, y#4], [b#1, y#4]
      +- Project [b#1, y#4]
         +- Join Inner, (a#0 = x#3)
            :- PartialAggregate [a#0, b#1], [a#0, b#1]
            :  +- Project [a#0, b#1]
            :     +- Filter isnotnull(a#0)
            :        +- Relation default.t1[a#0,b#1,c#2] parquet
            +- Project [x#3, y#4]
               +- Filter isnotnull(x#3)
                  +- Relation default.t2[x#3,y#4,z#5] parquet
      
      SET spark.sql.autoBroadcastJoinThreshold=-1;
      EXPLAIN EXTENDED SELECT * FROM t1 WHERE t1.a IN (SELECT x FROM t2);
      == Optimized Logical Plan ==
      Join LeftSemi, (a#11 = x#14)
      :- Relation default.t1[a#11,b#12,c#13] parquet
      +- PartialAggregate [x#14], [x#14]
         +- Project [x#14]
            +- Relation default.t2[x#14,y#15,z#16] parquet
      ```
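
As a rough illustration of why the first plan above multiplies the pushed-down partial sum by the right-side row count, here is a small Python sketch. It is not Spark code: the sample rows and the function names `direct` and `pushed` are made up for illustration, and it assumes `c` is never NULL.

```python
from collections import defaultdict

# Hypothetical sample rows, not taken from the PR.
t1 = [(1, 10, 5), (1, 10, 7), (2, 20, 3), (None, 30, 9)]  # (a, b, c)
t2 = [(1, 0, 0), (1, 0, 0), (2, 0, 0), (3, 0, 0)]         # (x, y, z)


def direct(t1, t2):
    """SUM(c) GROUP BY b computed over the full inner join on a = x."""
    out = defaultdict(int)
    for a, b, c in t1:
        for x, _, _ in t2:
            if a is not None and a == x:
                out[b] += c
    return dict(out)


def pushed(t1, t2):
    """Same query with partial aggregates evaluated below the join."""
    # Left side: partial SUM(c) grouped by (a, b), NULL join keys filtered out.
    left = defaultdict(int)
    for a, b, c in t1:
        if a is not None:
            left[(a, b)] += c
    # Right side: number of rows per join key x.
    right = defaultdict(int)
    for x, _, _ in t2:
        if x is not None:
            right[x] += 1
    # Join the pre-aggregated sides and scale each partial sum by the
    # number of right-side rows it would have matched.
    out = defaultdict(int)
    for (a, b), partial_sum in left.items():
        if a in right:
            out[b] += partial_sum * right[a]
    return dict(out)


print(direct(t1, t2))
print(pushed(t1, t2))
```

Both functions return the same totals on this data. In the pushed version the join sees one row per `(a, b)` group and one row per `x` value instead of every input row.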
   
   2. Make partial aggregation adaptive: skip the partial aggregation step when it does not reduce the number of rows enough.
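
One way to picture the adaptive behaviour is the Python sketch below. It is only a sketch under stated assumptions, not the PR's implementation: the function names and the `sample_size` and `min_reduction` knobs are hypothetical and do not correspond to any Spark configuration. The idea is to aggregate a first batch of rows, measure how much the row count shrank, and fall back to passing rows through unchanged if the reduction is too small. The final aggregation stays correct either way, because a passed-through row is just a partial result for a single input row.

```python
from collections import defaultdict


def adaptive_partial_sum(rows, sample_size=4, min_reduction=0.5):
    """Yield (key, partial_sum) pairs, giving up on aggregation if it barely helps.

    sample_size and min_reduction are hypothetical knobs, not Spark configs.
    """
    buffer = {}
    seen = 0
    it = iter(rows)
    for key, value in it:
        buffer[key] = buffer.get(key, 0) + value
        seen += 1
        if seen == sample_size:
            # Fraction of rows removed by aggregating the sample.
            reduction = 1 - len(buffer) / seen
            if reduction < min_reduction:
                # Not worth it: flush what we have, then pass the rest through.
                yield from buffer.items()
                for key, value in it:
                    yield (key, value)
                return
    yield from buffer.items()


def final_sum(partials):
    """Final aggregation that merges partial sums per key."""
    out = defaultdict(int)
    for key, value in partials:
        out[key] += value
    return dict(out)


high_cardinality = [(i, 1) for i in range(10)]      # every key is distinct
low_cardinality = [(i % 2, 1) for i in range(10)]   # only two keys

print(len(list(adaptive_partial_sum(high_cardinality))))
print(list(adaptive_partial_sum(low_cardinality)))
```

With the distinct-key input the sketch stops aggregating after the sample and emits one output row per input row. With the two-key input it keeps aggregating and emits two rows.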
   
   
   ### Why are the changes needed?
   
   1. Improve query performance.
   2. Many databases have similar rules, for example [Teradata](https://docs.teradata.com/r/Teradata-VantageTM-SQL-Request-and-Transaction-Processing/March-2019/Join-Planning-and-Optimization/Partial-GROUP-BY-Block-Optimization), [Calcite](https://issues.apache.org/jira/browse/CALCITE-366), [Hive](https://issues.apache.org/jira/browse/HIVE-10785), [Trino](https://github.com/trinodb/trino/blob/375/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/PushPartialAggregationThroughJoin.java) and [Presto](https://github.com/prestodb/presto/blob/0.271/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughJoin.java).
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Unit tests and benchmark tests.
   
   <img width="982" alt="image" src="https://user-images.githubusercontent.com/5399861/168425573-ae8336ce-62c3-45fa-84a1-4b61019b1186.png">
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

