peter-toth opened a new pull request #28885:
URL: https://github.com/apache/spark/pull/28885


   ### What changes were proposed in this pull request?
   This PR:
   - Unifies exchange and subquery reuse into one rule, as the current separate `ReuseExchange` and `ReuseSubquery` rules can interfere with each other. One example of the issue is the TPCDS Q14a query, where the definition of the exchange `id=13278` is missing but there are 3 reuse references to it. In the case of Q14a the `ReuseExchange` rule inserted the reuse references correctly, but then the `ReuseSubquery` rule altered the original `id=13278` node to `id=15038` by inserting `ReusedSubquery` nodes under it. This PR fixes the issue by combining the 2 similar rules that insert references into the plan (a toy sketch of the combined approach follows the plan below):
   ```
   == Physical Plan ==
   TakeOrderedAndProject(limit=100, orderBy=[channel#3384 ASC NULLS 
FIRST,i_brand_id#3385 ASC NULLS FIRST,i_class_id#3386 ASC NULLS 
FIRST,i_category_id#3387 ASC NULLS FIRST], 
output=[channel#3384,i_brand_id#3385,i_class_id#3386,i_category_id#3387,sum(sales)#3374,sum(number_sales)#3375L])
   +- *(119) HashAggregate(keys=[channel#3384, i_brand_id#3385, 
i_class_id#3386, i_category_id#3387, spark_grouping_id#3383L], 
functions=[sum(sales#3318), sum(number_sales#3319L)])
      +- Exchange hashpartitioning(channel#3384, i_brand_id#3385, 
i_class_id#3386, i_category_id#3387, spark_grouping_id#3383L, 4), true, 
[id=#15231]
         +- *(118) HashAggregate(keys=[channel#3384, i_brand_id#3385, 
i_class_id#3386, i_category_id#3387, spark_grouping_id#3383L], 
functions=[partial_sum(sales#3318), partial_sum(number_sales#3319L)])
            +- *(118) Expand [List(sales#3318, number_sales#3319L, 
channel#3317, i_brand_id#499, i_class_id#501, i_category_id#503, 0), 
List(sales#3318, number_sales#3319L, channel#3317, i_brand_id#499, 
i_class_id#501, null, 1), List(sales#3318, number_sales#3319L, channel#3317, 
i_brand_id#499, null, null, 3), List(sales#3318, number_sales#3319L, 
channel#3317, null, null, null, 7), List(sales#3318, number_sales#3319L, null, 
null, null, null, 15)], [sales#3318, number_sales#3319L, channel#3384, 
i_brand_id#3385, i_class_id#3386, i_category_id#3387, spark_grouping_id#3383L]
               +- Union
                  :- *(39) Project [sales#3318, number_sales#3319L, 
channel#3317, i_brand_id#499, i_class_id#501, i_category_id#503]
                  :  +- *(39) Filter 
(isnotnull(sum(CheckOverflow((promote_precision(cast(cast(ss_quantity#1163 as 
decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ss_list_price#1165 
as decimal(12,2)))), DecimalType(18,2), true))#3353) AND 
(cast(sum(CheckOverflow((promote_precision(cast(cast(ss_quantity#1163 as 
decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ss_list_price#1165 
as decimal(12,2)))), DecimalType(18,2), true))#3353 as decimal(32,6)) > 
cast(Subquery scalar-subquery#3320, [id=#10802] as decimal(32,6))))
                  :     :  +- Subquery scalar-subquery#3320, [id=#10802]
                  :     :     +- *(8) HashAggregate(keys=[], 
functions=[avg(CheckOverflow((promote_precision(cast(cast(quantity#3335 as 
decimal(10,0)) as decimal(12,2))) * promote_precision(cast(list_price#3336 as 
decimal(12,2)))), DecimalType(18,2), true))])
                  :     :        +- Exchange SinglePartition, true, [id=#10798]
                  :     :           +- *(7) HashAggregate(keys=[], 
functions=[partial_avg(CheckOverflow((promote_precision(cast(cast(quantity#3335 
as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(list_price#3336 
as decimal(12,2)))), DecimalType(18,2), true))])
                  :     :              +- Union
                  :     :                 :- *(2) Project [ss_quantity#1163 AS 
quantity#3335, ss_list_price#1165 AS list_price#3336]
                  :     :                 :  +- *(2) BroadcastHashJoin 
[ss_sold_date_sk#1176], [d_date_sk#329], Inner, BuildRight
                  :     :                 :     :- *(2) ColumnarToRow
                  :     :                 :     :  +- FileScan parquet 
[ss_quantity#1163,ss_list_price#1165,ss_sold_date_sk#1176] Batched: true, 
DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/store_sales],
 PartitionFilters: [isnotnull(ss_sold_date_sk#1176), 
dynamicpruningexpression(ss_sold_date_sk#1176 IN dynamicpruning..., 
PushedFilters: [], ReadSchema: 
struct<ss_quantity:int,ss_list_price:decimal(7,2)>
                  :     :                 :     :        +- SubqueryBroadcast 
dynamicpruning#3413, 0, [d_date_sk#329], [id=#10730]
                  :     :                 :     :           +- ReusedExchange 
[d_date_sk#329], BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#10640]
                  :     :                 :     +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#10640]
                  :     :                 :        +- *(1) Project 
[d_date_sk#329]
                  :     :                 :           +- *(1) Filter 
(((isnotnull(d_year#335) AND (d_year#335 >= 1999)) AND (d_year#335 <= 2001)) 
AND isnotnull(d_date_sk#329))
                  :     :                 :              +- *(1) ColumnarToRow
                  :     :                 :                 +- FileScan parquet 
[d_date_sk#329,d_year#335] Batched: true, DataFilters: [isnotnull(d_year#335), 
(d_year#335 >= 1999), (d_year#335 <= 2001), isnotnull(d_date_sk#329)], Format: 
Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/date_dim],
 PartitionFilters: [], PushedFilters: [IsNotNull(d_year), 
GreaterThanOrEqual(d_year,1999), LessThanOrEqual(d_year,2001), 
IsNotNull(d_da..., ReadSchema: struct<d_date_sk:int,d_year:int>
                  :     :                 :- *(4) Project [cs_quantity#889 AS 
quantity#3337, cs_list_price#891 AS list_price#3338]
                  :     :                 :  +- *(4) BroadcastHashJoin 
[cs_sold_date_sk#905], [d_date_sk#329], Inner, BuildRight
                  :     :                 :     :- *(4) ColumnarToRow
                  :     :                 :     :  +- FileScan parquet 
[cs_quantity#889,cs_list_price#891,cs_sold_date_sk#905] Batched: true, 
DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/catalog_sales],
 PartitionFilters: [isnotnull(cs_sold_date_sk#905), 
dynamicpruningexpression(cs_sold_date_sk#905 IN dynamicpruning#3..., 
PushedFilters: [], ReadSchema: 
struct<cs_quantity:int,cs_list_price:decimal(7,2)>
                  :     :                 :     :        +- ReusedSubquery 
SubqueryBroadcast dynamicpruning#3413, 0, [d_date_sk#329], [id=#10730]
                  :     :                 :     +- ReusedExchange 
[d_date_sk#329], BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#10640]
                  :     :                 +- *(6) Project [ws_quantity#1030 AS 
quantity#3339, ws_list_price#1032 AS list_price#3340]
                  :     :                    +- *(6) BroadcastHashJoin 
[ws_sold_date_sk#1046], [d_date_sk#329], Inner, BuildRight
                  :     :                       :- *(6) ColumnarToRow
                  :     :                       :  +- FileScan parquet 
[ws_quantity#1030,ws_list_price#1032,ws_sold_date_sk#1046] Batched: true, 
DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/web_sales],
 PartitionFilters: [isnotnull(ws_sold_date_sk#1046), 
dynamicpruningexpression(ws_sold_date_sk#1046 IN dynamicpruning..., 
PushedFilters: [], ReadSchema: 
struct<ws_quantity:int,ws_list_price:decimal(7,2)>
                  :     :                       :        +- ReusedSubquery 
SubqueryBroadcast dynamicpruning#3413, 0, [d_date_sk#329], [id=#10730]
                  :     :                       +- ReusedExchange 
[d_date_sk#329], BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#10640]
                  :     +- *(39) HashAggregate(keys=[i_brand_id#499, 
i_class_id#501, i_category_id#503], 
functions=[sum(CheckOverflow((promote_precision(cast(cast(ss_quantity#1163 as 
decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ss_list_price#1165 
as decimal(12,2)))), DecimalType(18,2), true)), count(1)])
                  :        +- Exchange hashpartitioning(i_brand_id#499, 
i_class_id#501, i_category_id#503, 4), true, [id=#15050]
                  :           +- *(38) HashAggregate(keys=[i_brand_id#499, 
i_class_id#501, i_category_id#503], 
functions=[partial_sum(CheckOverflow((promote_precision(cast(cast(ss_quantity#1163
 as decimal(10,0)) as decimal(12,2))) * 
promote_precision(cast(ss_list_price#1165 as decimal(12,2)))), 
DecimalType(18,2), true)), partial_count(1)])
                  :              +- *(38) Project [ss_quantity#1163, 
ss_list_price#1165, i_brand_id#499, i_class_id#501, i_category_id#503]
                  :                 +- *(38) BroadcastHashJoin 
[ss_sold_date_sk#1176], [d_date_sk#329], Inner, BuildRight
                  :                    :- *(38) Project [ss_quantity#1163, 
ss_list_price#1165, ss_sold_date_sk#1176, i_brand_id#499, i_class_id#501, 
i_category_id#503]
                  :                    :  +- *(38) BroadcastHashJoin 
[ss_item_sk#1155], [i_item_sk#492], Inner, BuildRight
                  :                    :     :- SortMergeJoin 
[ss_item_sk#1155], [ss_item_sk#3334], LeftSemi
                  :                    :     :  :- *(2) Sort [ss_item_sk#1155 
ASC NULLS FIRST], false, 0
                  :                    :     :  :  +- Exchange 
hashpartitioning(ss_item_sk#1155, 4), true, [id=#14846]
                  :                    :     :  :     +- *(1) Project 
[ss_item_sk#1155, ss_quantity#1163, ss_list_price#1165, ss_sold_date_sk#1176]
                  :                    :     :  :        +- *(1) Filter 
isnotnull(ss_item_sk#1155)
                  :                    :     :  :           +- *(1) 
ColumnarToRow
                  :                    :     :  :              +- FileScan 
parquet 
[ss_item_sk#1155,ss_quantity#1163,ss_list_price#1165,ss_sold_date_sk#1176] 
Batched: true, DataFilters: [isnotnull(ss_item_sk#1155)], Format: Parquet, 
Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/store_sales],
 PartitionFilters: [isnotnull(ss_sold_date_sk#1176), 
dynamicpruningexpression(ss_sold_date_sk#1176 IN dynamicpruning..., 
PushedFilters: [IsNotNull(ss_item_sk)], ReadSchema: 
struct<ss_item_sk:int,ss_quantity:int,ss_list_price:decimal(7,2)>
                  :                    :     :  :                    +- 
SubqueryBroadcast dynamicpruning#3445, 0, [d_date_sk#329], [id=#14839]
                  :                    :     :  :                       +- 
ReusedExchange [d_date_sk#329], BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#12547]
                  :                    :     :  +- *(18) Sort [ss_item_sk#3334 
ASC NULLS FIRST], false, 0
                  :                    :     :     +- Exchange 
hashpartitioning(ss_item_sk#3334, 4), true, [id=#15038]
                  :                    :     :        +- *(17) Project 
[i_item_sk#492 AS ss_item_sk#3334]
                  :                    :     :           +- *(17) 
BroadcastHashJoin [i_brand_id#499, i_class_id#501, i_category_id#503], 
[brand_id#3331, class_id#3332, category_id#3333], Inner, BuildLeft
                  :                    :     :              :- 
BroadcastExchange HashedRelationBroadcastMode(List(input[1, int, true], 
input[2, int, true], input[3, int, true])), [id=#12321]
                  :                    :     :              :  +- *(3) Project 
[i_item_sk#492, i_brand_id#499, i_class_id#501, i_category_id#503]
                  :                    :     :              :     +- *(3) 
Filter ((isnotnull(i_brand_id#499) AND isnotnull(i_category_id#503)) AND 
isnotnull(i_class_id#501))
                  :                    :     :              :        +- *(3) 
ColumnarToRow
                  :                    :     :              :           +- 
FileScan parquet 
[i_item_sk#492,i_brand_id#499,i_class_id#501,i_category_id#503] Batched: true, 
DataFilters: [isnotnull(i_brand_id#499), isnotnull(i_category_id#503), 
isnotnull(i_class_id#501)], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/item], 
PartitionFilters: [], PushedFilters: [IsNotNull(i_brand_id), 
IsNotNull(i_category_id), IsNotNull(i_class_id)], ReadSchema: 
struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>
                  :                    :     :              +- *(17) 
HashAggregate(keys=[brand_id#3331, class_id#3332, category_id#3333], 
functions=[])
                  :                    :     :                 +- *(17) 
HashAggregate(keys=[brand_id#3331, class_id#3332, category_id#3333], 
functions=[])
                  :                    :     :                    +- *(17) 
HashAggregate(keys=[brand_id#3331, class_id#3332, category_id#3333], 
functions=[])
                  :                    :     :                       +- 
Exchange hashpartitioning(brand_id#3331, class_id#3332, category_id#3333, 4), 
true, [id=#15030]
                  :                    :     :                          +- 
*(16) HashAggregate(keys=[brand_id#3331, class_id#3332, category_id#3333], 
functions=[])
                  :                    :     :                             +- 
SortMergeJoin [coalesce(brand_id#3331, 0), isnull(brand_id#3331), 
coalesce(class_id#3332, 0), isnull(class_id#3332), coalesce(category_id#3333, 
0), isnull(category_id#3333)], [coalesce(i_brand_id#499, 0), 
isnull(i_brand_id#499), coalesce(i_class_id#501, 0), isnull(i_class_id#501), 
coalesce(i_category_id#503, 0), isnull(i_category_id#503)], LeftSemi
                  :                    :     :                                
:- SortMergeJoin [coalesce(brand_id#3331, 0), isnull(brand_id#3331), 
coalesce(class_id#3332, 0), isnull(class_id#3332), coalesce(category_id#3333, 
0), isnull(category_id#3333)], [coalesce(i_brand_id#499, 0), 
isnull(i_brand_id#499), coalesce(i_class_id#501, 0), isnull(i_class_id#501), 
coalesce(i_category_id#503, 0), isnull(i_category_id#503)], LeftSemi
                  :                    :     :                                : 
 :- *(7) Sort [coalesce(brand_id#3331, 0) ASC NULLS FIRST, 
isnull(brand_id#3331) ASC NULLS FIRST, coalesce(class_id#3332, 0) ASC NULLS 
FIRST, isnull(class_id#3332) ASC NULLS FIRST, coalesce(category_id#3333, 0) ASC 
NULLS FIRST, isnull(category_id#3333) ASC NULLS FIRST], false, 0
                  :                    :     :                                : 
 :  +- Exchange hashpartitioning(coalesce(brand_id#3331, 0), 
isnull(brand_id#3331), coalesce(class_id#3332, 0), isnull(class_id#3332), 
coalesce(category_id#3333, 0), isnull(category_id#3333), 4), true, [id=#14862]
                  :                    :     :                                : 
 :     +- *(6) Project [i_brand_id#499 AS brand_id#3331, i_class_id#501 AS 
class_id#3332, i_category_id#503 AS category_id#3333]
                  :                    :     :                                : 
 :        +- *(6) BroadcastHashJoin [ss_sold_date_sk#1176], [d_date_sk#329], 
Inner, BuildRight
                  :                    :     :                                : 
 :           :- *(6) Project [ss_sold_date_sk#1176, i_brand_id#499, 
i_class_id#501, i_category_id#503]
                  :                    :     :                                : 
 :           :  +- *(6) BroadcastHashJoin [ss_item_sk#1155], [i_item_sk#492], 
Inner, BuildRight
                  :                    :     :                                : 
 :           :     :- *(6) Project [ss_item_sk#1155, ss_sold_date_sk#1176]
                  :                    :     :                                : 
 :           :     :  +- *(6) Filter isnotnull(ss_item_sk#1155)
                  :                    :     :                                : 
 :           :     :     +- *(6) ColumnarToRow
                  :                    :     :                                : 
 :           :     :        +- FileScan parquet 
[ss_item_sk#1155,ss_sold_date_sk#1176] Batched: true, DataFilters: 
[isnotnull(ss_item_sk#1155)], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/store_sales],
 PartitionFilters: [isnotnull(ss_sold_date_sk#1176), 
dynamicpruningexpression(ss_sold_date_sk#1176 IN dynamicpruning..., 
PushedFilters: [IsNotNull(ss_item_sk)], ReadSchema: struct<ss_item_sk:int>
                  :                    :     :                                : 
 :           :     :              +- SubqueryBroadcast dynamicpruning#3416, 0, 
[d_date_sk#329], [id=#14851]
                  :                    :     :                                : 
 :           :     :                 +- ReusedExchange [d_date_sk#329], 
BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint))), [id=#12341]
                  :                    :     :                                : 
 :           :     +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#12332]
                  :                    :     :                                : 
 :           :        +- *(4) Project [i_item_sk#492, i_brand_id#499, 
i_class_id#501, i_category_id#503]
                  :                    :     :                                : 
 :           :           +- *(4) Filter (((isnotnull(i_item_sk#492) AND 
isnotnull(i_category_id#503)) AND isnotnull(i_class_id#501)) AND 
isnotnull(i_brand_id#499))
                  :                    :     :                                : 
 :           :              +- *(4) ColumnarToRow
                  :                    :     :                                : 
 :           :                 +- FileScan parquet 
[i_item_sk#492,i_brand_id#499,i_class_id#501,i_category_id#503] Batched: true, 
DataFilters: [isnotnull(i_item_sk#492), isnotnull(i_category_id#503), 
isnotnull(i_class_id#501), isnotnull(i_b..., Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/item], 
PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk), 
IsNotNull(i_category_id), IsNotNull(i_class_id), IsNotNull(i_brand_id)], 
ReadSchema: 
struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>
                  :                    :     :                                : 
 :           +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#12341]
                  :                    :     :                                : 
 :              +- *(5) Project [d_date_sk#329]
                  :                    :     :                                : 
 :                 +- *(5) Filter (((isnotnull(d_year#335) AND (d_year#335 >= 
1999)) AND (d_year#335 <= 2001)) AND isnotnull(d_date_sk#329))
                  :                    :     :                                : 
 :                    +- *(5) ColumnarToRow
                  :                    :     :                                : 
 :                       +- FileScan parquet [d_date_sk#329,d_year#335] 
Batched: true, DataFilters: [isnotnull(d_year#335), (d_year#335 >= 1999), 
(d_year#335 <= 2001), isnotnull(d_date_sk#329)], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/date_dim],
 PartitionFilters: [], PushedFilters: [IsNotNull(d_year), 
GreaterThanOrEqual(d_year,1999), LessThanOrEqual(d_year,2001), 
IsNotNull(d_da..., ReadSchema: struct<d_date_sk:int,d_year:int>
                  :                    :     :                                : 
 +- *(11) Sort [coalesce(i_brand_id#499, 0) ASC NULLS FIRST, 
isnull(i_brand_id#499) ASC NULLS FIRST, coalesce(i_class_id#501, 0) ASC NULLS 
FIRST, isnull(i_class_id#501) ASC NULLS FIRST, coalesce(i_category_id#503, 0) 
ASC NULLS FIRST, isnull(i_category_id#503) ASC NULLS FIRST], false, 0
                  :                    :     :                                : 
    +- Exchange hashpartitioning(coalesce(i_brand_id#499, 0), 
isnull(i_brand_id#499), coalesce(i_class_id#501, 0), isnull(i_class_id#501), 
coalesce(i_category_id#503, 0), isnull(i_category_id#503), 4), true, [id=#15005]
                  :                    :     :                                : 
       +- *(10) Project [i_brand_id#499, i_class_id#501, i_category_id#503]
                  :                    :     :                                : 
          +- *(10) BroadcastHashJoin [cs_sold_date_sk#905], [d_date_sk#329], 
Inner, BuildRight
                  :                    :     :                                : 
             :- *(10) Project [cs_sold_date_sk#905, i_brand_id#499, 
i_class_id#501, i_category_id#503]
                  :                    :     :                                : 
             :  +- *(10) BroadcastHashJoin [cs_item_sk#886], [i_item_sk#492], 
Inner, BuildRight
                  :                    :     :                                : 
             :     :- *(10) Project [cs_item_sk#886, cs_sold_date_sk#905]
                  :                    :     :                                : 
             :     :  +- *(10) Filter isnotnull(cs_item_sk#886)
                  :                    :     :                                : 
             :     :     +- *(10) ColumnarToRow
                  :                    :     :                                : 
             :     :        +- FileScan parquet 
[cs_item_sk#886,cs_sold_date_sk#905] Batched: true, DataFilters: 
[isnotnull(cs_item_sk#886)], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/catalog_sales],
 PartitionFilters: [isnotnull(cs_sold_date_sk#905), 
dynamicpruningexpression(cs_sold_date_sk#905 IN dynamicpruning#3..., 
PushedFilters: [IsNotNull(cs_item_sk)], ReadSchema: struct<cs_item_sk:int>
                  :                    :     :                                : 
             :     :              +- ReusedSubquery SubqueryBroadcast 
dynamicpruning#3416, 0, [d_date_sk#329], [id=#14851]
                  :                    :     :                                : 
             :     +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#12359]
                  :                    :     :                                : 
             :        +- *(8) Project [i_item_sk#492, i_brand_id#499, 
i_class_id#501, i_category_id#503]
                  :                    :     :                                : 
             :           +- *(8) Filter isnotnull(i_item_sk#492)
                  :                    :     :                                : 
             :              +- *(8) ColumnarToRow
                  :                    :     :                                : 
             :                 +- FileScan parquet 
[i_item_sk#492,i_brand_id#499,i_class_id#501,i_category_id#503] Batched: true, 
DataFilters: [isnotnull(i_item_sk#492)], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/item], 
PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: 
struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>
                  :                    :     :                                : 
             +- ReusedExchange [d_date_sk#329], BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#12341]
                  :                    :     :                                
+- *(15) Sort [coalesce(i_brand_id#499, 0) ASC NULLS FIRST, 
isnull(i_brand_id#499) ASC NULLS FIRST, coalesce(i_class_id#501, 0) ASC NULLS 
FIRST, isnull(i_class_id#501) ASC NULLS FIRST, coalesce(i_category_id#503, 0) 
ASC NULLS FIRST, isnull(i_category_id#503) ASC NULLS FIRST], false, 0
                  :                    :     :                                  
 +- Exchange hashpartitioning(coalesce(i_brand_id#499, 0), 
isnull(i_brand_id#499), coalesce(i_class_id#501, 0), isnull(i_class_id#501), 
coalesce(i_category_id#503, 0), isnull(i_category_id#503), 4), true, [id=#15022]
                  :                    :     :                                  
    +- *(14) Project [i_brand_id#499, i_class_id#501, i_category_id#503]
                  :                    :     :                                  
       +- *(14) BroadcastHashJoin [ws_sold_date_sk#1046], [d_date_sk#329], 
Inner, BuildRight
                  :                    :     :                                  
          :- *(14) Project [ws_sold_date_sk#1046, i_brand_id#499, 
i_class_id#501, i_category_id#503]
                  :                    :     :                                  
          :  +- *(14) BroadcastHashJoin [ws_item_sk#1015], [i_item_sk#492], 
Inner, BuildRight
                  :                    :     :                                  
          :     :- *(14) Project [ws_item_sk#1015, ws_sold_date_sk#1046]
                  :                    :     :                                  
          :     :  +- *(14) Filter isnotnull(ws_item_sk#1015)
                  :                    :     :                                  
          :     :     +- *(14) ColumnarToRow
                  :                    :     :                                  
          :     :        +- FileScan parquet 
[ws_item_sk#1015,ws_sold_date_sk#1046] Batched: true, DataFilters: 
[isnotnull(ws_item_sk#1015)], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/web_sales],
 PartitionFilters: [isnotnull(ws_sold_date_sk#1046), 
dynamicpruningexpression(ws_sold_date_sk#1046 IN dynamicpruning..., 
PushedFilters: [IsNotNull(ws_item_sk)], ReadSchema: struct<ws_item_sk:int>
                  :                    :     :                                  
          :     :              +- ReusedSubquery SubqueryBroadcast 
dynamicpruning#3416, 0, [d_date_sk#329], [id=#14851]
                  :                    :     :                                  
          :     +- ReusedExchange [i_item_sk#492, i_brand_id#499, 
i_class_id#501, i_category_id#503], BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#12359]
                  :                    :     :                                  
          +- ReusedExchange [d_date_sk#329], BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#12341]
                  :                    :     +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#13552]
                  :                    :        +- SortMergeJoin 
[i_item_sk#492], [ss_item_sk#3334], LeftSemi
                  :                    :           :- *(20) Sort [i_item_sk#492 
ASC NULLS FIRST], false, 0
                  :                    :           :  +- Exchange 
hashpartitioning(i_item_sk#492, 4), true, [id=#12428]
                  :                    :           :     +- *(19) Project 
[i_item_sk#492, i_brand_id#499, i_class_id#501, i_category_id#503]
                  :                    :           :        +- *(19) Filter 
isnotnull(i_item_sk#492)
                  :                    :           :           +- *(19) 
ColumnarToRow
                  :                    :           :              +- FileScan 
parquet [i_item_sk#492,i_brand_id#499,i_class_id#501,i_category_id#503] 
Batched: true, DataFilters: [isnotnull(i_item_sk#492)], Format: Parquet, 
Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/item], 
PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: 
struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>
                  :                    :           +- *(36) Sort 
[ss_item_sk#3334 ASC NULLS FIRST], false, 0
                  :                    :              +- ReusedExchange 
[ss_item_sk#3334], Exchange hashpartitioning(ss_item_sk#3334, 4), true, 
[id=#13278]
                  :                    +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#12547]
                  :                       +- *(37) Project [d_date_sk#329]
                  :                          +- *(37) Filter 
((((isnotnull(d_year#335) AND isnotnull(d_moy#337)) AND (d_year#335 = 2001)) 
AND (d_moy#337 = 11)) AND isnotnull(d_date_sk#329))
                  :                             +- *(37) ColumnarToRow
                  :                                +- FileScan parquet 
[d_date_sk#329,d_year#335,d_moy#337] Batched: true, DataFilters: 
[isnotnull(d_year#335), isnotnull(d_moy#337), (d_year#335 = 2001), (d_moy#337 = 
11), isnotnull(d_..., Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/date_dim],
 PartitionFilters: [], PushedFilters: [IsNotNull(d_year), IsNotNull(d_moy), 
EqualTo(d_year,2001), EqualTo(d_moy,11), IsNotNull(d_date_sk)], ReadSchema: 
struct<d_date_sk:int,d_year:int,d_moy:int>
                  :- *(78) Project [sales#3323, number_sales#3324L, 
channel#3322, i_brand_id#499, i_class_id#501, i_category_id#503]
                  :  +- *(78) Filter 
(isnotnull(sum(CheckOverflow((promote_precision(cast(cast(cs_quantity#889 as 
decimal(10,0)) as decimal(12,2))) * promote_precision(cast(cs_list_price#891 as 
decimal(12,2)))), DecimalType(18,2), true))#3356) AND 
(cast(sum(CheckOverflow((promote_precision(cast(cast(cs_quantity#889 as 
decimal(10,0)) as decimal(12,2))) * promote_precision(cast(cs_list_price#891 as 
decimal(12,2)))), DecimalType(18,2), true))#3356 as decimal(32,6)) > 
cast(ReusedSubquery Subquery scalar-subquery#3320, [id=#10802] as 
decimal(32,6))))
                  :     :  +- ReusedSubquery Subquery scalar-subquery#3320, 
[id=#10802]
                  :     +- *(78) HashAggregate(keys=[i_brand_id#499, 
i_class_id#501, i_category_id#503], 
functions=[sum(CheckOverflow((promote_precision(cast(cast(cs_quantity#889 as 
decimal(10,0)) as decimal(12,2))) * promote_precision(cast(cs_list_price#891 as 
decimal(12,2)))), DecimalType(18,2), true)), count(1)])
                  :        +- Exchange hashpartitioning(i_brand_id#499, 
i_class_id#501, i_category_id#503, 4), true, [id=#15154]
                  :           +- *(77) HashAggregate(keys=[i_brand_id#499, 
i_class_id#501, i_category_id#503], 
functions=[partial_sum(CheckOverflow((promote_precision(cast(cast(cs_quantity#889
 as decimal(10,0)) as decimal(12,2))) * 
promote_precision(cast(cs_list_price#891 as decimal(12,2)))), 
DecimalType(18,2), true)), partial_count(1)])
                  :              +- *(77) Project [cs_quantity#889, 
cs_list_price#891, i_brand_id#499, i_class_id#501, i_category_id#503]
                  :                 +- *(77) BroadcastHashJoin 
[cs_sold_date_sk#905], [d_date_sk#329], Inner, BuildRight
                  :                    :- *(77) Project [cs_quantity#889, 
cs_list_price#891, cs_sold_date_sk#905, i_brand_id#499, i_class_id#501, 
i_category_id#503]
                  :                    :  +- *(77) BroadcastHashJoin 
[cs_item_sk#886], [i_item_sk#492], Inner, BuildRight
                  :                    :     :- SortMergeJoin [cs_item_sk#886], 
[ss_item_sk#3334], LeftSemi
                  :                    :     :  :- *(41) Sort [cs_item_sk#886 
ASC NULLS FIRST], false, 0
                  :                    :     :  :  +- Exchange 
hashpartitioning(cs_item_sk#886, 4), true, [id=#15142]
                  :                    :     :  :     +- *(40) Project 
[cs_item_sk#886, cs_quantity#889, cs_list_price#891, cs_sold_date_sk#905]
                  :                    :     :  :        +- *(40) Filter 
isnotnull(cs_item_sk#886)
                  :                    :     :  :           +- *(40) 
ColumnarToRow
                  :                    :     :  :              +- FileScan 
parquet [cs_item_sk#886,cs_quantity#889,cs_list_price#891,cs_sold_date_sk#905] 
Batched: true, DataFilters: [isnotnull(cs_item_sk#886)], Format: Parquet, 
Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/catalog_sales],
 PartitionFilters: [isnotnull(cs_sold_date_sk#905), 
dynamicpruningexpression(cs_sold_date_sk#905 IN dynamicpruning#3..., 
PushedFilters: [IsNotNull(cs_item_sk)], ReadSchema: 
struct<cs_item_sk:int,cs_quantity:int,cs_list_price:decimal(7,2)>
                  :                    :     :  :                    +- 
ReusedSubquery SubqueryBroadcast dynamicpruning#3445, 0, [d_date_sk#329], 
[id=#14839]
                  :                    :     :  +- *(57) Sort [ss_item_sk#3334 
ASC NULLS FIRST], false, 0
                  :                    :     :     +- ReusedExchange 
[ss_item_sk#3334], Exchange hashpartitioning(ss_item_sk#3334, 4), true, 
[id=#13278]
                  :                    :     +- ReusedExchange [i_item_sk#492, 
i_brand_id#499, i_class_id#501, i_category_id#503], BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#13552]
                  :                    +- ReusedExchange [d_date_sk#329], 
BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint))), [id=#12547]
                  +- *(117) Project [sales#3328, number_sales#3329L, 
channel#3327, i_brand_id#499, i_class_id#501, i_category_id#503]
                     +- *(117) Filter 
(isnotnull(sum(CheckOverflow((promote_precision(cast(cast(ws_quantity#1030 as 
decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ws_list_price#1032 
as decimal(12,2)))), DecimalType(18,2), true))#3359) AND 
(cast(sum(CheckOverflow((promote_precision(cast(cast(ws_quantity#1030 as 
decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ws_list_price#1032 
as decimal(12,2)))), DecimalType(18,2), true))#3359 as decimal(32,6)) > 
cast(ReusedSubquery Subquery scalar-subquery#3320, [id=#10802] as 
decimal(32,6))))
                        :  +- ReusedSubquery Subquery scalar-subquery#3320, 
[id=#10802]
                        +- *(117) HashAggregate(keys=[i_brand_id#499, 
i_class_id#501, i_category_id#503], 
functions=[sum(CheckOverflow((promote_precision(cast(cast(ws_quantity#1030 as 
decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ws_list_price#1032 
as decimal(12,2)))), DecimalType(18,2), true)), count(1)])
                           +- Exchange hashpartitioning(i_brand_id#499, 
i_class_id#501, i_category_id#503, 4), true, [id=#15220]
                              +- *(116) HashAggregate(keys=[i_brand_id#499, 
i_class_id#501, i_category_id#503], 
functions=[partial_sum(CheckOverflow((promote_precision(cast(cast(ws_quantity#1030
 as decimal(10,0)) as decimal(12,2))) * 
promote_precision(cast(ws_list_price#1032 as decimal(12,2)))), 
DecimalType(18,2), true)), partial_count(1)])
                                 +- *(116) Project [ws_quantity#1030, 
ws_list_price#1032, i_brand_id#499, i_class_id#501, i_category_id#503]
                                    +- *(116) BroadcastHashJoin 
[ws_sold_date_sk#1046], [d_date_sk#329], Inner, BuildRight
                                       :- *(116) Project [ws_quantity#1030, 
ws_list_price#1032, ws_sold_date_sk#1046, i_brand_id#499, i_class_id#501, 
i_category_id#503]
                                       :  +- *(116) BroadcastHashJoin 
[ws_item_sk#1015], [i_item_sk#492], Inner, BuildRight
                                       :     :- SortMergeJoin 
[ws_item_sk#1015], [ss_item_sk#3334], LeftSemi
                                       :     :  :- *(80) Sort [ws_item_sk#1015 
ASC NULLS FIRST], false, 0
                                       :     :  :  +- Exchange 
hashpartitioning(ws_item_sk#1015, 4), true, [id=#15208]
                                       :     :  :     +- *(79) Project 
[ws_item_sk#1015, ws_quantity#1030, ws_list_price#1032, ws_sold_date_sk#1046]
                                       :     :  :        +- *(79) Filter 
isnotnull(ws_item_sk#1015)
                                       :     :  :           +- *(79) 
ColumnarToRow
                                       :     :  :              +- FileScan 
parquet 
[ws_item_sk#1015,ws_quantity#1030,ws_list_price#1032,ws_sold_date_sk#1046] 
Batched: true, DataFilters: [isnotnull(ws_item_sk#1015)], Format: Parquet, 
Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/web_sales],
 PartitionFilters: [isnotnull(ws_sold_date_sk#1046), 
dynamicpruningexpression(ws_sold_date_sk#1046 IN dynamicpruning..., 
PushedFilters: [IsNotNull(ws_item_sk)], ReadSchema: 
struct<ws_item_sk:int,ws_quantity:int,ws_list_price:decimal(7,2)>
                                       :     :  :                    +- 
ReusedSubquery SubqueryBroadcast dynamicpruning#3445, 0, [d_date_sk#329], 
[id=#14839]
                                       :     :  +- *(96) Sort [ss_item_sk#3334 
ASC NULLS FIRST], false, 0
                                       :     :     +- ReusedExchange 
[ss_item_sk#3334], Exchange hashpartitioning(ss_item_sk#3334, 4), true, 
[id=#13278]
                                       :     +- ReusedExchange [i_item_sk#492, 
i_brand_id#499, i_class_id#501, i_category_id#503], BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
[id=#13552]
                                       +- ReusedExchange [d_date_sk#329], 
BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint))), [id=#12547]
   ```
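
   Below is a minimal, self-contained sketch of the combined approach (a toy model with hypothetical `Plan` types, not Spark's actual `SparkPlan`/`Exchange` API): a single bottom-up traversal with one shared cache decides reuse for exchanges and subqueries together, so neither kind of reference insertion can invalidate nodes the other has already referenced.
   ```scala
   import scala.collection.mutable

   // Toy plan tree; every name here is a hypothetical stand-in for a SparkPlan node.
   sealed trait Plan
   case class Scan(table: String)             extends Plan
   case class Join(left: Plan, right: Plan)   extends Plan
   case class Exchange(child: Plan)           extends Plan
   case class Subquery(child: Plan)           extends Plan
   case class ReusedExchange(first: Exchange) extends Plan
   case class ReusedSubquery(first: Subquery) extends Plan

   // One pass, one cache: the first occurrence of a (structurally equal) exchange
   // or subquery is kept as the definition, later occurrences become references.
   def unifyReuse(root: Plan): Plan = {
     val seen = mutable.Map.empty[Plan, Plan] // structural key -> first occurrence
     def go(p: Plan): Plan = p match {
       case Exchange(c) =>
         val e = Exchange(go(c))
         seen.getOrElseUpdate(e, e) match {
           case first: Exchange if first ne e => ReusedExchange(first)
           case _                             => e
         }
       case Subquery(c) =>
         val s = Subquery(go(c))
         seen.getOrElseUpdate(s, s) match {
           case first: Subquery if first ne s => ReusedSubquery(first)
           case _                             => s
         }
       case Join(l, r) => Join(go(l), go(r))
       case leaf       => leaf
     }
     go(root)
   }
   ```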
   
   - Improves the reuse of exchanges and subqueries by enabling reuse across the whole plan: the new combined rule traverses the whole plan, including subqueries, in a bottom-up manner, so it can exploit reuse opportunities between the main query and its subqueries (see the example below).
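
   Continuing the toy model above (hypothetical types, not Spark's API), whole-plan traversal is what lets an exchange that is first planned inside a subquery be reused from the main query, and vice versa:
   ```scala
   // The date_dim exchange is defined inside the subquery (visited first) and
   // reused by the main plan; two separate rules with per-sub-plan caches
   // would miss this opportunity.
   val plan = Join(
     Subquery(Exchange(Scan("date_dim"))), // first occurrence, kept as definition
     Exchange(Scan("date_dim"))            // duplicate, becomes ReusedExchange
   )
   // unifyReuse(plan) == Join(Subquery(e), ReusedExchange(e)),
   // where e is the single Exchange(Scan("date_dim")) kept in the subquery.
   ```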
   
   - Improves the caching logic by avoiding canonicalization when possible: exchanges and subqueries are canonicalized only when at least 2 of them share the same schema, so plans with a unique schema skip canonicalization entirely (see the sketch below).
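
   A hedged sketch of this deferred canonicalization (hypothetical `ReuseCache` name and signatures, not Spark's actual class): candidates are bucketed by a cheap schema key, and the assumed-expensive canonicalization is computed lazily and memoized, only once a bucket holds more than one plan.
   ```scala
   import scala.collection.mutable

   class ReuseCache[S, P](schemaOf: P => S, canonicalize: P => P) {
     private val buckets   = mutable.Map.empty[S, mutable.ArrayBuffer[P]]
     private val canonical = mutable.Map.empty[P, P] // memoized canonical forms

     private def canon(p: P): P = canonical.getOrElseUpdate(p, canonicalize(p))

     /** Returns an earlier equivalent plan, or registers and returns `plan`. */
     def reuseOrAdd(plan: P): P = {
       val bucket = buckets.getOrElseUpdate(schemaOf(plan), mutable.ArrayBuffer.empty)
       if (bucket.isEmpty) {
         bucket += plan // sole plan with this schema: no canonicalization at all
         plan
       } else {
         bucket.find(p => canon(p) == canon(plan)) match {
           case Some(first) => first // reuse the first equivalent plan
           case None        => bucket += plan; plan
         }
       }
     }
   }
   ```
   In the toy `unifyReuse` above, the `seen.getOrElseUpdate` lookups could be replaced by `reuseOrAdd` calls on such a cache, so an exchange or subquery with a one-off schema never pays the canonicalization cost.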
   
   ### Why are the changes needed?
   Performance improvement: more reuse means fewer recomputed exchanges and subqueries at execution time, and skipping needless canonicalization reduces planning overhead.
   
   ### How was this patch tested?
   Existing and new UTs, TPCDS benchmarks.

