GitHub user yucai opened a pull request:

    https://github.com/apache/spark/pull/21391

    [SPARK-24343][SQL] Avoid shuffle for the bucketed table when shuffle.…

    ## What changes were proposed in this pull request?
    
    When shuffle.partition (`spark.sql.shuffle.partitions`) is greater than the bucket number, Spark shuffles the bucketed table into shuffle.partition partitions instead of reusing its existing bucket layout. This PR tries to avoid that shuffle.
    
    See the example below:
    ```
    CREATE TABLE dev
    USING PARQUET
    AS SELECT ws_item_sk, i_item_sk
    FROM web_sales_bucketed
    JOIN item ON ws_item_sk = i_item_sk;
    ```
    Suppose web_sales_bucketed is bucketed into 4 buckets and shuffle.partition is set to 10.
    Currently, both tables are shuffled into 10 partitions.
    ```
    Execute CreateDataSourceTableAsSelectCommand CreateDataSourceTableAsSelectCommand `dev`, ErrorIfExists, [ws_item_sk#2, i_item_sk#6]
    +- *(5) SortMergeJoin [ws_item_sk#2], [i_item_sk#6], Inner
       :- *(2) Sort [ws_item_sk#2 ASC NULLS FIRST], false, 0
       :  +- Exchange hashpartitioning(ws_item_sk#2, 10)
       :     +- *(1) Project [ws_item_sk#2]
       :        +- *(1) Filter isnotnull(ws_item_sk#2)
       :           +- *(1) FileScan parquet tpcds10.web_sales_bucketed[ws_item_sk#2] Batched: true, Format: Parquet, Location:...
       +- *(4) Sort [i_item_sk#6 ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(i_item_sk#6, 10)
             +- *(3) Project [i_item_sk#6]
                +- *(3) Filter isnotnull(i_item_sk#6)
                   +- *(3) FileScan parquet tpcds10.item[i_item_sk#6] Batched: true, Format: Parquet, Location:...
    ```
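The following minimal Python sketch shows why the existing 4-bucket layout cannot be reused once the join targets 10 partitions. It rests on three assumptions:

- `stand_in_hash` is a hypothetical deterministic hash standing in for Spark's Murmur3.
- The key values are made up.
- Bucketing and `hashpartitioning` are both modeled as the same hash taken modulo the partition count.

Rows that share one bucket generally belong to several of the 10 target partitions, so the whole bucketed table has to go through an Exchange.

```python
def stand_in_hash(key: int) -> int:
    # Assumption: deterministic multiplicative hash standing in for Spark's Murmur3.
    return (key * 2654435761) % (2 ** 32)


def partition_id(key: int, num_partitions: int) -> int:
    # Same hash modulo the partition count (non-negative by construction).
    return stand_in_hash(key) % num_partitions


def target_partitions_per_bucket(keys, num_buckets, shuffle_partitions):
    """For each existing bucket, collect the shuffle partitions its rows would need."""
    mapping = {}
    for k in keys:
        bucket = partition_id(k, num_buckets)
        mapping.setdefault(bucket, set()).add(partition_id(k, shuffle_partitions))
    return mapping


# Hypothetical ws_item_sk values; 4 buckets vs. shuffle.partition=10 as in the example.
mapping = target_partitions_per_bucket(range(1, 101), 4, 10)
for bucket, targets in sorted(mapping.items()):
    print(f"bucket {bucket} -> target partitions {sorted(targets)}")
```

Each bucket fans out to several target partitions. With 4 target partitions instead, each bucket would map to exactly one.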
    After this patch, the bucketed table is no longer shuffled, and the item table is shuffled into web_sales_bucketed's bucket number (4 partitions).
    ```
    Execute CreateDataSourceTableAsSelectCommand CreateDataSourceTableAsSelectCommand `dev`, ErrorIfExists, [ws_item_sk#2, i_item_sk#6]
    +- *(4) SortMergeJoin [ws_item_sk#2], [i_item_sk#6], Inner
       :- *(1) Sort [ws_item_sk#2 ASC NULLS FIRST], false, 0
       :  +- *(1) Project [ws_item_sk#2]
       :     +- *(1) Filter isnotnull(ws_item_sk#2)
       :        +- *(1) FileScan parquet tpcds10.web_sales_bucketed[ws_item_sk#2] Batched: true, Format: Parquet, Location:...
       +- *(3) Sort [i_item_sk#6 ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(i_item_sk#6, 4)
             +- *(2) Project [i_item_sk#6]
                +- *(2) Filter isnotnull(i_item_sk#6)
                   +- *(2) FileScan parquet tpcds10.item[i_item_sk#6] Batched: true, Format: Parquet, Location:...
    ```
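The new plan is safe because of co-partitioning. If the item side is hashed with the same function into as many partitions as there are buckets, every pair of matching keys lands at the same partition index. Each bucket then only needs to be joined with one partition of the other side.

Below is a minimal Python sketch under stated assumptions. It uses a hypothetical `stand_in_hash` instead of Spark's Murmur3, made-up keys, and in-memory lists instead of Parquet files. It checks that the partition-wise join returns the same rows as a plain nested-loop join.

```python
from collections import Counter, defaultdict


def stand_in_hash(key: int) -> int:
    # Assumption: deterministic multiplicative hash standing in for Spark's Murmur3.
    return (key * 2654435761) % (2 ** 32)


def partition_id(key: int, num_partitions: int) -> int:
    return stand_in_hash(key) % num_partitions


def hash_partition(keys, num_partitions):
    """Route every key to partition hash(key) mod num_partitions."""
    parts = defaultdict(list)
    for k in keys:
        parts[partition_id(k, num_partitions)].append(k)
    return parts


def copartitioned_join(bucketed_parts, other_keys, num_buckets):
    # Only the non-bucketed side is shuffled, and only into num_buckets partitions.
    other_parts = hash_partition(other_keys, num_buckets)
    result = []
    for p in range(num_buckets):
        # Count right-side keys so duplicate matches are preserved (inner join).
        right = Counter(other_parts.get(p, []))
        for k in bucketed_parts.get(p, []):
            result.extend([(k, k)] * right[k])
    return sorted(result)


num_buckets = 4
ws_item_sk = [1, 2, 2, 3, 5, 8, 13, 21]  # hypothetical bucketed-side keys
i_item_sk = list(range(1, 11))           # hypothetical item-side keys

# Simulates the existing bucket layout of web_sales_bucketed (4 buckets).
bucketed = hash_partition(ws_item_sk, num_buckets)
joined = copartitioned_join(bucketed, i_item_sk, num_buckets)
print(joined)
```

The sketch leaves out the per-partition Sort that precedes SortMergeJoin in the real plan. The partition routing alone is what lets the Exchange on the bucketed side be dropped.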
    This problem can be worse when adaptive execution is enabled, because adaptive execution usually prefers a large shuffle.partition.
    
    ## How was this patch tested?
    
    Manually tested.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yucai/spark avoid_shuffle

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21391.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21391
    
----
commit 21a31c83ed42d45750aafae738a2ae945ea18844
Author: yucai <yyu1@...>
Date:   2018-05-22T04:20:53Z

    [SPARK-24343][SQL] Avoid shuffle for the bucketed table when shuffle.partition > bucket number

----

