Github user tejasapatil commented on the issue:
https://github.com/apache/spark/pull/15605
Most of the tests are failing because `ShuffleExchange` currently does not
handle `PartitioningCollection`. That can be fixed, but as I looked into this
more, I found a flaw in the current approach.
The objective of this PR is to make `SortMergeJoin` (SMJ) avoid a shuffle in
the following scenarios:
1. If every child's output is hash partitioned on all join predicates.
This already happens.
2. If every child's output is hash partitioned on a given single join
column.
Fixing that might need a change that alters some core behavior. I want to
get feedback or better alternatives before jumping in and putting out a
change for review.
Let's look at `createPartitioning()`. `ClusteredDistribution` is mapped to
`HashPartitioning`:
https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L53
`HashPartitioning(i, j, k)` does satisfy `ClusteredDistribution(i, j, k)`.
But so does each of `HashPartitioning(i)`, `HashPartitioning(j)` and
`HashPartitioning(k)`. So, one would want all the children of the SMJ
operator to satisfy any one of [`HashPartitioning(i, j, k)`,
`HashPartitioning(i)`, `HashPartitioning(j)`, `HashPartitioning(k)`].
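
To make the satisfaction rule concrete, here is a minimal toy model in Python. It is not Spark code: the class names only mirror the Spark ones, columns are plain strings, and the rule used (a hash partitioning satisfies a clustered distribution when all of its expressions appear in the required clustering) is an assumption that matches the claim above.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class ClusteredDistribution:
    """Toy stand-in: rows with equal values on `clustering` must be co-located."""
    clustering: Tuple[str, ...]


@dataclass(frozen=True)
class HashPartitioning:
    """Toy stand-in: rows are placed by a hash of `expressions`."""
    expressions: Tuple[str, ...]

    def satisfies(self, required: ClusteredDistribution) -> bool:
        # Assumed rule: every hashed column must be one of the clustering columns.
        # Rows that agree on (i, j, k) also agree on i, so hash(i) co-locates them.
        return bool(self.expressions) and all(
            e in required.clustering for e in self.expressions
        )


required = ClusteredDistribution(("i", "j", "k"))
candidates = [
    HashPartitioning(("i", "j", "k")),
    HashPartitioning(("i",)),
    HashPartitioning(("j",)),
    HashPartitioning(("k",)),
]
results = {c.expressions: c.satisfies(required) for c in candidates}
unrelated = HashPartitioning(("x",)).satisfies(required)
```

In this model every candidate in the list satisfies the required distribution, while a partitioning on a non-join column (the hypothetical `x`) does not.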
`PartitioningCollection` provides a way to pack these several partitionings
in a single place. But it does not track which exact partitioning is satisfied
by a given child. E.g. `table_A` can be partitioned on hash(i) and `table_B` can
be partitioned on hash(j) OR hash(i,j), where i and j are join predicates. The
current version of the PR would skip the shuffle for both children in such a
case, which is wrong.
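
The following toy Python sketch shows why skipping the shuffle is wrong in that case. It is only an illustration: the modulo "hash", the four partitions, and the sample rows are all made up, and the join only compares rows that sit in the same partition index, which is what a shuffle-free join effectively does.

```python
NUM_PARTITIONS = 4  # arbitrary choice for the illustration


def partition_id(row, columns):
    # Toy hash (sum modulo partition count), a stand-in for a real hash function.
    return sum(row[c] for c in columns) % NUM_PARTITIONS


def partition_table(rows, columns):
    """Distribute rows into partitions by hashing the given columns."""
    parts = [[] for _ in range(NUM_PARTITIONS)]
    for row in rows:
        parts[partition_id(row, columns)].append(row)
    return parts


def partitionwise_join(left_parts, right_parts):
    """Join on (i, j), comparing only rows in the same partition index (no shuffle)."""
    return [
        (l, r)
        for lp, rp in zip(left_parts, right_parts)
        for l in lp
        for r in rp
        if l["i"] == r["i"] and l["j"] == r["j"]
    ]


table_a = [{"i": 1, "j": 2}, {"i": 3, "j": 3}]
table_b = [{"i": 1, "j": 2}, {"i": 3, "j": 3}]

# table_A on hash(i), table_B on hash(j): each satisfies the distribution alone,
# but matching rows can land in different partitions.
mismatched = partitionwise_join(
    partition_table(table_a, ["i"]), partition_table(table_b, ["j"])
)

# Both tables on hash(i): matching rows are always co-located.
aligned = partitionwise_join(
    partition_table(table_a, ["i"]), partition_table(table_b, ["i"])
)
```

With these rows, the row `(i=1, j=2)` lands in partition 1 of `table_a` but partition 2 of `table_b`, so the mismatched join silently drops it, while the aligned join finds both matches.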
The fix would be to track which one of the partitionings in the
`PartitioningCollection` is satisfied by each child and make sure it's the
same across all the children. Based on that, also change the
`outputPartitioning` of the SMJ node during plan generation. So far, from what
I have seen, manipulating the partitioning / distribution of a `SparkPlan` is
not supported. That is probably for the best, since doing so would mutate the
properties of the plan tree (immutability is good). If plan optimizations are
done bottom-up, then this might not cause problems.
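
As a rough sketch of that idea (in toy Python, not the Spark planner), the check could look for a single candidate partitioning that every child already has. The function name `common_partitioning`, the representation of a child as a set of column tuples, and the candidate order are all hypothetical; a real implementation would also have to match left and right join keys by position rather than by name.

```python
from typing import List, Optional, Sequence, Set, Tuple

Partitioning = Tuple[str, ...]  # columns a child's output is hash partitioned on


def common_partitioning(
    join_keys: Sequence[str],
    children: List[Set[Partitioning]],
) -> Optional[Partitioning]:
    """Return one partitioning shared by all children, or None if a shuffle is needed."""
    # Candidates: all join keys together, then each join key on its own.
    candidates = [tuple(join_keys)] + [(k,) for k in join_keys]
    for candidate in candidates:
        if all(candidate in child for child in children):
            return candidate
    return None


# The problematic case from above: the children satisfy different partitionings.
table_a = {("i",)}
table_b = {("j",), ("i", "j")}
needs_shuffle = common_partitioning(["i", "j"], [table_a, table_b])

# Both children partitioned on hash(i): the shuffle can be skipped, and the
# join's output partitioning would be the shared one.
shared = common_partitioning(["i", "j"], [{("i",)}, {("i",), ("i", "j")}])
```

Here `needs_shuffle` is `None` because no single candidate is common to both children, while `shared` is `("i",)`.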
I would like to hear opinions about this approach. If there are better
alternatives, feel free to share.