Github user tejasapatil commented on the issue: https://github.com/apache/spark/pull/15605 Most of the tests are failing because `ShuffleExchange` currently does not handle `PartitioningCollection`. That can be fixed but as I looked at this more, I found a flaw in the current approach. The objective of this PR is to make `SortMergeJoin` (SMJ) avoid shuffle in below scenarios: 1. If every children's output is hash partitioned on all join predicates. This is already happening. 2. If every children's output partitioning is hash partitioned on a given single join column. Fixing that might need some change which I might changing some core behavior. I want to get feedback or better alternatives before jumping and putting out a change for review. Lets look at `createPartitioning()`. `ClusteredDistribution` is mapped to `HashPartitioning` : https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L53 `HashPartitioning(i, j, k)` does satisfy `ClusteredDistribution(i, j, k)`. But so do each one of `HashPartitioning(i)`, `HashPartitioning(j)` and `HashPartitioning(k)`. So, one would want to have all the children of SMJ operator to satisfy any one of [`HashPartitioning(i, j, k)`, `HashPartitioning(i)`, `HashPartitioning(j)`, `HashPartitioning(k)`]. `PartitioningCollection` provides a way to pack these several partitioning in a single place. But it does not track which exact partitioning is satisfied by a given child. eg. `table_A` can be partitioned on hash(i) and `table_B` can be partitioned on hash(j) OR hash(i,j) where i and j and join predicates. The current version of PR would skip shuffle over both children in such case which is wrong. The fix would be to track which one of the partitioning in the `PartitioningCollection` is satisfied by each children and make sure its the same across all its children. Based on that, also change the `outputPartitioning` of the SMJ node during plan generation. So far from what I have seen, manipulating the partitioning / distribution for `SparkPlan` is not supported. Its probably for the good as it will mutate the properties of the plan tree (constants are good). If plan optimizations are done bottom-up, then this might not cause problems. would like to hear opinions about this approach. If there are better alternatives, feel free to share.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org