You can use bucketBy to avoid shuffling in your scenario. This test suite has some examples: https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
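For reference, a minimal sketch of the pattern (table names, data, and the bucket count are illustrative, not from your setup): both sides must be bucketed on the same column(s) with the same number of buckets, and bucketBy only works when writing through saveAsTable.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bucketed-join-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val dfA = Seq((1, 2, 3), (4, 5, 6)).toDF("x", "y", "z")
val dfB = Seq((1, 2, 30), (4, 5, 60)).toDF("x", "y", "z")

// Write both sides bucketed (and sorted) by the join key, with the
// same bucket count, so the sort-merge join can read them directly.
dfA.write.bucketBy(8, "x").sortBy("x").saveAsTable("bucketed_a")
dfB.write.bucketBy(8, "x").sortBy("x").saveAsTable("bucketed_b")

val joined = spark.table("bucketed_a")
  .join(spark.table("bucketed_b"), Seq("x"))

// Inspect the plan: with matching bucketing on both sides there should
// be no Exchange feeding the SortMergeJoin.
joined.explain()
```

Whether the exchange is eliminated when the join keys are a super-set of the bucket keys (e.g. bucketed by "x", joined on "x" and "y") depends on the Spark version; the linked test suite covers which cases Spark recognizes.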
Thanks,
Terry

On Sun, May 31, 2020 at 7:43 AM Patrick Woody <patrick.woo...@gmail.com> wrote:

> Hey all,
>
> I have one large table, A, and two medium-sized tables, B & C, that I'm
> trying to complete a join on efficiently. The result is multiplicative on
> A join B, so I'd like to avoid shuffling that result. For this example,
> let's just assume each table has three columns, x, y, z. The below is all
> being tested on Spark 2.4.5 locally.
>
> I'd like to perform the following join:
>
>     A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>
> This outputs the following physical plan:
>
> == Physical Plan ==
> *(6) Project [x#32, z#34, y#33, z#74, y#53]
> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>    :           :     +- LocalTableScan [x#32, y#33, z#34]
>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>    :                 +- LocalTableScan [x#72, y#73, z#74]
>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(x#52, z#54, 200)
>          +- LocalTableScan [x#52, y#53, z#54]
>
> I may be misremembering, but in the past I thought you had the ability to
> pre-partition each table by "x" and it would satisfy the requirements of
> the join, since it is already clustered by the key on both sides using the
> same hash function (this assumes numPartitions lines up, obviously).
> However, it seems like it will insert another exchange:
>
>     A.repartition($"x").join(B.repartition($"x"), Seq("x", "y"))
>       .join(C.repartition($"x"), Seq("x", "z"))
>
> *(6) Project [x#32, z#34, y#33, z#74, y#53]
> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>    :           :     +- Exchange hashpartitioning(x#32, 200)
>    :           :        +- LocalTableScan [x#32, y#33, z#34]
>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>    :                 +- Exchange hashpartitioning(x#72, 200)
>    :                    +- LocalTableScan [x#72, y#73, z#74]
>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(x#52, z#54, 200)
>          +- ReusedExchange [x#52, y#53, z#54], Exchange hashpartitioning(x#32, 200)
>
> Note that using this "strategy" with groupBy("x", "y") works fine, though
> I assume that is because it doesn't have to consider the other side of
> the join.
>
> Did this used to work, or am I simply confusing it with groupBy? Either
> way, any thoughts on how I can avoid shuffling the bulk of the join
> result?
>
> Thanks,
> Pat