Hey Terry,

Thanks for the response! I'm not sure it ends up working, though - the bucketing still seems to require the exchange before the join. Both tables below are saved bucketed by "x":

*(5) Project [x#29, y#30, z#31, z#37]
+- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
   :- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
   :  +- *Exchange hashpartitioning(x#29, y#30, 200)*
   :     +- *(1) Project [x#29, y#30, z#31]
   :        +- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
   :           +- *(1) FileScan parquet default.ax[x#29,y#30,z#31] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
   +- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
      +- *Exchange hashpartitioning(x#35, y#36, 200)*
         +- *(3) Project [x#35, y#36, z#37]
            +- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
               +- *(3) FileScan parquet default.bx[x#35,y#36,z#37] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
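In case I'm doing something silly, here's roughly how I'm producing the tables and the plan above (a sketch - the data is a stand-in, but the bucket spec and join match the plan):

import spark.implicits._

// Stand-in data with the three-int-column schema from the plan above.
val df = (0 until 10000).map(i => (i, i % 17, i % 31)).toDF("x", "y", "z")

// Both tables bucketed by "x" only, into 200 buckets.
df.write.bucketBy(200, "x").saveAsTable("ax")
df.write.bucketBy(200, "x").saveAsTable("bx")

// Join on (x, y): the scans read all 200 buckets with no pruning, yet both
// sides still get Exchange hashpartitioning(x, y, 200) before the join.
spark.table("ax").join(spark.table("bx"), Seq("x", "y")).explain()

My understanding is that bucketing both tables by ("x", "y") instead would satisfy this particular join, but then the second join on ("x", "z") in my real query would shuffle - and that's the multiplicative result I'm trying not to move.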
Best,
Pat

On Sun, May 31, 2020 at 3:15 PM Terry Kim <yumin...@gmail.com> wrote:

> You can use bucketBy to avoid shuffling in your scenario. This test suite
> has some examples:
> https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
>
> Thanks,
> Terry
>
> On Sun, May 31, 2020 at 7:43 AM Patrick Woody <patrick.woo...@gmail.com>
> wrote:
>
>> Hey all,
>>
>> I have one large table, A, and two medium-sized tables, B & C, that I'm
>> trying to join efficiently. The result is multiplicative on A join B, so
>> I'd like to avoid shuffling that result. For this example, assume each
>> table has three columns: x, y, z. Everything below is being tested on
>> Spark 2.4.5 locally.
>>
>> I'd like to perform the following join:
>>
>> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>>
>> This outputs the following physical plan:
>>
>> == Physical Plan ==
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>    :           :     +- LocalTableScan [x#32, y#33, z#34]
>>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>>    :                 +- LocalTableScan [x#72, y#73, z#74]
>>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>       +- Exchange hashpartitioning(x#52, z#54, 200)
>>          +- LocalTableScan [x#52, y#53, z#54]
>>
>> I may be misremembering, but I thought you used to be able to
>> pre-partition each table by "x" and satisfy the requirements of the
>> join, since both sides are already clustered by that key using the same
>> hash function (assuming numPartitions lines up, obviously). However, it
>> seems Spark inserts another exchange:
>>
>> A.repartition($"x").join(B.repartition($"x"), Seq("x", "y"))
>>   .join(C.repartition($"x"), Seq("x", "z"))
>>
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>    :           :     +- Exchange hashpartitioning(x#32, 200)
>>    :           :        +- LocalTableScan [x#32, y#33, z#34]
>>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>>    :                 +- Exchange hashpartitioning(x#72, 200)
>>    :                    +- LocalTableScan [x#72, y#73, z#74]
>>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>       +- Exchange hashpartitioning(x#52, z#54, 200)
>>          +- ReusedExchange [x#52, y#53, z#54], Exchange hashpartitioning(x#32, 200)
>>
>> Note that using this "strategy" with groupBy("x", "y") works fine,
>> though I assume that is because it doesn't have to consider the other
>> side of the join.
>>
>> Did this used to work, or am I simply confusing it with groupBy?
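>> (If it helps, a self-contained sketch of the contrast as far as I can
>> tell - the data here is made up, but the shape matches what I'm running:)
>>
>> import spark.implicits._
>>
>> val a = (0 until 10000).map(i => (i, i % 17, i % 31)).toDF("x", "y", "z")
>> val b = (0 until 1000).map(i => (i, i % 17, i % 31)).toDF("x", "y", "z")
>>
>> // No extra exchange: the aggregation only needs the data clustered by a
>> // subset of its grouping keys, so hashpartitioning(x) is reused as-is.
>> a.repartition($"x").groupBy("x", "y").count().explain()
>>
>> // Extra exchange: the sort-merge join wants hashpartitioning on exactly
>> // (x, y), so partitioning by x alone gets re-shuffled on both sides.
>> a.repartition($"x").join(b.repartition($"x"), Seq("x", "y")).explain()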
>> Either way - any thoughts on how I can avoid shuffling the bulk of the
>> join result?
>>
>> Thanks,
>> Pat