Github user tejasapatil commented on the issue:

    https://github.com/apache/spark/pull/15605
  
    Most of the tests are failing because `ShuffleExchange` currently does not handle `PartitioningCollection`. That can be fixed, but as I looked into this more, I found a flaw in the current approach.
    
    The objective of this PR is to make `SortMergeJoin` (SMJ) avoid a shuffle in the scenarios below:
    
    1. Every child's output is hash partitioned on all the join keys. This already happens today.
    2. Every child's output is hash partitioned on the same single join column (a sketch of this case follows the list).
    
    Fixing that needs a change which might alter some core behavior, so I want to get feedback (or better alternatives) before jumping in and putting out a change for review.
    
    Let's look at `createPartitioning()`. `ClusteredDistribution` is mapped to `HashPartitioning`: https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L53
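
    For reference, the mapping at that line looks roughly like this (paraphrased; the link above has the authoritative code):

    ```scala
    // Paraphrase of EnsureRequirements.createPartitioning: the required
    // Distribution of an operator is answered with a single Partitioning.
    private def createPartitioning(
        requiredDistribution: Distribution,
        numPartitions: Int): Partitioning = {
      requiredDistribution match {
        case AllTuples => SinglePartition
        // A ClusteredDistribution over (i, j, k) is always answered with
        // HashPartitioning(i, j, k) -- never one of the single-column variants.
        case ClusteredDistribution(clustering) =>
          HashPartitioning(clustering, numPartitions)
        case OrderedDistribution(ordering) =>
          RangePartitioning(ordering, numPartitions)
        case dist => sys.error(s"Do not know how to satisfy distribution $dist")
      }
    }
    ```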
    
    `HashPartitioning(i, j, k)` does satisfy `ClusteredDistribution(i, j, k)`. But so does each one of `HashPartitioning(i)`, `HashPartitioning(j)` and `HashPartitioning(k)`. So one would want all the children of the SMJ operator to satisfy any one of [`HashPartitioning(i, j, k)`, `HashPartitioning(i)`, `HashPartitioning(j)`, `HashPartitioning(k)`].
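
    A minimal, self-contained model of that subset semantics (string keys stand in for Spark's `Expression` trees; these are simplified stand-ins, not the real `Partitioning` / `Distribution` classes):

    ```scala
    // Simplified model of the `satisfies` relation described above.
    case class ClusteredDistribution(clustering: Seq[String])
    case class HashPartitioning(expressions: Seq[String]) {
      // Satisfied iff this partitioning's keys are a subset of the clustering.
      def satisfies(d: ClusteredDistribution): Boolean =
        expressions.nonEmpty && expressions.forall(d.clustering.contains)
    }

    object SatisfiesDemo extends App {
      val dist = ClusteredDistribution(Seq("i", "j", "k"))
      println(HashPartitioning(Seq("i", "j", "k")).satisfies(dist)) // true
      println(HashPartitioning(Seq("i")).satisfies(dist))           // true
      println(HashPartitioning(Seq("x")).satisfies(dist))           // false
    }
    ```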
    
    `PartitioningCollection` provides a way to pack these several partitionings in a single place. But it does not track which exact partitioning is satisfied by a given child. E.g. `table_A` can be partitioned on hash(i) while `table_B` is partitioned on hash(j) or hash(i, j), where i and j are join keys. The current version of the PR would skip the shuffle over both children in such a case, which is wrong: rows with equal join keys would not be co-located.
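
    A toy demonstration of why that is wrong (made-up hash function and 4 partitions, not Spark's actual partitioner):

    ```scala
    // table_A routes rows by hash(i), table_B by hash(j). A matching row pair
    // with join keys (i = 1, j = 2) then lands in different partitions, so a
    // shuffle-free SMJ would never see the two rows together.
    object CoPartitionDemo extends App {
      val numPartitions = 4
      def part(key: Int): Int =
        ((key.hashCode % numPartitions) + numPartitions) % numPartitions

      val partitionInA = part(1) // table_A is partitioned on i, and i = 1
      val partitionInB = part(2) // table_B is partitioned on j, and j = 2
      println(s"table_A -> partition $partitionInA, table_B -> partition $partitionInB")
      // prints: table_A -> partition 1, table_B -> partition 2
    }
    ```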
    
    The fix would be to track which one of the partitionings in the `PartitioningCollection` is satisfied by each child and make sure it is the same across all children. Based on that, also change the `outputPartitioning` of the SMJ node during plan generation. From what I have seen so far, mutating the partitioning / distribution of a `SparkPlan` is not supported. That is probably for the good, as it would mutate properties of the plan tree (immutable plans are good). If plan optimizations are done bottom-up, then this might not cause problems.
    
    I would like to hear opinions about this approach. If there are better alternatives, please feel free to share.

