bmarcott opened a new pull request #27096: SPARK-28148: repartition after join 
is not optimized away
URL: https://github.com/apache/spark/pull/27096
 
 
   ### What changes were proposed in this pull request?
   
   Extra shuffling was not eliminated after inner joins because their output 
partitioning is a PartitioningCollection, while the existing logic only 
matched on HashPartitioning. 
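
The mismatch can be illustrated with a toy model in plain Python. The classes below are hypothetical stand-ins named after Spark's HashPartitioning and PartitioningCollection; they are not Spark's implementation, and both function names are assumptions made for illustration only.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class HashPartitioning:
    # Toy stand-in: rows are hash-partitioned on `keys` into `num_partitions`.
    keys: Tuple[str, ...]
    num_partitions: int


@dataclass(frozen=True)
class PartitioningCollection:
    # Toy stand-in: the output satisfies every partitioning in the collection,
    # as is the case for an inner join on equal keys.
    partitionings: Tuple["Partitioning", ...]


Partitioning = Union[HashPartitioning, PartitioningCollection]


def matches_hash_only(child: Partitioning, required: HashPartitioning) -> bool:
    # Behaviour described above: only a bare HashPartitioning is recognised.
    return isinstance(child, HashPartitioning) and child == required


def matches_with_collection(child: Partitioning, required: HashPartitioning) -> bool:
    # Sketch of the idea behind the fix: also look inside a collection.
    if isinstance(child, PartitioningCollection):
        return any(matches_with_collection(p, required) for p in child.partitionings)
    return child == required


# Inner join on id: the output is partitioned by either side's join key.
join_output = PartitioningCollection(
    (HashPartitioning(("id#367L",), 42), HashPartitioning(("id#369L",), 42))
)
required = HashPartitioning(("id#367L",), 42)

shuffle_needed_before = not matches_hash_only(join_output, required)
shuffle_needed_after = not matches_with_collection(join_output, required)
```

In this model the hash-only check requests a shuffle even though the join output is already partitioned on the requested key, while the collection-aware check does not.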
   
   Nothing in EnsureRequirements eliminated a parent sort (within 
partitions), which is unnecessary when the same sort order has already been 
introduced by SortMergeJoin.
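
The condition for dropping the parent sort can be sketched as a prefix check. This is a minimal illustration, assuming sort orders are modelled as (column, direction) pairs; `sort_is_redundant` is a hypothetical helper, not a function in Spark.

```python
from typing import Sequence, Tuple

# A sort key modelled as (column, direction), e.g. ("id#367L", "ASC NULLS FIRST").
SortOrder = Tuple[str, str]


def sort_is_redundant(
    required: Sequence[SortOrder], child_ordering: Sequence[SortOrder]
) -> bool:
    # The parent sort adds nothing if the child's per-partition ordering
    # already starts with the required ordering.
    if len(required) > len(child_ordering):
        return False
    return all(r == c for r, c in zip(required, child_ordering))


# Ordering produced by the SortMergeJoin on id in the plans quoted in this PR.
join_ordering = [("id#367L", "ASC NULLS FIRST")]

same_order = sort_is_redundant([("id#367L", "ASC NULLS FIRST")], join_ordering)
other_order = sort_is_redundant([("id#367L", "DESC NULLS LAST")], join_ordering)
```

Here `same_order` is true, so a sortWithinPartitions('id') on top of the join would be redundant, while `other_order` is false and the sort has to stay.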
   
   Copied from jira:
   Partitioning and sorting are usually retained after a join.
   ```
   spark.conf.set('spark.sql.shuffle.partitions', '42')
   
   df1 = spark.range(5000000, numPartitions=5)
   df2 = spark.range(10000000, numPartitions=5)
   df3 = spark.range(20000000, numPartitions=5)
   
   # Reuse previous partitions & sort.
   df1.join(df2, on='id').join(df3, on='id').explain()
   # == Physical Plan ==
   # *(8) Project [id#367L]
   # +- *(8) SortMergeJoin [id#367L], [id#374L], Inner
   #    :- *(5) Project [id#367L]
   #    :  +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
   #    :     :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
   #    :     :  +- Exchange hashpartitioning(id#367L, 42)
   #    :     :     +- *(1) Range (0, 5000000, step=1, splits=5)
   #    :     +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
   #    :        +- Exchange hashpartitioning(id#369L, 42)
   #    :           +- *(3) Range (0, 10000000, step=1, splits=5)
   #    +- *(7) Sort [id#374L ASC NULLS FIRST], false, 0
   #       +- Exchange hashpartitioning(id#374L, 42)
   #          +- *(6) Range (0, 20000000, step=1, splits=5)
   ```
   
   However, here the partitioning persists through the left join, but the sort does not.
   
   ```
   df1.join(df2, on='id', how='left').repartition('id').sortWithinPartitions('id').explain()
   # == Physical Plan ==
   # *(5) Sort [id#367L ASC NULLS FIRST], false, 0
   # +- *(5) Project [id#367L]
   #    +- SortMergeJoin [id#367L], [id#369L], LeftOuter
   #       :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
   #       :  +- Exchange hashpartitioning(id#367L, 42)
   #       :     +- *(1) Range (0, 5000000, step=1, splits=5)
   #       +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
   #          +- Exchange hashpartitioning(id#369L, 42)
   #             +- *(3) Range (0, 10000000, step=1, splits=5)
   ```
   Also here: Partitions do not persist through inner join.
   
   ```
   df1.join(df2, on='id').repartition('id').sortWithinPartitions('id').explain()
   # == Physical Plan ==
   # *(6) Sort [id#367L ASC NULLS FIRST], false, 0
   # +- Exchange hashpartitioning(id#367L, 42)
   #    +- *(5) Project [id#367L]
   #       +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
   #          :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
   #          :  +- Exchange hashpartitioning(id#367L, 42)
   #          :     +- *(1) Range (0, 5000000, step=1, splits=5)
   #          +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
   #             +- Exchange hashpartitioning(id#369L, 42)
   #                +- *(3) Range (0, 10000000, step=1, splits=5)
   ```
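
One rough way to spot the redundant shuffle is to count Exchange nodes in the plan text. The sketch below assumes the plan has already been captured as a string; `count_exchanges` is a hypothetical helper, not part of the PySpark API.

```python
import re


def count_exchanges(plan: str) -> int:
    # Count Exchange operators in a physical plan rendered as text.
    return len(re.findall(r"\bExchange\b", plan))


# The inner-join plan quoted above, abbreviated to the lines that matter.
plan = """
*(6) Sort [id#367L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#367L, 42)
   +- *(5) Project [id#367L]
      +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
         :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#367L, 42)
         +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#369L, 42)
"""

num_exchanges = count_exchanges(plan)  # 3 for the plan above
```

Two of the three exchanges feed the join inputs; the third, directly above the Project, is the one that repeats the partitioning the join already provides.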
   
