bmarcott opened a new pull request #27096: SPARK-28148: repartition after join is not optimized away
URL: https://github.com/apache/spark/pull/27096

### What changes were proposed in this pull request?

Extra shuffling was not eliminated after inner joins because they produce a PartitioningCollection partitioning, and the current logic only matched on HashPartitioning.

Nothing was present in EnsureRequirements to eliminate a parent sort (within partitions) that was unnecessary because the same sort order had already been introduced by SortMergeJoin.

Copied from JIRA:

Partitioning & sorting is usually retained after join.

```
spark.conf.set('spark.sql.shuffle.partitions', '42')
df1 = spark.range(5000000, numPartitions=5)
df2 = spark.range(10000000, numPartitions=5)
df3 = spark.range(20000000, numPartitions=5)

# Reuse previous partitions & sort.
df1.join(df2, on='id').join(df3, on='id').explain()
# == Physical Plan ==
# *(8) Project [id#367L]
# +- *(8) SortMergeJoin [id#367L], [id#374L], Inner
#    :- *(5) Project [id#367L]
#    :  +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#    :     :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#    :     :  +- Exchange hashpartitioning(id#367L, 42)
#    :     :     +- *(1) Range (0, 5000000, step=1, splits=5)
#    :     +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#    :        +- Exchange hashpartitioning(id#369L, 42)
#    :           +- *(3) Range (0, 10000000, step=1, splits=5)
#    +- *(7) Sort [id#374L ASC NULLS FIRST], false, 0
#       +- Exchange hashpartitioning(id#374L, 42)
#          +- *(6) Range (0, 20000000, step=1, splits=5)
```

However here: Partitions persist through left join, sort doesn't.
```
df1.join(df2, on='id', how='left').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(5) Sort [id#367L ASC NULLS FIRST], false, 0
# +- *(5) Project [id#367L]
#    +- SortMergeJoin [id#367L], [id#369L], LeftOuter
#       :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#       :  +- Exchange hashpartitioning(id#367L, 42)
#       :     +- *(1) Range (0, 5000000, step=1, splits=5)
#       +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#          +- Exchange hashpartitioning(id#369L, 42)
#             +- *(3) Range (0, 10000000, step=1, splits=5)
```

Also here: Partitions do not persist through inner join.

```
df1.join(df2, on='id').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(6) Sort [id#367L ASC NULLS FIRST], false, 0
# +- Exchange hashpartitioning(id#367L, 42)
#    +- *(5) Project [id#367L]
#       +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#          :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#          :  +- Exchange hashpartitioning(id#367L, 42)
#          :     +- *(1) Range (0, 5000000, step=1, splits=5)
#          +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#             +- Exchange hashpartitioning(id#369L, 42)
#                +- *(3) Range (0, 10000000, step=1, splits=5)
```
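
To make the inner-join case concrete, here is a small plain-Python model of the mismatch this pull request describes: a check that only recognizes a bare hash partitioning misses a child that reports a collection of partitionings. This is an illustrative sketch, not Spark's code. The classes only borrow the names `HashPartitioning` and `PartitioningCollection`, the two `redundant_*` functions are hypothetical, and modelling the left join's output as a plain hash partitioning is an assumption inferred from the plans above.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class HashPartitioning:
    """Toy stand-in: rows are hash-partitioned on `columns`."""
    columns: Tuple[str, ...]
    num_partitions: int


@dataclass(frozen=True)
class PartitioningCollection:
    """Toy stand-in: the output satisfies each listed partitioning."""
    partitionings: Tuple["Partitioning", ...]


Partitioning = Union[HashPartitioning, PartitioningCollection]


def redundant_hash_only(child: Partitioning, requested: HashPartitioning) -> bool:
    """Hypothetical check that only matches a plain HashPartitioning child."""
    return isinstance(child, HashPartitioning) and child == requested


def redundant_with_collections(child: Partitioning, requested: HashPartitioning) -> bool:
    """Hypothetical check that also searches inside a PartitioningCollection."""
    if isinstance(child, PartitioningCollection):
        return any(redundant_with_collections(p, requested) for p in child.partitionings)
    return child == requested


# Both join inputs are shuffled into 42 partitions on their own id column.
left = HashPartitioning(("df1.id",), 42)
right = HashPartitioning(("df2.id",), 42)

# Assumption: the inner join reports both sides, the left join only the left side.
inner_join_output = PartitioningCollection((left, right))
left_join_output = left

# repartition('id') after the join asks for the same partitioning as the left side.
requested = HashPartitioning(("df1.id",), 42)

print(redundant_hash_only(left_join_output, requested))          # True
print(redundant_hash_only(inner_join_output, requested))         # False
print(redundant_with_collections(inner_join_output, requested))  # True
```

In this model the hash-only check treats the repartition after the left join as redundant but keeps the one after the inner join, which mirrors the extra Exchange in the last plan above. Searching inside the collection removes that difference.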
