peter-toth opened a new pull request #28885:
URL: https://github.com/apache/spark/pull/28885
### What changes were proposed in this pull request?
This PR:
- Unifies exchange and subquery reuse into one rule, as the current separate
`ReuseExchange` and `ReuseSubquery` rules can interfere with each other. One
example of the issue is the TPCDS Q14a query, where the definition of the
exchange `id=13278` is missing from the plan although there are 3 reuse
references to it. In the case of Q14a the `ReuseExchange` rule inserted the
reuse references correctly, but then the `ReuseSubquery` rule altered the
original `id=13278` node into `id=15038` by inserting `ReusedSubquery` nodes
under it. This PR fixes the issue by combining the 2 similar rules that insert
references into the plan. The broken Q14a plan is shown below, followed by a
toy model of the interference:
```
== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[channel#3384 ASC NULLS FIRST,i_brand_id#3385 ASC NULLS FIRST,i_class_id#3386 ASC NULLS FIRST,i_category_id#3387 ASC NULLS FIRST], output=[channel#3384,i_brand_id#3385,i_class_id#3386,i_category_id#3387,sum(sales)#3374,sum(number_sales)#3375L])
+- *(119) HashAggregate(keys=[channel#3384, i_brand_id#3385, i_class_id#3386, i_category_id#3387, spark_grouping_id#3383L], functions=[sum(sales#3318), sum(number_sales#3319L)])
+- Exchange hashpartitioning(channel#3384, i_brand_id#3385, i_class_id#3386, i_category_id#3387, spark_grouping_id#3383L, 4), true, [id=#15231]
+- *(118) HashAggregate(keys=[channel#3384, i_brand_id#3385, i_class_id#3386, i_category_id#3387, spark_grouping_id#3383L], functions=[partial_sum(sales#3318), partial_sum(number_sales#3319L)])
+- *(118) Expand [List(sales#3318, number_sales#3319L, channel#3317, i_brand_id#499, i_class_id#501, i_category_id#503, 0), List(sales#3318, number_sales#3319L, channel#3317, i_brand_id#499, i_class_id#501, null, 1), List(sales#3318, number_sales#3319L, channel#3317, i_brand_id#499, null, null, 3), List(sales#3318, number_sales#3319L, channel#3317, null, null, null, 7), List(sales#3318, number_sales#3319L, null, null, null, null, 15)], [sales#3318, number_sales#3319L, channel#3384, i_brand_id#3385, i_class_id#3386, i_category_id#3387, spark_grouping_id#3383L]
+- Union
:- *(39) Project [sales#3318, number_sales#3319L, channel#3317, i_brand_id#499, i_class_id#501, i_category_id#503]
: +- *(39) Filter (isnotnull(sum(CheckOverflow((promote_precision(cast(cast(ss_quantity#1163 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ss_list_price#1165 as decimal(12,2)))), DecimalType(18,2), true))#3353) AND (cast(sum(CheckOverflow((promote_precision(cast(cast(ss_quantity#1163 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ss_list_price#1165 as decimal(12,2)))), DecimalType(18,2), true))#3353 as decimal(32,6)) > cast(Subquery scalar-subquery#3320, [id=#10802] as decimal(32,6))))
: : +- Subquery scalar-subquery#3320, [id=#10802]
: : +- *(8) HashAggregate(keys=[], functions=[avg(CheckOverflow((promote_precision(cast(cast(quantity#3335 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(list_price#3336 as decimal(12,2)))), DecimalType(18,2), true))])
: : +- Exchange SinglePartition, true, [id=#10798]
: : +- *(7) HashAggregate(keys=[], functions=[partial_avg(CheckOverflow((promote_precision(cast(cast(quantity#3335 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(list_price#3336 as decimal(12,2)))), DecimalType(18,2), true))])
: : +- Union
: : :- *(2) Project [ss_quantity#1163 AS quantity#3335, ss_list_price#1165 AS list_price#3336]
: : : +- *(2) BroadcastHashJoin [ss_sold_date_sk#1176], [d_date_sk#329], Inner, BuildRight
: : : :- *(2) ColumnarToRow
: : : : +- FileScan parquet [ss_quantity#1163,ss_list_price#1165,ss_sold_date_sk#1176] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/store_sales], PartitionFilters: [isnotnull(ss_sold_date_sk#1176), dynamicpruningexpression(ss_sold_date_sk#1176 IN dynamicpruning..., PushedFilters: [], ReadSchema: struct<ss_quantity:int,ss_list_price:decimal(7,2)>
: : : : +- SubqueryBroadcast dynamicpruning#3413, 0, [d_date_sk#329], [id=#10730]
: : : : +- ReusedExchange [d_date_sk#329], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#10640]
: : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#10640]
: : : +- *(1) Project [d_date_sk#329]
: : : +- *(1) Filter (((isnotnull(d_year#335) AND (d_year#335 >= 1999)) AND (d_year#335 <= 2001)) AND isnotnull(d_date_sk#329))
: : : +- *(1) ColumnarToRow
: : : +- FileScan parquet [d_date_sk#329,d_year#335] Batched: true, DataFilters: [isnotnull(d_year#335), (d_year#335 >= 1999), (d_year#335 <= 2001), isnotnull(d_date_sk#329)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_year), GreaterThanOrEqual(d_year,1999), LessThanOrEqual(d_year,2001), IsNotNull(d_da..., ReadSchema: struct<d_date_sk:int,d_year:int>
: : :- *(4) Project [cs_quantity#889 AS quantity#3337, cs_list_price#891 AS list_price#3338]
: : : +- *(4) BroadcastHashJoin [cs_sold_date_sk#905], [d_date_sk#329], Inner, BuildRight
: : : :- *(4) ColumnarToRow
: : : : +- FileScan parquet [cs_quantity#889,cs_list_price#891,cs_sold_date_sk#905] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/catalog_sales], PartitionFilters: [isnotnull(cs_sold_date_sk#905), dynamicpruningexpression(cs_sold_date_sk#905 IN dynamicpruning#3..., PushedFilters: [], ReadSchema: struct<cs_quantity:int,cs_list_price:decimal(7,2)>
: : : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#3413, 0, [d_date_sk#329], [id=#10730]
: : : +- ReusedExchange [d_date_sk#329], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#10640]
: : +- *(6) Project [ws_quantity#1030 AS quantity#3339, ws_list_price#1032 AS list_price#3340]
: : +- *(6) BroadcastHashJoin [ws_sold_date_sk#1046], [d_date_sk#329], Inner, BuildRight
: : :- *(6) ColumnarToRow
: : : +- FileScan parquet [ws_quantity#1030,ws_list_price#1032,ws_sold_date_sk#1046] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/web_sales], PartitionFilters: [isnotnull(ws_sold_date_sk#1046), dynamicpruningexpression(ws_sold_date_sk#1046 IN dynamicpruning..., PushedFilters: [], ReadSchema: struct<ws_quantity:int,ws_list_price:decimal(7,2)>
: : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#3413, 0, [d_date_sk#329], [id=#10730]
: : +- ReusedExchange [d_date_sk#329], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#10640]
: +- *(39) HashAggregate(keys=[i_brand_id#499, i_class_id#501, i_category_id#503], functions=[sum(CheckOverflow((promote_precision(cast(cast(ss_quantity#1163 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ss_list_price#1165 as decimal(12,2)))), DecimalType(18,2), true)), count(1)])
: +- Exchange hashpartitioning(i_brand_id#499, i_class_id#501, i_category_id#503, 4), true, [id=#15050]
: +- *(38) HashAggregate(keys=[i_brand_id#499, i_class_id#501, i_category_id#503], functions=[partial_sum(CheckOverflow((promote_precision(cast(cast(ss_quantity#1163 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ss_list_price#1165 as decimal(12,2)))), DecimalType(18,2), true)), partial_count(1)])
: +- *(38) Project [ss_quantity#1163, ss_list_price#1165, i_brand_id#499, i_class_id#501, i_category_id#503]
: +- *(38) BroadcastHashJoin [ss_sold_date_sk#1176], [d_date_sk#329], Inner, BuildRight
: :- *(38) Project [ss_quantity#1163, ss_list_price#1165, ss_sold_date_sk#1176, i_brand_id#499, i_class_id#501, i_category_id#503]
: : +- *(38) BroadcastHashJoin [ss_item_sk#1155], [i_item_sk#492], Inner, BuildRight
: : :- SortMergeJoin [ss_item_sk#1155], [ss_item_sk#3334], LeftSemi
: : : :- *(2) Sort [ss_item_sk#1155 ASC NULLS FIRST], false, 0
: : : : +- Exchange hashpartitioning(ss_item_sk#1155, 4), true, [id=#14846]
: : : : +- *(1) Project [ss_item_sk#1155, ss_quantity#1163, ss_list_price#1165, ss_sold_date_sk#1176]
: : : : +- *(1) Filter isnotnull(ss_item_sk#1155)
: : : : +- *(1) ColumnarToRow
: : : : +- FileScan parquet [ss_item_sk#1155,ss_quantity#1163,ss_list_price#1165,ss_sold_date_sk#1176] Batched: true, DataFilters: [isnotnull(ss_item_sk#1155)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/store_sales], PartitionFilters: [isnotnull(ss_sold_date_sk#1176), dynamicpruningexpression(ss_sold_date_sk#1176 IN dynamicpruning..., PushedFilters: [IsNotNull(ss_item_sk)], ReadSchema: struct<ss_item_sk:int,ss_quantity:int,ss_list_price:decimal(7,2)>
: : : : +- SubqueryBroadcast dynamicpruning#3445, 0, [d_date_sk#329], [id=#14839]
: : : : +- ReusedExchange [d_date_sk#329], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#12547]
: : : +- *(18) Sort [ss_item_sk#3334 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(ss_item_sk#3334, 4), true, [id=#15038]
: : : +- *(17) Project [i_item_sk#492 AS ss_item_sk#3334]
: : : +- *(17) BroadcastHashJoin [i_brand_id#499, i_class_id#501, i_category_id#503], [brand_id#3331, class_id#3332, category_id#3333], Inner, BuildLeft
: : : :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, int, true], input[2, int, true], input[3, int, true])), [id=#12321]
: : : : +- *(3) Project [i_item_sk#492, i_brand_id#499, i_class_id#501, i_category_id#503]
: : : : +- *(3) Filter ((isnotnull(i_brand_id#499) AND isnotnull(i_category_id#503)) AND isnotnull(i_class_id#501))
: : : : +- *(3) ColumnarToRow
: : : : +- FileScan parquet [i_item_sk#492,i_brand_id#499,i_class_id#501,i_category_id#503] Batched: true, DataFilters: [isnotnull(i_brand_id#499), isnotnull(i_category_id#503), isnotnull(i_class_id#501)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/item], PartitionFilters: [], PushedFilters: [IsNotNull(i_brand_id), IsNotNull(i_category_id), IsNotNull(i_class_id)], ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>
: : : +- *(17) HashAggregate(keys=[brand_id#3331, class_id#3332, category_id#3333], functions=[])
: : : +- *(17) HashAggregate(keys=[brand_id#3331, class_id#3332, category_id#3333], functions=[])
: : : +- *(17) HashAggregate(keys=[brand_id#3331, class_id#3332, category_id#3333], functions=[])
: : : +- Exchange hashpartitioning(brand_id#3331, class_id#3332, category_id#3333, 4), true, [id=#15030]
: : : +- *(16) HashAggregate(keys=[brand_id#3331, class_id#3332, category_id#3333], functions=[])
: : : +- SortMergeJoin [coalesce(brand_id#3331, 0), isnull(brand_id#3331), coalesce(class_id#3332, 0), isnull(class_id#3332), coalesce(category_id#3333, 0), isnull(category_id#3333)], [coalesce(i_brand_id#499, 0), isnull(i_brand_id#499), coalesce(i_class_id#501, 0), isnull(i_class_id#501), coalesce(i_category_id#503, 0), isnull(i_category_id#503)], LeftSemi
: : : :- SortMergeJoin [coalesce(brand_id#3331, 0), isnull(brand_id#3331), coalesce(class_id#3332, 0), isnull(class_id#3332), coalesce(category_id#3333, 0), isnull(category_id#3333)], [coalesce(i_brand_id#499, 0), isnull(i_brand_id#499), coalesce(i_class_id#501, 0), isnull(i_class_id#501), coalesce(i_category_id#503, 0), isnull(i_category_id#503)], LeftSemi
: : : : :- *(7) Sort [coalesce(brand_id#3331, 0) ASC NULLS FIRST, isnull(brand_id#3331) ASC NULLS FIRST, coalesce(class_id#3332, 0) ASC NULLS FIRST, isnull(class_id#3332) ASC NULLS FIRST, coalesce(category_id#3333, 0) ASC NULLS FIRST, isnull(category_id#3333) ASC NULLS FIRST], false, 0
: : : : : +- Exchange hashpartitioning(coalesce(brand_id#3331, 0), isnull(brand_id#3331), coalesce(class_id#3332, 0), isnull(class_id#3332), coalesce(category_id#3333, 0), isnull(category_id#3333), 4), true, [id=#14862]
: : : : : +- *(6) Project [i_brand_id#499 AS brand_id#3331, i_class_id#501 AS class_id#3332, i_category_id#503 AS category_id#3333]
: : : : : +- *(6) BroadcastHashJoin [ss_sold_date_sk#1176], [d_date_sk#329], Inner, BuildRight
: : : : : :- *(6) Project [ss_sold_date_sk#1176, i_brand_id#499, i_class_id#501, i_category_id#503]
: : : : : : +- *(6) BroadcastHashJoin [ss_item_sk#1155], [i_item_sk#492], Inner, BuildRight
: : : : : : :- *(6) Project [ss_item_sk#1155, ss_sold_date_sk#1176]
: : : : : : : +- *(6) Filter isnotnull(ss_item_sk#1155)
: : : : : : : +- *(6) ColumnarToRow
: : : : : : : +- FileScan parquet [ss_item_sk#1155,ss_sold_date_sk#1176] Batched: true, DataFilters: [isnotnull(ss_item_sk#1155)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/store_sales], PartitionFilters: [isnotnull(ss_sold_date_sk#1176), dynamicpruningexpression(ss_sold_date_sk#1176 IN dynamicpruning..., PushedFilters: [IsNotNull(ss_item_sk)], ReadSchema: struct<ss_item_sk:int>
: : : : : : : +- SubqueryBroadcast dynamicpruning#3416, 0, [d_date_sk#329], [id=#14851]
: : : : : : : +- ReusedExchange [d_date_sk#329], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#12341]
: : : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#12332]
: : : : : : +- *(4) Project [i_item_sk#492, i_brand_id#499, i_class_id#501, i_category_id#503]
: : : : : : +- *(4) Filter (((isnotnull(i_item_sk#492) AND isnotnull(i_category_id#503)) AND isnotnull(i_class_id#501)) AND isnotnull(i_brand_id#499))
: : : : : : +- *(4) ColumnarToRow
: : : : : : +- FileScan parquet [i_item_sk#492,i_brand_id#499,i_class_id#501,i_category_id#503] Batched: true, DataFilters: [isnotnull(i_item_sk#492), isnotnull(i_category_id#503), isnotnull(i_class_id#501), isnotnull(i_b..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/item], PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk), IsNotNull(i_category_id), IsNotNull(i_class_id), IsNotNull(i_brand_id)], ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>
: : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#12341]
: : : : : +- *(5) Project [d_date_sk#329]
: : : : : +- *(5) Filter (((isnotnull(d_year#335) AND (d_year#335 >= 1999)) AND (d_year#335 <= 2001)) AND isnotnull(d_date_sk#329))
: : : : : +- *(5) ColumnarToRow
: : : : : +- FileScan parquet [d_date_sk#329,d_year#335] Batched: true, DataFilters: [isnotnull(d_year#335), (d_year#335 >= 1999), (d_year#335 <= 2001), isnotnull(d_date_sk#329)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_year), GreaterThanOrEqual(d_year,1999), LessThanOrEqual(d_year,2001), IsNotNull(d_da..., ReadSchema: struct<d_date_sk:int,d_year:int>
: : : : +- *(11) Sort [coalesce(i_brand_id#499, 0) ASC NULLS FIRST, isnull(i_brand_id#499) ASC NULLS FIRST, coalesce(i_class_id#501, 0) ASC NULLS FIRST, isnull(i_class_id#501) ASC NULLS FIRST, coalesce(i_category_id#503, 0) ASC NULLS FIRST, isnull(i_category_id#503) ASC NULLS FIRST], false, 0
: : : : +- Exchange hashpartitioning(coalesce(i_brand_id#499, 0), isnull(i_brand_id#499), coalesce(i_class_id#501, 0), isnull(i_class_id#501), coalesce(i_category_id#503, 0), isnull(i_category_id#503), 4), true, [id=#15005]
: : : : +- *(10) Project [i_brand_id#499, i_class_id#501, i_category_id#503]
: : : : +- *(10) BroadcastHashJoin [cs_sold_date_sk#905], [d_date_sk#329], Inner, BuildRight
: : : : :- *(10) Project [cs_sold_date_sk#905, i_brand_id#499, i_class_id#501, i_category_id#503]
: : : : : +- *(10) BroadcastHashJoin [cs_item_sk#886], [i_item_sk#492], Inner, BuildRight
: : : : : :- *(10) Project [cs_item_sk#886, cs_sold_date_sk#905]
: : : : : : +- *(10) Filter isnotnull(cs_item_sk#886)
: : : : : : +- *(10) ColumnarToRow
: : : : : : +- FileScan parquet [cs_item_sk#886,cs_sold_date_sk#905] Batched: true, DataFilters: [isnotnull(cs_item_sk#886)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/catalog_sales], PartitionFilters: [isnotnull(cs_sold_date_sk#905), dynamicpruningexpression(cs_sold_date_sk#905 IN dynamicpruning#3..., PushedFilters: [IsNotNull(cs_item_sk)], ReadSchema: struct<cs_item_sk:int>
: : : : : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#3416, 0, [d_date_sk#329], [id=#14851]
: : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#12359]
: : : : : +- *(8) Project [i_item_sk#492, i_brand_id#499, i_class_id#501, i_category_id#503]
: : : : : +- *(8) Filter isnotnull(i_item_sk#492)
: : : : : +- *(8) ColumnarToRow
: : : : : +- FileScan parquet [i_item_sk#492,i_brand_id#499,i_class_id#501,i_category_id#503] Batched: true, DataFilters: [isnotnull(i_item_sk#492)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/item], PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>
: : : : +- ReusedExchange [d_date_sk#329], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#12341]
: : : +- *(15) Sort [coalesce(i_brand_id#499, 0) ASC NULLS FIRST, isnull(i_brand_id#499) ASC NULLS FIRST, coalesce(i_class_id#501, 0) ASC NULLS FIRST, isnull(i_class_id#501) ASC NULLS FIRST, coalesce(i_category_id#503, 0) ASC NULLS FIRST, isnull(i_category_id#503) ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(coalesce(i_brand_id#499, 0), isnull(i_brand_id#499), coalesce(i_class_id#501, 0), isnull(i_class_id#501), coalesce(i_category_id#503, 0), isnull(i_category_id#503), 4), true, [id=#15022]
: : : +- *(14) Project [i_brand_id#499, i_class_id#501, i_category_id#503]
: : : +- *(14) BroadcastHashJoin [ws_sold_date_sk#1046], [d_date_sk#329], Inner, BuildRight
: : : :- *(14) Project [ws_sold_date_sk#1046, i_brand_id#499, i_class_id#501, i_category_id#503]
: : : : +- *(14) BroadcastHashJoin [ws_item_sk#1015], [i_item_sk#492], Inner, BuildRight
: : : : :- *(14) Project [ws_item_sk#1015, ws_sold_date_sk#1046]
: : : : : +- *(14) Filter isnotnull(ws_item_sk#1015)
: : : : : +- *(14) ColumnarToRow
: : : : : +- FileScan parquet [ws_item_sk#1015,ws_sold_date_sk#1046] Batched: true, DataFilters: [isnotnull(ws_item_sk#1015)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/web_sales], PartitionFilters: [isnotnull(ws_sold_date_sk#1046), dynamicpruningexpression(ws_sold_date_sk#1046 IN dynamicpruning..., PushedFilters: [IsNotNull(ws_item_sk)], ReadSchema: struct<ws_item_sk:int>
: : : : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#3416, 0, [d_date_sk#329], [id=#14851]
: : : : +- ReusedExchange [i_item_sk#492, i_brand_id#499, i_class_id#501, i_category_id#503], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#12359]
: : : +- ReusedExchange [d_date_sk#329], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#12341]
: : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#13552]
: : +- SortMergeJoin [i_item_sk#492], [ss_item_sk#3334], LeftSemi
: : :- *(20) Sort [i_item_sk#492 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(i_item_sk#492, 4), true, [id=#12428]
: : : +- *(19) Project [i_item_sk#492, i_brand_id#499, i_class_id#501, i_category_id#503]
: : : +- *(19) Filter isnotnull(i_item_sk#492)
: : : +- *(19) ColumnarToRow
: : : +- FileScan parquet [i_item_sk#492,i_brand_id#499,i_class_id#501,i_category_id#503] Batched: true, DataFilters: [isnotnull(i_item_sk#492)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/item], PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>
: : +- *(36) Sort [ss_item_sk#3334 ASC NULLS FIRST], false, 0
: : +- ReusedExchange [ss_item_sk#3334], Exchange hashpartitioning(ss_item_sk#3334, 4), true, [id=#13278]
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#12547]
: +- *(37) Project [d_date_sk#329]
: +- *(37) Filter ((((isnotnull(d_year#335) AND isnotnull(d_moy#337)) AND (d_year#335 = 2001)) AND (d_moy#337 = 11)) AND isnotnull(d_date_sk#329))
: +- *(37) ColumnarToRow
: +- FileScan parquet [d_date_sk#329,d_year#335,d_moy#337] Batched: true, DataFilters: [isnotnull(d_year#335), isnotnull(d_moy#337), (d_year#335 = 2001), (d_moy#337 = 11), isnotnull(d_..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_year), IsNotNull(d_moy), EqualTo(d_year,2001), EqualTo(d_moy,11), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int,d_moy:int>
:- *(78) Project [sales#3323, number_sales#3324L, channel#3322, i_brand_id#499, i_class_id#501, i_category_id#503]
: +- *(78) Filter (isnotnull(sum(CheckOverflow((promote_precision(cast(cast(cs_quantity#889 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(cs_list_price#891 as decimal(12,2)))), DecimalType(18,2), true))#3356) AND (cast(sum(CheckOverflow((promote_precision(cast(cast(cs_quantity#889 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(cs_list_price#891 as decimal(12,2)))), DecimalType(18,2), true))#3356 as decimal(32,6)) > cast(ReusedSubquery Subquery scalar-subquery#3320, [id=#10802] as decimal(32,6))))
: : +- ReusedSubquery Subquery scalar-subquery#3320, [id=#10802]
: +- *(78) HashAggregate(keys=[i_brand_id#499, i_class_id#501, i_category_id#503], functions=[sum(CheckOverflow((promote_precision(cast(cast(cs_quantity#889 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(cs_list_price#891 as decimal(12,2)))), DecimalType(18,2), true)), count(1)])
: +- Exchange hashpartitioning(i_brand_id#499, i_class_id#501, i_category_id#503, 4), true, [id=#15154]
: +- *(77) HashAggregate(keys=[i_brand_id#499, i_class_id#501, i_category_id#503], functions=[partial_sum(CheckOverflow((promote_precision(cast(cast(cs_quantity#889 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(cs_list_price#891 as decimal(12,2)))), DecimalType(18,2), true)), partial_count(1)])
: +- *(77) Project [cs_quantity#889, cs_list_price#891, i_brand_id#499, i_class_id#501, i_category_id#503]
: +- *(77) BroadcastHashJoin [cs_sold_date_sk#905], [d_date_sk#329], Inner, BuildRight
: :- *(77) Project [cs_quantity#889, cs_list_price#891, cs_sold_date_sk#905, i_brand_id#499, i_class_id#501, i_category_id#503]
: : +- *(77) BroadcastHashJoin [cs_item_sk#886], [i_item_sk#492], Inner, BuildRight
: : :- SortMergeJoin [cs_item_sk#886], [ss_item_sk#3334], LeftSemi
: : : :- *(41) Sort [cs_item_sk#886 ASC NULLS FIRST], false, 0
: : : : +- Exchange hashpartitioning(cs_item_sk#886, 4), true, [id=#15142]
: : : : +- *(40) Project [cs_item_sk#886, cs_quantity#889, cs_list_price#891, cs_sold_date_sk#905]
: : : : +- *(40) Filter isnotnull(cs_item_sk#886)
: : : : +- *(40) ColumnarToRow
: : : : +- FileScan parquet [cs_item_sk#886,cs_quantity#889,cs_list_price#891,cs_sold_date_sk#905] Batched: true, DataFilters: [isnotnull(cs_item_sk#886)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/catalog_sales], PartitionFilters: [isnotnull(cs_sold_date_sk#905), dynamicpruningexpression(cs_sold_date_sk#905 IN dynamicpruning#3..., PushedFilters: [IsNotNull(cs_item_sk)], ReadSchema: struct<cs_item_sk:int,cs_quantity:int,cs_list_price:decimal(7,2)>
: : : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#3445, 0, [d_date_sk#329], [id=#14839]
: : : +- *(57) Sort [ss_item_sk#3334 ASC NULLS FIRST], false, 0
: : : +- ReusedExchange [ss_item_sk#3334], Exchange hashpartitioning(ss_item_sk#3334, 4), true, [id=#13278]
: : +- ReusedExchange [i_item_sk#492, i_brand_id#499, i_class_id#501, i_category_id#503], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#13552]
: +- ReusedExchange [d_date_sk#329], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#12547]
+- *(117) Project [sales#3328, number_sales#3329L, channel#3327, i_brand_id#499, i_class_id#501, i_category_id#503]
+- *(117) Filter (isnotnull(sum(CheckOverflow((promote_precision(cast(cast(ws_quantity#1030 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ws_list_price#1032 as decimal(12,2)))), DecimalType(18,2), true))#3359) AND (cast(sum(CheckOverflow((promote_precision(cast(cast(ws_quantity#1030 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ws_list_price#1032 as decimal(12,2)))), DecimalType(18,2), true))#3359 as decimal(32,6)) > cast(ReusedSubquery Subquery scalar-subquery#3320, [id=#10802] as decimal(32,6))))
: +- ReusedSubquery Subquery scalar-subquery#3320, [id=#10802]
+- *(117) HashAggregate(keys=[i_brand_id#499, i_class_id#501, i_category_id#503], functions=[sum(CheckOverflow((promote_precision(cast(cast(ws_quantity#1030 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ws_list_price#1032 as decimal(12,2)))), DecimalType(18,2), true)), count(1)])
+- Exchange hashpartitioning(i_brand_id#499, i_class_id#501, i_category_id#503, 4), true, [id=#15220]
+- *(116) HashAggregate(keys=[i_brand_id#499, i_class_id#501, i_category_id#503], functions=[partial_sum(CheckOverflow((promote_precision(cast(cast(ws_quantity#1030 as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ws_list_price#1032 as decimal(12,2)))), DecimalType(18,2), true)), partial_count(1)])
+- *(116) Project [ws_quantity#1030, ws_list_price#1032, i_brand_id#499, i_class_id#501, i_category_id#503]
+- *(116) BroadcastHashJoin [ws_sold_date_sk#1046], [d_date_sk#329], Inner, BuildRight
:- *(116) Project [ws_quantity#1030, ws_list_price#1032, ws_sold_date_sk#1046, i_brand_id#499, i_class_id#501, i_category_id#503]
: +- *(116) BroadcastHashJoin [ws_item_sk#1015], [i_item_sk#492], Inner, BuildRight
: :- SortMergeJoin [ws_item_sk#1015], [ss_item_sk#3334], LeftSemi
: : :- *(80) Sort [ws_item_sk#1015 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(ws_item_sk#1015, 4), true, [id=#15208]
: : : +- *(79) Project [ws_item_sk#1015, ws_quantity#1030, ws_list_price#1032, ws_sold_date_sk#1046]
: : : +- *(79) Filter isnotnull(ws_item_sk#1015)
: : : +- *(79) ColumnarToRow
: : : +- FileScan parquet [ws_item_sk#1015,ws_quantity#1030,ws_list_price#1032,ws_sold_date_sk#1046] Batched: true, DataFilters: [isnotnull(ws_item_sk#1015)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark-sql-perf/data5/web_sales], PartitionFilters: [isnotnull(ws_sold_date_sk#1046), dynamicpruningexpression(ws_sold_date_sk#1046 IN dynamicpruning..., PushedFilters: [IsNotNull(ws_item_sk)], ReadSchema: struct<ws_item_sk:int,ws_quantity:int,ws_list_price:decimal(7,2)>
: : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#3445, 0, [d_date_sk#329], [id=#14839]
: : +- *(96) Sort [ss_item_sk#3334 ASC NULLS FIRST], false, 0
: : +- ReusedExchange [ss_item_sk#3334], Exchange hashpartitioning(ss_item_sk#3334, 4), true, [id=#13278]
: +- ReusedExchange [i_item_sk#492, i_brand_id#499, i_class_id#501, i_category_id#503], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#13552]
+- ReusedExchange [d_date_sk#329], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#12547]
```
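  To illustrate the interference, here is a toy Scala model (illustrative only, not Spark code): `reuseExchanges` stands in for `ReuseExchange` and keys the inserted references on a node's canonical form, while `rewriteSubqueries` stands in for `ReuseSubquery` and rewrites the definition out from under those references:
```scala
sealed trait N
case class Ex(body: String) extends N     // an exchange definition
case class RefEx(body: String) extends N  // a "ReusedExchange"-style reference

// Pass 1: keep the first occurrence of each exchange, replace later
// duplicates with references keyed on the (canonical) body.
def reuseExchanges(plan: Vector[N]): Vector[N] = {
  val seen = scala.collection.mutable.Set.empty[String]
  plan.map {
    case Ex(b) if seen(b) => RefEx(b)
    case Ex(b)            => seen += b; Ex(b)
    case n                => n
  }
}

// Pass 2: an unrelated rewrite (like inserting ReusedSubquery nodes under an
// exchange) that alters the definitions but not the references to them.
def rewriteSubqueries(plan: Vector[N]): Vector[N] = plan.map {
  case Ex(b) => Ex(b + " +ReusedSubquery")
  case n     => n
}

val plan   = Vector[N](Ex("hashpartitioning(ss_item_sk#3334, 4)"),
                       Ex("hashpartitioning(ss_item_sk#3334, 4)"))
val broken = rewriteSubqueries(reuseExchanges(plan))
// broken == Vector(Ex("hashpartitioning(ss_item_sk#3334, 4) +ReusedSubquery"),
//                  RefEx("hashpartitioning(ss_item_sk#3334, 4)"))
// The reference now points at a definition that no longer exists in the plan,
// which is the dangling `id=13278` situation above. Doing both rewrites in a
// single combined pass cannot leave such dangling references behind.
```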
- Improves the reuse of exchanges and subqueries by enabling reuse across
the whole plan. The new combined rule exploits reuse opportunities between
the parent plan and its subqueries by traversing the whole plan (including
subqueries) in a bottom-up manner; a short sketch of this traversal follows.
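  As a rough illustration of the traversal (a self-contained toy model, not the PR's actual types; the string `key` stands in for the canonicalized subtree):
```scala
import scala.collection.mutable

sealed trait Node
case class Scan(table: String) extends Node
case class Exchange(key: String, child: Node) extends Node
case class ReusedExchange(key: String) extends Node
// An operator that also hosts subquery plans (e.g. a Filter with a scalar subquery).
case class Op(name: String, child: Node, subqueries: Seq[Node] = Nil) extends Node

// One pass over the whole tree, subqueries included. A single shared `seen`
// map means an exchange defined inside a subquery can be reused by the
// parent plan and vice versa.
def insertReuse(root: Node): Node = {
  val seen = mutable.Map.empty[String, Exchange]
  def visit(n: Node): Node = n match {
    case s: Scan           => s
    case r: ReusedExchange => r
    case Op(name, child, subs) =>
      val newSubs = subs.map(visit)      // subqueries first, same map...
      Op(name, visit(child), newSubs)    // ...then the main child
    case Exchange(key, child) =>
      if (seen.contains(key)) ReusedExchange(key) // equivalent exchange already planned
      else { val e = Exchange(key, visit(child)); seen(key) = e; e }
  }
  visit(root)
}

// The broadcast of date_dim appears both inside a subquery and in the main
// plan; the second occurrence collapses into a ReusedExchange.
val sub  = Op("Subquery", Exchange("broadcast-date_dim", Scan("date_dim")))
val main = Op("Filter", Exchange("broadcast-date_dim", Scan("date_dim")), Seq(sub))
println(insertReuse(main))
```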
- Improves the caching logic by avoiding canonicalization when possible:
exchanges and subqueries are canonicalized only when at least 2 of them
share the same schema. A sketch of this schema-first lookup follows.
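  A minimal sketch of that lookup strategy (hypothetical `Plan` and `ReuseCache` names, not the PR's code); a plan whose schema no other plan shares is never canonicalized:
```scala
import scala.collection.mutable

// `schema` is cheap to read; `canonicalized` stands in for Spark's expensive
// canonicalization and, being lazy, is computed at most once and only on demand.
case class Plan(schema: String, body: String) {
  lazy val canonicalized: String = body.replaceAll("#\\d+", "#x")
}

class ReuseCache {
  // Plans are bucketed by schema; canonical forms are compared only within a bucket.
  private val buckets = mutable.Map.empty[String, mutable.ArrayBuffer[Plan]]

  /** Returns an earlier equivalent plan to reuse, or records `plan` as new. */
  def lookupOrAdd(plan: Plan): Option[Plan] = {
    val bucket = buckets.getOrElseUpdate(plan.schema, mutable.ArrayBuffer.empty[Plan])
    // `find` on an empty bucket never calls the predicate, so a plan with a
    // unique schema skips canonicalization entirely.
    val existing = bucket.find(_.canonicalized == plan.canonicalized)
    if (existing.isEmpty) bucket += plan
    existing
  }
}

val cache = new ReuseCache
val a = Plan("struct<d_date_sk:int>", "Exchange hashpartitioning(d_date_sk#329, 4)")
val b = Plan("struct<d_date_sk:int>", "Exchange hashpartitioning(d_date_sk#412, 4)")
assert(cache.lookupOrAdd(a).isEmpty)      // first plan with this schema: cached, not canonicalized
assert(cache.lookupOrAdd(b).contains(a))  // schema collision: canonicalize both, reuse `a`
```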
### Why are the changes needed?
Performance improvement.
### How was this patch tested?
Existing and new UTs, TPCDS benchmarks.