GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/14545

    [SPARK-11150] [SQL] Dynamic Partition Pruning

    ## What changes were proposed in this pull request?
    
    This PR introduces a new feature for Spark SQL: dynamic partition pruning, which can prune unneeded partitions when the partition key is used as a join key in an inner/semi join.
    
    This feature is implemented using IN subqueries, and it works like this:
    
    1. An optimizer rule inserts an IN subquery as a predicate for inner joins on a partitioned column; the predicate is then pushed down to the table scan (see the SQL sketch after this list).
    2. Duplicated subqueries are found and re-used.
    3. Uncorrelated IN subqueries are run in prepare() and their results are put into expressions (the same as for uncorrelated scalar subqueries); these results are then used by FileSourceScanExec to prune unneeded partitions.
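    
    For example, in Query 55 below, the effect of step 1 can be sketched in SQL as the following rewrite of the `store_sales` scan (a sketch only; the rule actually rewrites the logical plan, which shows up as `predicate-subquery#924` in the optimized plan further down):
    
    ```sql
    -- A sketch of the rewrite: only partitions of store_sales whose
    -- ss_sold_date_sk matches a date_dim row surviving the filters
    -- need to be scanned at all.
    select * from store_sales
     where ss_sold_date_sk in (select d_date_sk
                                 from date_dim
                                where d_moy = 11 and d_year = 1999)
    ```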
    
    ## How was this patch tested?
    
    Added unit tests. Manually tested with the original TPC-DS queries on partitioned tables; this PR can improve performance by 20% to 2800%.
    
    ![speed up](https://cloud.githubusercontent.com/assets/40902/16285137/5df31d88-3889-11e6-9b08-1e04b5ea060e.png)
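    
    For the pruning to take effect, the fact table has to be partitioned on the join key. A hypothetical Hive-style layout for `store_sales` (not from this PR, shown only for context):
    
    ```sql
    -- Hypothetical: store_sales partitioned on the date surrogate key,
    -- so the values returned by the IN subquery map directly to
    -- partition directories that can be skipped.
    create table store_sales (
        ss_item_sk          int,
        ss_ext_sales_price  decimal(7,2)
        -- remaining columns omitted
    )
    partitioned by (ss_sold_date_sk int)
    stored as parquet;
    ```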
    
    For Query 55:
    ```sql
    select i_brand_id brand_id, i_brand brand,
        sum(ss_ext_sales_price) ext_price
     from date_dim, store_sales, item
     where d_date_sk = ss_sold_date_sk
        and ss_item_sk = i_item_sk
        and i_manager_id=28
        and d_moy=11
        and d_year=1999
     group by i_brand, i_brand_id
     order by ext_price desc, brand_id
     limit 100
    ```
    The plan looks like this:
    
    ```
    == Parsed Logical Plan ==
    'GlobalLimit 100
    +- 'LocalLimit 100
       +- 'Sort ['ext_price DESC, 'brand_id ASC], true
          +- 'Aggregate ['i_brand, 'i_brand_id], ['i_brand_id AS brand_id#911, 'i_brand AS brand#912, 'sum('ss_ext_sales_price) AS ext_price#913]
             +- 'Filter (((('d_date_sk = 'ss_sold_date_sk) && ('ss_item_sk = 'i_item_sk)) && ('i_manager_id = 28)) && (('d_moy = 11) && ('d_year = 1999)))
                +- 'Join Inner
                   :- 'Join Inner
                   :  :- 'UnresolvedRelation `date_dim`
                   :  +- 'UnresolvedRelation `store_sales`
                   +- 'UnresolvedRelation `item`
    
    == Analyzed Logical Plan ==
    brand_id: int, brand: string, ext_price: decimal(17,2)
    GlobalLimit 100
    +- LocalLimit 100
       +- Sort [ext_price#913 DESC, brand_id#911 ASC], true
          +- Aggregate [i_brand#392, i_brand_id#391], [i_brand_id#391 AS brand_id#911, i_brand#392 AS brand#912, sum(ss_ext_sales_price#608) AS ext_price#913]
             +- Filter ((((d_date_sk#296 = ss_sold_date_sk#616) && (ss_item_sk#595 = i_item_sk#384)) && (i_manager_id#404 = 28)) && ((d_moy#304 = 11) && (d_year#302 = 1999)))
                +- Join Inner
                   :- Join Inner
                   :  :- SubqueryAlias date_dim
                   :  :  +- Relation[d_date_sk#296,d_date_id#297,d_date#298,d_month_seq#299,d_week_seq#300,d_quarter_seq#301,d_year#302,d_dow#303,d_moy#304,d_dom#305,d_qoy#306,d_fy_year#307,d_fy_quarter_seq#308,d_fy_week_seq#309,d_day_name#310,d_quarter_name#311,d_holiday#312,d_weekend#313,d_following_holiday#314,d_first_dom#315,d_last_dom#316,d_same_day_ly#317,d_same_day_lq#318,d_current_day#319,... 4 more fields] parquet
                   :  +- SubqueryAlias store_sales
                   :     +- Relation[ss_sold_time_sk#594,ss_item_sk#595,ss_customer_sk#596,ss_cdemo_sk#597,ss_hdemo_sk#598,ss_addr_sk#599,ss_store_sk#600,ss_promo_sk#601,ss_ticket_number#602,ss_quantity#603,ss_wholesale_cost#604,ss_list_price#605,ss_sales_price#606,ss_ext_discount_amt#607,ss_ext_sales_price#608,ss_ext_wholesale_cost#609,ss_ext_list_price#610,ss_ext_tax#611,ss_coupon_amt#612,ss_net_paid#613,ss_net_paid_inc_tax#614,ss_net_profit#615,ss_sold_date_sk#616] parquet
                   +- SubqueryAlias item
                      +- Relation[i_item_sk#384,i_item_id#385,i_rec_start_date#386,i_rec_end_date#387,i_item_desc#388,i_current_price#389,i_wholesale_cost#390,i_brand_id#391,i_brand#392,i_class_id#393,i_class#394,i_category_id#395,i_category#396,i_manufact_id#397,i_manufact#398,i_size#399,i_formulation#400,i_color#401,i_units#402,i_container#403,i_manager_id#404,i_product_name#405] parquet
    
    == Optimized Logical Plan ==
    GlobalLimit 100
    +- LocalLimit 100
       +- Sort [ext_price#913 DESC, brand_id#911 ASC], true
          +- Aggregate [i_brand#392, i_brand_id#391], [i_brand_id#391 AS brand_id#911, i_brand#392 AS brand#912, MakeDecimal(sum(UnscaledValue(ss_ext_sales_price#608)),17,2) AS ext_price#913]
             +- Project [ss_ext_sales_price#608, i_brand_id#391, i_brand#392]
                +- Join Inner, (ss_item_sk#595 = i_item_sk#384)
                   :- Project [ss_item_sk#595, ss_ext_sales_price#608]
                   :  +- Join Inner, (d_date_sk#296 = ss_sold_date_sk#616)
                   :     :- Project [d_date_sk#296]
                   :     :  +- Filter ((((isnotnull(d_date_sk#296) && isnotnull(d_moy#304)) && isnotnull(d_year#302)) && (d_moy#304 = 11)) && (d_year#302 = 1999))
                   :     :     +- Relation[d_date_sk#296,d_date_id#297,d_date#298,d_month_seq#299,d_week_seq#300,d_quarter_seq#301,d_year#302,d_dow#303,d_moy#304,d_dom#305,d_qoy#306,d_fy_year#307,d_fy_quarter_seq#308,d_fy_week_seq#309,d_day_name#310,d_quarter_name#311,d_holiday#312,d_weekend#313,d_following_holiday#314,d_first_dom#315,d_last_dom#316,d_same_day_ly#317,d_same_day_lq#318,d_current_day#319,... 4 more fields] parquet
                   :     +- Project [ss_item_sk#595, ss_ext_sales_price#608, ss_sold_date_sk#616]
                   :        +- Filter (predicate-subquery#924 [ss_sold_date_sk#616] && (isnotnull(ss_item_sk#595) && isnotnull(ss_sold_date_sk#616)))
                   :           :  +- SubqueryAlias predicate-subquery#924 [ss_sold_date_sk#616]
                   :           :     +- Aggregate [d_date_sk#296], [d_date_sk#296]
                   :           :        +- Project [d_date_sk#296]
                   :           :           +- Filter ((((isnotnull(d_date_sk#296) && isnotnull(d_moy#304)) && isnotnull(d_year#302)) && (d_moy#304 = 11)) && (d_year#302 = 1999))
                   :           :              +- Relation[d_date_sk#296,d_date_id#297,d_date#298,d_month_seq#299,d_week_seq#300,d_quarter_seq#301,d_year#302,d_dow#303,d_moy#304,d_dom#305,d_qoy#306,d_fy_year#307,d_fy_quarter_seq#308,d_fy_week_seq#309,d_day_name#310,d_quarter_name#311,d_holiday#312,d_weekend#313,d_following_holiday#314,d_first_dom#315,d_last_dom#316,d_same_day_ly#317,d_same_day_lq#318,d_current_day#319,... 4 more fields] parquet
                   :           +- Relation[ss_sold_time_sk#594,ss_item_sk#595,ss_customer_sk#596,ss_cdemo_sk#597,ss_hdemo_sk#598,ss_addr_sk#599,ss_store_sk#600,ss_promo_sk#601,ss_ticket_number#602,ss_quantity#603,ss_wholesale_cost#604,ss_list_price#605,ss_sales_price#606,ss_ext_discount_amt#607,ss_ext_sales_price#608,ss_ext_wholesale_cost#609,ss_ext_list_price#610,ss_ext_tax#611,ss_coupon_amt#612,ss_net_paid#613,ss_net_paid_inc_tax#614,ss_net_profit#615,ss_sold_date_sk#616] parquet
                   +- Project [i_item_sk#384, i_brand_id#391, i_brand#392]
                      +- Filter ((isnotnull(i_item_sk#384) && isnotnull(i_manager_id#404)) && (i_manager_id#404 = 28))
                         +- Relation[i_item_sk#384,i_item_id#385,i_rec_start_date#386,i_rec_end_date#387,i_item_desc#388,i_current_price#389,i_wholesale_cost#390,i_brand_id#391,i_brand#392,i_class_id#393,i_class#394,i_category_id#395,i_category#396,i_manufact_id#397,i_manufact#398,i_size#399,i_formulation#400,i_color#401,i_units#402,i_container#403,i_manager_id#404,i_product_name#405] parquet
    
    == Physical Plan ==
    TakeOrderedAndProject(limit=100, orderBy=[ext_price#913 DESC,brand_id#911 ASC], output=[brand_id#911,brand#912,ext_price#913])
    +- *HashAggregate(key=[i_brand#392,i_brand_id#391], functions=[sum(UnscaledValue(ss_ext_sales_price#608))], output=[brand_id#911,brand#912,ext_price#913])
       +- Exchange hashpartitioning(i_brand#392, i_brand_id#391, 4)
          +- *HashAggregate(key=[i_brand#392,i_brand_id#391], functions=[partial_sum(UnscaledValue(ss_ext_sales_price#608))], output=[i_brand#392,i_brand_id#391,sum#926L])
             +- *Project [ss_ext_sales_price#608, i_brand_id#391, i_brand#392]
                +- *BroadcastHashJoin [ss_item_sk#595], [i_item_sk#384], Inner, BuildRight
                   :- *Project [ss_item_sk#595, ss_ext_sales_price#608]
                   :  +- *BroadcastHashJoin [d_date_sk#296], [ss_sold_date_sk#616], Inner, BuildLeft
                   :     :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
                   :     :  +- *Project [d_date_sk#296]
                   :     :     +- *Filter ((((isnotnull(d_date_sk#296) && isnotnull(d_moy#304)) && isnotnull(d_year#302)) && (d_moy#304 = 11)) && (d_year#302 = 1999))
                   :     :        +- *BatchedScan parquet [d_date_sk#296,d_year#302,d_moy#304] Format: ParquetFormat, InputPaths: file:/opt/tpcds-40/date_dim, PushedFilters: [IsNotNull(d_date_sk), IsNotNull(d_moy), IsNotNull(d_year), EqualTo(d_moy,11), EqualTo(d_year,1999)], ReadSchema: struct<d_date_sk:int,d_year:int,d_moy:int>
                   :     +- *Project [ss_item_sk#595, ss_ext_sales_price#608, ss_sold_date_sk#616]
                   :        +- *Filter isnotnull(ss_item_sk#595)
                   :           +- *BatchedScan parquet [ss_item_sk#595,ss_ext_sales_price#608,ss_sold_date_sk#616] Format: ParquetFormat, InputPaths: file:/opt/tpcds-40/store_sales, PushedFilters: [IsNotNull(ss_item_sk)], ReadSchema: struct<ss_item_sk:int,ss_ext_sales_price:decimal(7,2)>
                   :              :  +- Subquery subquery924
                   :              :     +- *HashAggregate(key=[d_date_sk#296], functions=[], output=[d_date_sk#296])
                   :              :        +- Exchange hashpartitioning(d_date_sk#296, 4)
                   :              :           +- *HashAggregate(key=[d_date_sk#296], functions=[], output=[d_date_sk#296])
                   :              :              +- *Project [d_date_sk#296]
                   :              :                 +- *Filter ((((isnotnull(d_date_sk#296) && isnotnull(d_moy#304)) && isnotnull(d_year#302)) && (d_moy#304 = 11)) && (d_year#302 = 1999))
                   :              :                    +- *BatchedScan parquet [d_date_sk#296,d_year#302,d_moy#304] Format: ParquetFormat, InputPaths: file:/opt/tpcds-40/date_dim, PushedFilters: [IsNotNull(d_date_sk), IsNotNull(d_moy), IsNotNull(d_year), EqualTo(d_moy,11), EqualTo(d_year,1999)], ReadSchema: struct<d_date_sk:int,d_year:int,d_moy:int>
                   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
                      +- *Project [i_item_sk#384, i_brand_id#391, i_brand#392]
                         +- *Filter ((isnotnull(i_item_sk#384) && isnotnull(i_manager_id#404)) && (i_manager_id#404 = 28))
                            +- *BatchedScan parquet [i_item_sk#384,i_brand_id#391,i_brand#392,i_manager_id#404] Format: ParquetFormat, InputPaths: file:/opt/tpcds-40/item, PushedFilters: [IsNotNull(i_item_sk), IsNotNull(i_manager_id), EqualTo(i_manager_id,28)], ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_brand:string,i_manager_id:int>
    ```


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark pruning3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14545.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14545
    
----
commit dcd5a864c1df14bd56ec5795abedca478f603e86
Author: Davies Liu <[email protected]>
Date:   2016-08-08T20:50:15Z

    dynamic partition pruning

----


