wangyum opened a new pull request #29624:
URL: https://github.com/apache/spark/pull/29624
This backports #29612 to branch-3.0. Original PR description:
### What changes were proposed in this pull request?
Bucketed joins should work without an unnecessary shuffle when `spark.sql.shuffle.partitions` is larger than the bucket number, for example:
```scala
spark.range(1000).write.bucketBy(432, "id").saveAsTable("t1")
spark.range(1000).write.bucketBy(34, "id").saveAsTable("t2")
sql("set spark.sql.shuffle.partitions=600")
sql("set spark.sql.autoBroadcastJoinThreshold=-1")
sql("select * from t1 join t2 on t1.id = t2.id").explain()
```
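For reference, the shuffle count can also be checked programmatically instead of by eyeballing the plan. A minimal sketch, using the `t1`/`t2` tables created above and assuming adaptive query execution is disabled (the default in 3.0):
```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val joined = sql("select * from t1 join t2 on t1.id = t2.id")
// Count the Exchange nodes in the executed plan: 2 before this PR
// (both sides re-shuffled to 600 partitions), 1 after it.
val numShuffles = joined.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e
}.size
```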
Before this PR:
```
== Physical Plan ==
*(5) SortMergeJoin [id#26L], [id#27L], Inner
:- *(2) Sort [id#26L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#26L, 600), true
:     +- *(1) Filter isnotnull(id#26L)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
+- *(4) Sort [id#27L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#27L, 600), true
      +- *(3) Filter isnotnull(id#27L)
         +- *(3) ColumnarToRow
            +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
```
After this PR, the 432-bucket side no longer needs an Exchange, and the 34-bucket side is shuffled into 432 partitions instead of 600:
```
== Physical Plan ==
*(4) SortMergeJoin [id#26L], [id#27L], Inner
:- *(1) Sort [id#26L ASC NULLS FIRST], false, 0
:  +- *(1) Filter isnotnull(id#26L)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
+- *(3) Sort [id#27L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#27L, 432), true
      +- *(2) Filter isnotnull(id#27L)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
```
### Why are the changes needed?
Spark 2.4 supports this.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
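A sketch of the kind of assertion such a test can make (illustrative only; not necessarily the exact test added by this PR):
```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val exchanges = sql("select * from t1 join t2 on t1.id = t2.id")
  .queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }
// Only the smaller (34-bucket) side should be shuffled, and into the
// larger side's bucket count (432), not spark.sql.shuffle.partitions.
assert(exchanges.size == 1)
assert(exchanges.head.outputPartitioning.numPartitions == 432)
```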