Re: Using existing distribution for join when subset of keys

2020-05-31 Thread Terry Kim
Is the following what you are trying to do?

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("x", "y")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("x", "y")
df1.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t1")
df2.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, Seq("x", "y"))
joined.explain

I see no exchange:

== Physical Plan ==
*(3) Project [x#342, y#343]
+- *(3) SortMergeJoin [x#342, y#343], [x#346, y#347], Inner
   :- *(1) Sort [x#342 ASC NULLS FIRST, y#343 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [x#342, y#343]
   :     +- *(1) Filter (isnotnull(x#342) && isnotnull(y#343))
   :        +- *(1) FileScan parquet default.t1[x#342,y#343] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8
   +- *(2) Sort [x#346 ASC NULLS FIRST, y#347 ASC NULLS FIRST], false, 0
      +- *(2) Project [x#346, y#347]
         +- *(2) Filter (isnotnull(x#346) && isnotnull(y#347))
            +- *(2) FileScan parquet default.t2[x#346,y#347] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8
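
If it helps to double-check that the bucket spec was actually picked up, here is a
quick sketch against the same tables (I'm going from memory on the exact
"Num Buckets" / "Bucket Columns" row labels in the DESCRIBE FORMATTED output):

// Sketch: confirm the saved tables carry the expected bucketing metadata.
spark.sql("DESCRIBE FORMATTED t1")
  .filter($"col_name".isin("Num Buckets", "Bucket Columns"))
  .show(truncate = false)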

On Sun, May 31, 2020 at 2:38 PM Patrick Woody 
wrote:

> Hey Terry,
>
> Thanks for the response! I'm not sure that it ends up working though - the
> bucketing still seems to require the exchange before the join. Both tables
> below are saved bucketed by "x":
> *(5) Project [x#29, y#30, z#31, z#37]
> +- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
>:- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
> *   :  +- Exchange hashpartitioning(x#29, y#30, 200)*
>: +- *(1) Project [x#29, y#30, z#31]
>:+- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
>:   +- *(1) FileScan parquet default.ax[x#29,y#30,z#31]
> Batched: true, Format: Parquet, Location:
> InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax],
> PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
> ReadSchema: struct, SelectedBucketsCount: 200 out of 200
>+- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
> *  +- Exchange hashpartitioning(x#35, y#36, 200)*
>  +- *(3) Project [x#35, y#36, z#37]
> +- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
>+- *(3) FileScan parquet default.bx[x#35,y#36,z#37]
> Batched: true, Format: Parquet, Location:
> InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx],
> PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
> ReadSchema: struct, SelectedBucketsCount: 200 out of 200
>
> Best,
> Pat
>
>
>
> On Sun, May 31, 2020 at 3:15 PM Terry Kim  wrote:
>
>> You can use bucketBy to avoid shuffling in your scenario. This test suite
>> has some examples:
>> https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
>>
>> Thanks,
>> Terry
>>
>> On Sun, May 31, 2020 at 7:43 AM Patrick Woody 
>> wrote:
>>
>>> Hey all,
>>>
>>> I have one large table, A, and two medium sized tables, B & C, that I'm
>>> trying to complete a join on efficiently. The result is multiplicative on A
>>> join B, so I'd like to avoid shuffling that result. For this example, let's
>>> just assume each table has three columns, x, y, z. The below is all being
>>> tested on Spark 2.4.5 locally.
>>>
>>> I'd like to perform the following join:
>>> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>>> This outputs the following physical plan:
>>> == Physical Plan ==
>>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>>:- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>>:  +- Exchange hashpartitioning(x#32, z#34, 200)
>>>: +- *(3) Project [x#32, y#33, z#34, z#74]
>>>:+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>>:   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS
>>> FIRST], false, 0
>>>:   :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>>:   : +- LocalTableScan [x#32, y#33, z#34]
>>>:   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS
>>> FIRST], false, 0
>>>:  +- Exchange hashpartitioning(x#72, y#73, 200)
>>>: +- LocalTableScan [x#72, y#73, z#74]
>>>+- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>>   +- Exchange hashpartitioning(x#52, z#54, 200)
>>>  +- LocalTableScan [x#52, y#53, z#54]
>>>
>>>
>>> I may be misremembering, but in the past I thought you had the ability
>>> to 

Re: Using existing distribution for join when subset of keys

2020-05-31 Thread Patrick Woody
Hey Terry,

Thanks for the response! I'm not sure that it ends up working though - the
bucketing still seems to require the exchange before the join. Both tables
below are saved bucketed by "x":
*(5) Project [x#29, y#30, z#31, z#37]
+- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
   :- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
   :  +- *Exchange hashpartitioning(x#29, y#30, 200)*
   :     +- *(1) Project [x#29, y#30, z#31]
   :        +- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
   :           +- *(1) FileScan parquet default.ax[x#29,y#30,z#31] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200
   +- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
      +- *Exchange hashpartitioning(x#35, y#36, 200)*
         +- *(3) Project [x#35, y#36, z#37]
            +- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
               +- *(3) FileScan parquet default.bx[x#35,y#36,z#37] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200
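
(For reference, the write path is roughly the following; the sample data and
column types here are just stand-ins, but the bucket count of 200 and the
single bucket column "x" match the plan above.)

// Rough sketch of the setup: both tables bucketed by "x" only, then joined
// on ("x", "y"). The data below is made up purely for illustration.
val a = (0 until 1000).map(i => (i % 7, i % 11, i)).toDF("x", "y", "z")
val b = (0 until 1000).map(i => (i % 7, i % 13, i)).toDF("x", "y", "z")
a.write.format("parquet").bucketBy(200, "x").saveAsTable("ax")
b.write.format("parquet").bucketBy(200, "x").saveAsTable("bx")
spark.table("ax").join(spark.table("bx"), Seq("x", "y")).explain
// Exchange hashpartitioning(x, y, 200) still shows up on both sides: the files
// are clustered by "x" alone, while the join wants ("x", "y").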

Best,
Pat



On Sun, May 31, 2020 at 3:15 PM Terry Kim  wrote:

> You can use bucketBy to avoid shuffling in your scenario. This test suite
> has some examples:
> https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
>
> Thanks,
> Terry
>
> On Sun, May 31, 2020 at 7:43 AM Patrick Woody 
> wrote:
>
>> Hey all,
>>
>> I have one large table, A, and two medium sized tables, B & C, that I'm
>> trying to complete a join on efficiently. The result is multiplicative on A
>> join B, so I'd like to avoid shuffling that result. For this example, let's
>> just assume each table has three columns, x, y, z. The below is all being
>> tested on Spark 2.4.5 locally.
>>
>> I'd like to perform the following join:
>> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>> This outputs the following physical plan:
>> == Physical Plan ==
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>:- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>:  +- Exchange hashpartitioning(x#32, z#34, 200)
>>: +- *(3) Project [x#32, y#33, z#34, z#74]
>>:+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>:   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
>> false, 0
>>:   :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>:   : +- LocalTableScan [x#32, y#33, z#34]
>>:   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
>> false, 0
>>:  +- Exchange hashpartitioning(x#72, y#73, 200)
>>: +- LocalTableScan [x#72, y#73, z#74]
>>+- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>   +- Exchange hashpartitioning(x#52, z#54, 200)
>>  +- LocalTableScan [x#52, y#53, z#54]
>>
>>
>> I may be misremembering, but in the past I thought you had the ability to
>> pre-partition each table by "x" and it would satisfy the requirements of
>> the join since it is already clustered by the key on both sides using the
>> same hash function (this assumes numPartitions lines up obviously). However
>> it seems like it will insert another exchange:
>>
>> A.repartition($"x").join(B.repartition($"x"), Seq("x",
>> "y")).join(C.repartition($"x"), Seq("x", "z"))
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>:- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>:  +- Exchange hashpartitioning(x#32, z#34, 200)
>>: +- *(3) Project [x#32, y#33, z#34, z#74]
>>:+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>:   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
>> false, 0
>>:   :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>:   : +- Exchange hashpartitioning(x#32, 200)
>>:   :+- LocalTableScan [x#32, y#33, z#34]
>>:   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
>> false, 0
>>:  +- Exchange hashpartitioning(x#72, y#73, 200)
>>: +- Exchange hashpartitioning(x#72, 200)
>>:+- LocalTableScan [x#72, y#73, z#74]
>>+- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>   +- Exchange hashpartitioning(x#52, z#54, 200)
>>  +- ReusedExchange [x#52, y#53, z#54], Exchange
>> hashpartitioning(x#32, 200).
>>
>> Note, that using this "strategy" with groupBy("x", "y") works fine though

Re: Using existing distribution for join when subset of keys

2020-05-31 Thread Terry Kim
You can use bucketBy to avoid shuffling in your scenario. This test suite
has some examples:
https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343

Thanks,
Terry

On Sun, May 31, 2020 at 7:43 AM Patrick Woody 
wrote:

> Hey all,
>
> I have one large table, A, and two medium sized tables, B & C, that I'm
> trying to complete a join on efficiently. The result is multiplicative on A
> join B, so I'd like to avoid shuffling that result. For this example, let's
> just assume each table has three columns, x, y, z. The below is all being
> tested on Spark 2.4.5 locally.
>
> I'd like to perform the following join:
> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
> This outputs the following physical plan:
> == Physical Plan ==
> *(6) Project [x#32, z#34, y#33, z#74, y#53]
> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>:- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(x#32, z#34, 200)
>: +- *(3) Project [x#32, y#33, z#34, z#74]
>:+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>:   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
> false, 0
>:   :  +- Exchange hashpartitioning(x#32, y#33, 200)
>:   : +- LocalTableScan [x#32, y#33, z#34]
>:   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
> false, 0
>:  +- Exchange hashpartitioning(x#72, y#73, 200)
>: +- LocalTableScan [x#72, y#73, z#74]
>+- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(x#52, z#54, 200)
>  +- LocalTableScan [x#52, y#53, z#54]
>
>
> I may be misremembering, but in the past I thought you had the ability to
> pre-partition each table by "x" and it would satisfy the requirements of
> the join since it is already clustered by the key on both sides using the
> same hash function (this assumes numPartitions lines up obviously). However
> it seems like it will insert another exchange:
>
> A.repartition($"x").join(B.repartition($"x"), Seq("x",
> "y")).join(C.repartition($"x"), Seq("x", "z"))
> *(6) Project [x#32, z#34, y#33, z#74, y#53]
> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>:- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(x#32, z#34, 200)
>: +- *(3) Project [x#32, y#33, z#34, z#74]
>:+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>:   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
> false, 0
>:   :  +- Exchange hashpartitioning(x#32, y#33, 200)
>:   : +- Exchange hashpartitioning(x#32, 200)
>:   :+- LocalTableScan [x#32, y#33, z#34]
>:   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
> false, 0
>:  +- Exchange hashpartitioning(x#72, y#73, 200)
>: +- Exchange hashpartitioning(x#72, 200)
>:+- LocalTableScan [x#72, y#73, z#74]
>+- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(x#52, z#54, 200)
>  +- ReusedExchange [x#52, y#53, z#54], Exchange
> hashpartitioning(x#32, 200).
>
> Note, that using this "strategy" with groupBy("x", "y") works fine though
> I assume that is because it doesn't have to consider the other side of the
> join.
>
> Did this used to work or am I simply confusing it with groupBy? Either way
> - any thoughts on how I can avoid shuffling the bulk of the join result?
>
> Thanks,
> Pat
>
>
>
>
>


Using existing distribution for join when subset of keys

2020-05-31 Thread Patrick Woody
Hey all,

I have one large table, A, and two medium sized tables, B & C, that I'm
trying to complete a join on efficiently. The result is multiplicative on A
join B, so I'd like to avoid shuffling that result. For this example, let's
just assume each table has three columns, x, y, z. The below is all being
tested on Spark 2.4.5 locally.

I'd like to perform the following join:
A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
This outputs the following physical plan:
== Physical Plan ==
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   :     +- *(3) Project [x#32, y#33, z#34, z#74]
   :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :           :     +- LocalTableScan [x#32, y#33, z#34]
   :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(x#72, y#73, 200)
   :                 +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#52, z#54, 200)
         +- LocalTableScan [x#52, y#53, z#54]


I may be misremembering, but in the past I thought you had the ability to
pre-partition each table by "x" and it would satisfy the requirements of
the join, since both sides are already clustered by that key using the
same hash function (this obviously assumes numPartitions lines up). However,
it seems like it will insert another exchange:

A.repartition($"x").join(B.repartition($"x"), Seq("x", "y")).join(C.repartition($"x"), Seq("x", "z"))

*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   :     +- *(3) Project [x#32, y#33, z#34, z#74]
   :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :           :     +- Exchange hashpartitioning(x#32, 200)
   :           :        +- LocalTableScan [x#32, y#33, z#34]
   :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(x#72, y#73, 200)
   :                 +- Exchange hashpartitioning(x#72, 200)
   :                    +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#52, z#54, 200)
         +- ReusedExchange [x#52, y#53, z#54], Exchange hashpartitioning(x#32, 200)
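
In case it is useful for anyone digging into this, here is a rough sketch of how
to see what each operator asks of its children versus what it actually produces
(assuming A and B are the same DataFrames as above):

// Sketch: walk the physical plan and print each operator's output partitioning
// next to the distribution it requires from its children.
val q = A.repartition($"x").join(B.repartition($"x"), Seq("x", "y"))
q.queryExecution.executedPlan.foreach { op =>
  println(op.nodeName)
  println("  outputs:  " + op.outputPartitioning)
  println("  requires: " + op.requiredChildDistribution.mkString(", "))
}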

Note that using this "strategy" with groupBy("x", "y") works fine, though I
assume that is because it doesn't have to consider the other side of the
join.
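
Concretely, the aggregation version that does reuse the repartition (a quick
sketch, same A as above):

// With groupBy("x", "y"), explain shows no additional exchange on top of the
// hashpartitioning(x) introduced by the repartition.
A.repartition($"x").groupBy("x", "y").count().explain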

Did this used to work or am I simply confusing it with groupBy? Either way
- any thoughts on how I can avoid shuffling the bulk of the join result?

Thanks,
Pat