GitHub user yucai opened a pull request: https://github.com/apache/spark/pull/21391
[SPARK-24343][SQL] Avoid shuffle for the bucketed table when shuffle.…

## What changes were proposed in this pull request?

When shuffle.partition (`spark.sql.shuffle.partitions`) is greater than the number of buckets, Spark has to shuffle the bucketed table into shuffle.partition partitions before joining it. This PR avoids that shuffle. For example:

```
CREATE TABLE dev USING PARQUET AS
SELECT ws_item_sk, i_item_sk
FROM web_sales_bucketed
JOIN item ON ws_item_sk = i_item_sk;
```

Suppose `web_sales_bucketed` is bucketed into 4 buckets and shuffle.partition is set to 10. Currently, both tables are shuffled into 10 partitions:

```
Execute CreateDataSourceTableAsSelectCommand CreateDataSourceTableAsSelectCommand `dev`, ErrorIfExists, [ws_item_sk#2, i_item_sk#6]
+- *(5) SortMergeJoin [ws_item_sk#2], [i_item_sk#6], Inner
   :- *(2) Sort [ws_item_sk#2 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ws_item_sk#2, 10)
   :     +- *(1) Project [ws_item_sk#2]
   :        +- *(1) Filter isnotnull(ws_item_sk#2)
   :           +- *(1) FileScan parquet tpcds10.web_sales_bucketed[ws_item_sk#2] Batched: true, Format: Parquet, Location:...
   +- *(4) Sort [i_item_sk#6 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(i_item_sk#6, 10)
         +- *(3) Project [i_item_sk#6]
            +- *(3) Filter isnotnull(i_item_sk#6)
               +- *(3) FileScan parquet tpcds10.item[i_item_sk#6] Batched: true, Format: Parquet, Location:...
```

After this patch, the bucketed table is no longer shuffled. Only `item` is shuffled, into `web_sales_bucketed`'s bucket number (4 partitions):

```
Execute CreateDataSourceTableAsSelectCommand CreateDataSourceTableAsSelectCommand `dev`, ErrorIfExists, [ws_item_sk#2, i_item_sk#6]
+- *(4) SortMergeJoin [ws_item_sk#2], [i_item_sk#6], Inner
   :- *(1) Sort [ws_item_sk#2 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [ws_item_sk#2]
   :     +- *(1) Filter isnotnull(ws_item_sk#2)
   :        +- *(1) FileScan parquet tpcds10.web_sales_bucketed[ws_item_sk#2] Batched: true, Format: Parquet, Location:...
   +- *(3) Sort [i_item_sk#6 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(i_item_sk#6, 4)
         +- *(2) Project [i_item_sk#6]
            +- *(2) Filter isnotnull(i_item_sk#6)
               +- *(2) FileScan parquet tpcds10.item[i_item_sk#6] Batched: true, Format: Parquet, Location:...
```

The problem can get worse when adaptive execution is enabled, because adaptive execution usually prefers a large shuffle.partition.

## How was this patch tested?

Manually tested.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yucai/spark avoid_shuffle

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21391.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21391

----
commit 21a31c83ed42d45750aafae738a2ae945ea18844
Author: yucai <yyu1@...>
Date:   2018-05-22T04:20:53Z

    [SPARK-24343][SQL] Avoid shuffle for the bucketed table when shuffle.partition > bucket number

----

---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
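The following is a rough, plain-Python sketch of why shuffling only `item` into the bucket count is enough. It is a toy model, not Spark code, and it assumes both sides are hash-partitioned on the join key with the same hash function. `partition_of` uses Python's built-in integer `hash` as a stand-in for Spark's Murmur3-based `hashpartitioning`. The function names and sample keys are all hypothetical.

```python
# Toy model, not Spark code: hash-partition two "tables" on the join key and
# join them partition by partition. This is only correct when both sides use
# the same hash function AND the same number of partitions.

def partition_of(key, num_partitions):
    # Hypothetical stand-in for Spark's Murmur3-based hashpartitioning.
    # Python's % with a positive divisor is non-negative, like Spark's pmod.
    return hash(key) % num_partitions


def hash_partition(rows, key, num_partitions):
    """Split rows into num_partitions lists by the hash of row[key]."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[partition_of(row[key], num_partitions)].append(row)
    return parts


def co_partitioned_join(left_parts, right_parts, lkey, rkey):
    """Inner-join partition i of the left side with partition i of the right only."""
    if len(left_parts) != len(right_parts):
        raise ValueError("both sides must have the same number of partitions")
    out = []
    for lpart, rpart in zip(left_parts, right_parts):
        # Build a small hash index over the right-hand partition.
        index = {}
        for rrow in rpart:
            index.setdefault(rrow[rkey], []).append(rrow)
        for lrow in lpart:
            for rrow in index.get(lrow[lkey], []):
                out.append((lrow[lkey], rrow[rkey]))
    return out


NUM_BUCKETS = 4  # bucket count of web_sales_bucketed in the example
web_sales_bucketed = [{"ws_item_sk": k} for k in [1, 2, 3, 5, 8, 13, 21]]
item = [{"i_item_sk": k} for k in range(1, 11)]

# Models the existing buckets: in Spark this layout is already on disk,
# so this side is not shuffled.
sales_buckets = hash_partition(web_sales_bucketed, "ws_item_sk", NUM_BUCKETS)
# Only `item` is shuffled, into NUM_BUCKETS partitions instead of 10.
item_parts = hash_partition(item, "i_item_sk", NUM_BUCKETS)

result = sorted(co_partitioned_join(sales_buckets, item_parts,
                                    "ws_item_sk", "i_item_sk"))
print(result)  # -> [(1, 1), (2, 2), (3, 3), (5, 5), (8, 8)]

# With 4 partitions on one side and 10 on the other, equal keys need not land
# in matching partitions (key 5 goes to 1 vs 5), so one side must be reshuffled.
print(partition_of(5, NUM_BUCKETS), partition_of(5, 10))  # -> 1 5
```

Partition i of the bucketed side can only match partition i of the other side, so only `item` has to move. Shuffling both sides into 10 partitions also produces a correct join, but it shuffles the bucketed table as well.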