You can use bucketBy to avoid shuffling in your scenario. This test suite
has some examples:
https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
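Here's a rough sketch of the idea (the table names and the bucket count of 8 are made up, and I haven't run this against your data): write both sides bucketed and sorted on the join keys with the same bucket count, then join the bucketed reads. The sort-merge join can then reuse the bucketing instead of inserting an Exchange.

```scala
// Sketch only: bucket both tables on the join keys with the SAME
// bucket count, then join the bucketed reads. Names are illustrative.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("bucketed-join-sketch")
  // Force a sort-merge join so the effect is visible on tiny data.
  .config("spark.sql.autoBroadcastJoinThreshold", -1)
  .getOrCreate()
import spark.implicits._

val a = Seq((1, 1, 1), (2, 2, 2)).toDF("x", "y", "z")
val b = Seq((1, 1, 10), (2, 2, 20)).toDF("x", "y", "z")

// Persist both sides bucketed (and sorted) by the join keys.
a.write.bucketBy(8, "x", "y").sortBy("x", "y").saveAsTable("bucketed_a")
b.write.bucketBy(8, "x", "y").sortBy("x", "y").saveAsTable("bucketed_b")

val joined = spark.table("bucketed_a")
  .join(spark.table("bucketed_b"), Seq("x", "y"))

// With matching bucket specs on both sides, explain() should show a
// SortMergeJoin with no Exchange under either child.
joined.explain()
```

One caveat, if I recall correctly: in 2.4 the sort-merge join requires the child partitioning to match the full set of join keys (HashClusteredDistribution), so bucketing by "x" alone won't remove the exchange for a join on ("x", "y") — which is also why your repartition($"x") attempt below gets an extra exchange.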

Thanks,
Terry

On Sun, May 31, 2020 at 7:43 AM Patrick Woody <patrick.woo...@gmail.com>
wrote:

> Hey all,
>
> I have one large table, A, and two medium sized tables, B & C, that I'm
> trying to complete a join on efficiently. The result is multiplicative on A
> join B, so I'd like to avoid shuffling that result. For this example, let's
> just assume each table has three columns, x, y, z. The below is all being
> tested on Spark 2.4.5 locally.
>
> I'd like to perform the following join:
> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
> This outputs the following physical plan:
> == Physical Plan ==
> *(6) Project [x#32, z#34, y#33, z#74, y#53]
> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>    :           :     +- LocalTableScan [x#32, y#33, z#34]
>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>    :                 +- LocalTableScan [x#72, y#73, z#74]
>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(x#52, z#54, 200)
>          +- LocalTableScan [x#52, y#53, z#54]
>
>
> I may be misremembering, but in the past I thought you had the ability to
> pre-partition each table by "x" and have that satisfy the requirements of
> the join, since both sides are already clustered by the key using the
> same hash function (this assumes numPartitions lines up, obviously).
> However, it seems like it will insert another exchange:
>
> A.repartition($"x")
>   .join(B.repartition($"x"), Seq("x", "y"))
>   .join(C.repartition($"x"), Seq("x", "z"))
> *(6) Project [x#32, z#34, y#33, z#74, y#53]
> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>    :           :     +- Exchange hashpartitioning(x#32, 200)
>    :           :        +- LocalTableScan [x#32, y#33, z#34]
>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>    :                 +- Exchange hashpartitioning(x#72, 200)
>    :                    +- LocalTableScan [x#72, y#73, z#74]
>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(x#52, z#54, 200)
>          +- ReusedExchange [x#52, y#53, z#54], Exchange hashpartitioning(x#32, 200)
>
> Note that using this "strategy" with groupBy("x", "y") works fine, though
> I assume that is because it doesn't have to consider the other side of the
> join.
>
> Did this used to work, or am I simply confusing it with groupBy? Either
> way, any thoughts on how I can avoid shuffling the bulk of the join result?
>
> Thanks,
> Pat
