Is the following what you trying to do?

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("x", "y")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("x", "y")
df1.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t1")
df2.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, Seq("x", "y"))
joined.explain

I see no exchange:

== Physical Plan ==
*(3) Project [x#342, y#343]
+- *(3) SortMergeJoin [x#342, y#343], [x#346, y#347], Inner
   :- *(1) Sort [x#342 ASC NULLS FIRST, y#343 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [x#342, y#343]
   :     +- *(1) Filter (isnotnull(x#342) && isnotnull(y#343))
   :        +- *(1) FileScan parquet default.t1[x#342,y#343] Batched: true,
Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [],
PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema:
struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8
   +- *(2) Sort [x#346 ASC NULLS FIRST, y#347 ASC NULLS FIRST], false, 0
      +- *(2) Project [x#346, y#347]
         +- *(2) Filter (isnotnull(x#346) && isnotnull(y#347))
            +- *(2) FileScan parquet default.t2[x#346,y#347] Batched: true,
Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [],
PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema:
struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8

On Sun, May 31, 2020 at 2:38 PM Patrick Woody <patrick.woo...@gmail.com>
wrote:

> Hey Terry,
>
> Thanks for the response! I'm not sure that it ends up working though - the
> bucketing still seems to require the exchange before the join. Both tables
> below are saved bucketed by "x":
> *(5) Project [x#29, y#30, z#31, z#37]
> +- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
>    :- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
> *   :  +- Exchange hashpartitioning(x#29, y#30, 200)*
>    :     +- *(1) Project [x#29, y#30, z#31]
>    :        +- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
>    :           +- *(1) FileScan parquet default.ax[x#29,y#30,z#31]
> Batched: true, Format: Parquet, Location:
> InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax],
> PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
> ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
>    +- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
> *      +- Exchange hashpartitioning(x#35, y#36, 200)*
>          +- *(3) Project [x#35, y#36, z#37]
>             +- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
>                +- *(3) FileScan parquet default.bx[x#35,y#36,z#37]
> Batched: true, Format: Parquet, Location:
> InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx],
> PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
> ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
>
> Best,
> Pat
>
>
>
> On Sun, May 31, 2020 at 3:15 PM Terry Kim <yumin...@gmail.com> wrote:
>
>> You can use bucketBy to avoid shuffling in your scenario. This test suite
>> has some examples:
>> https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
>>
>> Thanks,
>> Terry
>>
>> On Sun, May 31, 2020 at 7:43 AM Patrick Woody <patrick.woo...@gmail.com>
>> wrote:
>>
>>> Hey all,
>>>
>>> I have one large table, A, and two medium sized tables, B & C, that I'm
>>> trying to complete a join on efficiently. The result is multiplicative on A
>>> join B, so I'd like to avoid shuffling that result. For this example, let's
>>> just assume each table has three columns, x, y, z. The below is all being
>>> tested on Spark 2.4.5 locally.
>>>
>>> I'd like to perform the following join:
>>> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>>> This outputs the following physical plan:
>>> == Physical Plan ==
>>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>>>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>>>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS
>>> FIRST], false, 0
>>>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>>    :           :     +- LocalTableScan [x#32, y#33, z#34]
>>>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS
>>> FIRST], false, 0
>>>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>>>    :                 +- LocalTableScan [x#72, y#73, z#74]
>>>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>>       +- Exchange hashpartitioning(x#52, z#54, 200)
>>>          +- LocalTableScan [x#52, y#53, z#54]
>>>
>>>
>>> I may be misremembering, but in the past I thought you had the ability
>>> to pre-partition each table by "x" and it would satisfy the requirements of
>>> the join since it is already clustered by the key on both sides using the
>>> same hash function (this assumes numPartitions lines up obviously). However
>>> it seems like it will insert another exchange:
>>>
>>> A.repartition($"x").join(B.repartition($"x"), Seq("x",
>>> "y")).join(C.repartition($"x"), Seq("x", "z"))
>>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>>>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>>>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS
>>> FIRST], false, 0
>>>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>>    :           :     +- Exchange hashpartitioning(x#32, 200)
>>>    :           :        +- LocalTableScan [x#32, y#33, z#34]
>>>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS
>>> FIRST], false, 0
>>>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>>>    :                 +- Exchange hashpartitioning(x#72, 200)
>>>    :                    +- LocalTableScan [x#72, y#73, z#74]
>>>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>>       +- Exchange hashpartitioning(x#52, z#54, 200)
>>>          +- ReusedExchange [x#52, y#53, z#54], Exchange
>>> hashpartitioning(x#32, 200).
>>>
>>> Note, that using this "strategy" with groupBy("x", "y") works fine
>>> though I assume that is because it doesn't have to consider the other side
>>> of the join.
>>>
>>> Did this used to work or am I simply confusing it with groupBy? Either
>>> way - any thoughts on how I can avoid shuffling the bulk of the join result?
>>>
>>> Thanks,
>>> Pat
>>>
>>>
>>>
>>>
>>>

Reply via email to