wangyum opened a new pull request #35194:
URL: https://github.com/apache/spark/pull/35194


   ### What changes were proposed in this pull request?
   
   Add a new rule (`EliminateInnerJoin`) that converts an inner join to a left semi join. This has two advantages:
   1. Statistics are more accurate and more `BroadcastHashJoin`s can be planned.
   2. Two existing rules (`PushDownLeftSemiAntiJoin` and `PushLeftSemiLeftAntiThroughJoin`) can then optimize the left semi join further.
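   
   To illustrate why such a rewrite can be safe, here is a minimal sketch in plain Scala collections (no Spark). It assumes the condition suggested by the example in this description: the right side is distinct on the join keys and only left-side columns are selected. `LeftRow`, `RightKey`, and `InnerVsSemi` are hypothetical names for illustration only, not part of the rule.
   
   ```scala
   // Hypothetical row types for illustration; not Spark classes.
   final case class LeftRow(a: Long, b: Long, c: Long)
   final case class RightKey(a: Long, b: Long)
   
   object InnerVsSemi {
     // Inner join on (a, b) that keeps only the left-side columns.
     // A left row is emitted once per matching right row.
     def innerJoinLeftCols(left: Seq[LeftRow], right: Seq[RightKey]): Seq[LeftRow] =
       for {
         l <- left
         r <- right if l.a == r.a && l.b == r.b
       } yield l
   
     // Left semi join: a left row is emitted at most once, if any right row matches.
     def leftSemi(left: Seq[LeftRow], right: Seq[RightKey]): Seq[LeftRow] =
       left.filter(l => right.exists(r => l.a == r.a && l.b == r.b))
   
     def main(args: Array[String]): Unit = {
       val left = Seq(LeftRow(1L, 1L, 10L), LeftRow(2L, 2L, 20L), LeftRow(1L, 1L, 11L))
   
       // Right side distinct on (a, b): both joins return the same rows.
       val distinctRight = Seq(RightKey(1L, 1L), RightKey(3L, 3L))
       assert(innerJoinLeftCols(left, distinctRight) == leftSemi(left, distinctRight))
   
       // Right side with duplicate keys: the inner join duplicates left rows,
       // so the two joins are no longer interchangeable.
       val duplicatedRight = Seq(RightKey(1L, 1L), RightKey(1L, 1L))
       assert(innerJoinLeftCols(left, duplicatedRight) != leftSemi(left, duplicatedRight))
     }
   }
   ```
   
   The second check shows why distinctness of the right side matters in this sketch: without it, the inner join would return more rows than the left semi join.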
   
   This is a real case:
   ```scala
   sql("CREATE TABLE t1 using parquet AS SELECT id AS a, id AS b, id AS c FROM range(10000000)")
   sql("CREATE TABLE t2 using parquet AS SELECT id AS a, id AS b, id AS c FROM range(10000000)")
   sql("CREATE TABLE t3 using parquet AS SELECT id AS a, id AS b, id AS c FROM range(1000)")
   
   sql(
     """
       |SELECT tmp1.*
       |FROM   (SELECT *
       |        FROM   t1
       |        UNION
       |        SELECT *
       |        FROM   t2) tmp1
       |       INNER JOIN (SELECT DISTINCT a,
       |                                   b
       |                   FROM   t3) tmp2
       |               ON tmp1.a = tmp2.a
       |                  AND tmp1.b = tmp2.b 
     """.stripMargin).explain
   ```
   
   Before this PR:
   ```
   == Optimized Logical Plan ==
   Project [a#12L, b#13L, c#14L]
   +- Join Inner, ((a#12L = a#18L) AND (b#13L = b#19L))
      :- Aggregate [a#12L, b#13L, c#14L], [a#12L, b#13L, c#14L]
      :  +- Union false, false
      :     :- Filter (isnotnull(a#12L) AND isnotnull(b#13L))
      :     :  +- Relation default.t1[a#12L,b#13L,c#14L] parquet
      :     +- Filter (isnotnull(a#15L) AND isnotnull(b#16L))
      :        +- Relation default.t2[a#15L,b#16L,c#17L] parquet
      +- Aggregate [a#18L, b#19L], [a#18L, b#19L]
         +- Project [a#18L, b#19L]
            +- Filter (isnotnull(a#18L) AND isnotnull(b#19L))
               +- Relation default.t3[a#18L,b#19L,c#20L] parquet
   ```
   
   After this PR:
   ```
   Aggregate [a#12L, b#13L, c#14L], [a#12L, b#13L, c#14L]
   +- Union false, false
      :- Join LeftSemi, ((a#12L = a#18L) AND (b#13L = b#19L))
      :  :- Filter (isnotnull(a#12L) AND isnotnull(b#13L))
      :  :  +- Relation default.t1[a#12L,b#13L,c#14L] parquet
      :  +- Aggregate [a#18L, b#19L], [a#18L, b#19L]
      :     +- Project [a#18L, b#19L]
      :        +- Filter (isnotnull(a#18L) AND isnotnull(b#19L))
      :           +- Relation default.t3[a#18L,b#19L,c#20L] parquet
      +- Join LeftSemi, ((a#15L = a#18L) AND (b#16L = b#19L))
         :- Filter (isnotnull(a#15L) AND isnotnull(b#16L))
         :  +- Relation default.t2[a#15L,b#16L,c#17L] parquet
         +- Aggregate [a#18L, b#19L], [a#18L, b#19L]
            +- Project [a#18L, b#19L]
               +- Filter (isnotnull(a#18L) AND isnotnull(b#19L))
                  +- Relation default.t3[a#18L,b#19L,c#20L] parquet
   ```
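   
   In the plan above, the left semi join sits below the `Union` and the deduplicating `Aggregate`, so each large table is filtered before the union. The following plain-Scala sketch (no Spark) checks that filtering each branch first gives the same result as filtering after the deduplicated union. `SemiJoinPushdown`, `Row`, and `Key` are hypothetical names, and the small sequences stand in for `t1`, `t2`, and the distinct keys of `t3`.
   
   ```scala
   object SemiJoinPushdown {
     type Row = (Long, Long, Long) // (a, b, c); hypothetical stand-in for a table row
     type Key = (Long, Long)       // (a, b) join key
   
     // Left semi join against a set of distinct keys.
     def leftSemi(rows: Seq[Row], keys: Set[Key]): Seq[Row] =
       rows.filter { case (a, b, _) => keys.contains((a, b)) }
   
     // Join placed above the deduplicated union.
     def joinAboveUnion(t1: Seq[Row], t2: Seq[Row], keys: Set[Key]): Seq[Row] =
       leftSemi((t1 ++ t2).distinct, keys)
   
     // Join pushed into each branch, then union and deduplicate.
     def joinBelowUnion(t1: Seq[Row], t2: Seq[Row], keys: Set[Key]): Seq[Row] =
       (leftSemi(t1, keys) ++ leftSemi(t2, keys)).distinct
   
     def main(args: Array[String]): Unit = {
       val t1: Seq[Row] = Seq((1L, 1L, 1L), (2L, 2L, 2L), (5L, 5L, 5L))
       val t2: Seq[Row] = Seq((1L, 1L, 1L), (3L, 3L, 3L), (5L, 5L, 6L))
       val keys: Set[Key] = Set((1L, 1L), (5L, 5L))
   
       // Both placements keep exactly the same set of rows.
       assert(joinAboveUnion(t1, t2, keys).toSet == joinBelowUnion(t1, t2, keys).toSet)
     }
   }
   ```
   
   The results are compared as sets because row order is not significant for a SQL `UNION`.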
   
   ### Why are the changes needed?
   
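   Improve query performance: as the plans above show, the resulting left semi join is applied to `t1` and `t2` before the union, and it may be planned as a `BroadcastHashJoin`.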
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Unit tests and a benchmark test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


