mcdull-zhang opened a new pull request #35967:
URL: https://github.com/apache/spark/pull/35967
This is a backport of #35878 to branch 3.2.
### What changes were proposed in this pull request?
The return value of `Literal.references` is an empty `AttributeSet`, so a `Literal` is mistakenly treated as a partition column, and a dynamic partition pruning filter is inserted on a scan that it can never actually prune.
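The root cause can be sketched with Spark's Catalyst API (a minimal illustration, not the exact check in the optimizer; `partitionColumns` here is a hypothetical stand-in):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Literal}

// A Literal references no attributes, so its reference set is empty.
val pruningKey = Literal(4)

// The empty set is a subset of every set, so a guard that only checks
// "references are a subset of the partition columns" is vacuously true
// for a constant, and the literal is mistaken for a partition column.
val partitionColumns = AttributeSet.empty // hypothetical stand-in
pruningKey.references.subsetOf(partitionColumns) // true, vacuously

// The missing condition: a real pruning key must reference a column.
pruningKey.references.nonEmpty // false for a Literal
```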
For example, the SQL in the test case generates the following physical plan when adaptive query execution is disabled:
```text
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner,
BuildRight, false
:- Union
: :- *(1) Project [4 AS store_id#5281, date_id#5283]
: : +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >=
1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
: : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0,
[store_id#5291], [id=#336]
: : +- *(1) ColumnarToRow
: : +- FileScan parquet
default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters:
[isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location:
CatalogFileIndex(1
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)],
PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)],
ReadSchema: struct<date_id:int>
: : +- SubqueryBroadcast dynamicpruning#5300, 0,
[store_id#5291], [id=#336]
: : +- BroadcastExchange
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false),
[id=#335]
: : +- *(1) Project [store_id#5291,
state_province#5292]
: : +- *(1) Filter (((isnotnull(country#5293) AND
(country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND
isnotnull(store_id#5291))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet
default.dim_store[store_id#5291,state_province#5292,country#5293] Batched:
true, DataFilters: [isnotnull(country#5293), (country#5293 = US),
((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet,
Location: InMemoryFileIndex(1
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache....,
PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US),
Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema:
struct<store_id:int,state_province:string,country:string>
: +- *(2) Project [5 AS store_id#5282, date_id#5287]
: +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <=
1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
: : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0,
[store_id#5291], [id=#336]
: +- *(2) ColumnarToRow
: +- FileScan parquet
default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters:
[isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location:
CatalogFileIndex(1
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)],
PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema:
struct<date_id:int>
: +- ReusedSubquery SubqueryBroadcast
dynamicpruning#5300, 0, [store_id#5291], [id=#336]
+- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false),
[id=#335]
```
After this PR, the ineffective `dynamicpruningexpression` filters and the associated `SubqueryBroadcast` are gone:
```text
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner,
BuildRight, false
:- Union
: :- *(1) Project [4 AS store_id#5281, date_id#5283]
: : +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet
default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters:
[isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location:
CatalogFileIndex(1
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
PartitionFilters: [], PushedFilters: [IsNotNull(date_id),
GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
: +- *(2) Project [5 AS store_id#5282, date_id#5287]
: +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
: +- *(2) ColumnarToRow
: +- FileScan parquet
default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters:
[isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location:
CatalogFileIndex(1
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
PartitionFilters: [], PushedFilters: [IsNotNull(date_id),
LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int,
true] as bigint)),false), [id=#326]
+- *(3) Project [store_id#5291, state_province#5292]
+- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US))
AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND
isnotnull(store_id#5291))
+- *(3) ColumnarToRow
+- FileScan parquet
default.dim_store[store_id#5291,state_province#5292,country#5293] Batched:
true, DataFilters: [isnotnull(country#5293), (country#5293 = US),
((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet,
Location: InMemoryFileIndex(1
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache....,
PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US),
Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema:
struct<store_id:int,state_province:string,country:string>
```
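The direction of the fix can be sketched as follows (an illustrative guard with hypothetical names, not the exact patch):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}

// Hypothetical helper illustrating the extra condition: besides being a
// subset of the partition columns, the pruning key's reference set must
// be non-empty, which rules out Literals and other constant expressions.
def isPartitionKey(key: Expression, partitionCols: AttributeSet): Boolean =
  key.references.nonEmpty && key.references.subsetOf(partitionCols)
```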
### Why are the changes needed?
Execution performance improvement: the ineffective DPP filters and their `SubqueryBroadcast` subqueries are no longer planned, so the fact-table scans stop evaluating pruning subqueries that can never prune any partition.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit test
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]