Is the following what you're trying to do?

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")

val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("x", "y")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("x", "y")

df1.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t1")
df2.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t2")

val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, Seq("x", "y"))

joined.explain
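One caveat, hedged since I haven't re-run this exact setup: the plan below still has a partition-local Sort on each side of the SortMergeJoin. In 2.4 the scan only reports an output ordering when the buckets are written sorted *and* each bucket is a single file, so a sketch like the following (the repartition is an assumption that repartition and bucketBy use the same hash, and "t1_sorted" is just a placeholder name) should let the Sorts be elided as well:

df1.repartition(8, $"x", $"y")   // aim for one file per bucket
  .write.format("parquet")
  .bucketBy(8, "x", "y")
  .sortBy("x", "y")
  .saveAsTable("t1_sorted")      // placeholder table name

If a bucket ends up as multiple files, Spark can't trust the per-file ordering and keeps the Sort.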
I see no exchange:

== Physical Plan ==
*(3) Project [x#342, y#343]
+- *(3) SortMergeJoin [x#342, y#343], [x#346, y#347], Inner
   :- *(1) Sort [x#342 ASC NULLS FIRST, y#343 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [x#342, y#343]
   :     +- *(1) Filter (isnotnull(x#342) && isnotnull(y#343))
   :        +- *(1) FileScan parquet default.t1[x#342,y#343] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8
   +- *(2) Sort [x#346 ASC NULLS FIRST, y#347 ASC NULLS FIRST], false, 0
      +- *(2) Project [x#346, y#347]
         +- *(2) Filter (isnotnull(x#346) && isnotnull(y#347))
            +- *(2) FileScan parquet default.t2[x#346,y#347] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8

On Sun, May 31, 2020 at 2:38 PM Patrick Woody <patrick.woo...@gmail.com> wrote:

> Hey Terry,
>
> Thanks for the response! I'm not sure that it ends up working though - the
> bucketing still seems to require the exchange before the join. Both tables
> below are saved bucketed by "x":
>
> *(5) Project [x#29, y#30, z#31, z#37]
> +- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
>    :- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
>    :  *+- Exchange hashpartitioning(x#29, y#30, 200)*
>    :     +- *(1) Project [x#29, y#30, z#31]
>    :        +- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
>    :           +- *(1) FileScan parquet default.ax[x#29,y#30,z#31] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
>    +- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
>       *+- Exchange hashpartitioning(x#35, y#36, 200)*
>          +- *(3) Project [x#35, y#36, z#37]
>             +- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
>                +- *(3) FileScan parquet default.bx[x#35,y#36,z#37] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
>
> Best,
> Pat
>
> On Sun, May 31, 2020 at 3:15 PM Terry Kim <yumin...@gmail.com> wrote:
>
>> You can use bucketBy to avoid shuffling in your scenario. This test suite
>> has some examples:
>> https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
>>
>> Thanks,
>> Terry
>>
>> On Sun, May 31, 2020 at 7:43 AM Patrick Woody <patrick.woo...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I have one large table, A, and two medium-sized tables, B and C, that I'm
>>> trying to join efficiently. The result is multiplicative on A join B, so
>>> I'd like to avoid shuffling that result. For this example, let's just
>>> assume each table has three columns: x, y, z. All of the below was tested
>>> on Spark 2.4.5 locally.
>>>
>>> I'd like to perform the following join:
>>>
>>> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>>>
>>> This outputs the following physical plan:
>>>
>>> == Physical Plan ==
>>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>>>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>>>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>>>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>>    :           :     +- LocalTableScan [x#32, y#33, z#34]
>>>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>>>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>>>    :                 +- LocalTableScan [x#72, y#73, z#74]
>>>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>>       +- Exchange hashpartitioning(x#52, z#54, 200)
>>>          +- LocalTableScan [x#52, y#53, z#54]
>>>
>>> I may be misremembering, but in the past I thought you had the ability
>>> to pre-partition each table by "x" and it would satisfy the requirements
>>> of the join, since both sides are already clustered by the key using the
>>> same hash function (this assumes numPartitions lines up, obviously).
>>> However, it seems like it will insert another exchange:
>>>
>>> A.repartition($"x")
>>>   .join(B.repartition($"x"), Seq("x", "y"))
>>>   .join(C.repartition($"x"), Seq("x", "z"))
>>>
>>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>>>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>>>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>>>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>>    :           :     +- Exchange hashpartitioning(x#32, 200)
>>>    :           :        +- LocalTableScan [x#32, y#33, z#34]
>>>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>>>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>>>    :                 +- Exchange hashpartitioning(x#72, 200)
>>>    :                    +- LocalTableScan [x#72, y#73, z#74]
>>>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>>       +- Exchange hashpartitioning(x#52, z#54, 200)
>>>          +- ReusedExchange [x#52, y#53, z#54], Exchange hashpartitioning(x#32, 200)
>>>
>>> Note that using this "strategy" with groupBy("x", "y") works fine, though
>>> I assume that is because it doesn't have to consider the other side of
>>> the join.
>>>
>>> Did this used to work, or am I simply confusing it with groupBy? Either
>>> way, any thoughts on how I can avoid shuffling the bulk of the join
>>> result?
>>>
>>> Thanks,
>>> Pat
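To answer the original question directly: as far as I can tell this never worked for joins, only for aggregations. In 2.4 a SortMergeJoin asks each child for HashClusteredDistribution over the full join key list, and a HashPartitioning only satisfies that when its expressions match the join keys exactly, whereas groupBy only needs ClusteredDistribution, which partitioning by the subset "x" already satisfies. That's why your groupBy("x", "y") experiment works while the join does not. A hedged sketch of the consequence (untested; 200 is assumed to match spark.sql.shuffle.partitions):

// Repartitioning by the full key list of each join lines up with the
// join's required distribution, so no additional Exchange is inserted
// in front of that join -- the only shuffles are the ones we asked for.
val ab = A.repartition(200, $"x", $"y")
  .join(B.repartition(200, $"x", $"y"), Seq("x", "y"))

// The second join needs clustering on (x, z), and no single partitioning
// satisfies both (x, y) and (x, z), so the multiplicative A-join-B result
// still gets shuffled here. I don't see a way around that in 2.4.
ab.join(C.repartition(200, $"x", $"z"), Seq("x", "z")).explain

Bucketing both inputs on the full key set, as in the example at the top of the thread, at least avoids shuffling the inputs; the joined output itself still has to move for the second join.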