mcdull-zhang opened a new pull request #35967:
URL: https://github.com/apache/spark/pull/35967


   This is a backport of #35878 to branch-3.2.
   
   ### What changes were proposed in this pull request?
   Because `Literal.references` returns an empty `AttributeSet`, a `Literal` can be mistaken for a partition column.
   
   For example, the SQL in the test case generates the following physical plan when adaptive query execution is disabled:
   ```text
   *(4) Project [store_id#5281, date_id#5283, state_province#5292]
   +- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, 
BuildRight, false
      :- Union
      :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
      :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 
1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
      :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
      :  :     +- *(1) ColumnarToRow
      :  :        +- FileScan parquet 
default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: 
[isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], 
PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], 
ReadSchema: struct<date_id:int>
      :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
      :  :                 +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), 
[id=#335]
      :  :                    +- *(1) Project [store_id#5291, 
state_province#5292]
      :  :                       +- *(1) Filter (((isnotnull(country#5293) AND 
(country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND 
isnotnull(store_id#5291))
      :  :                          +- *(1) ColumnarToRow
      :  :                             +- FileScan parquet 
default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: 
true, DataFilters: [isnotnull(country#5293), (country#5293 = US), 
((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, 
Location: InMemoryFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache....,
 PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), 
Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: 
struct<store_id:int,state_province:string,country:string>
      :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
      :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 
1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
      :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
      :        +- *(2) ColumnarToRow
      :           +- FileScan parquet 
default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: 
[isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], 
PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: 
struct<date_id:int>
      :                 +- ReusedSubquery SubqueryBroadcast 
dynamicpruning#5300, 0, [store_id#5291], [id=#336]
      +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), 
[id=#335]
   ```
   After this PR:
   ```text
   *(4) Project [store_id#5281, date_id#5283, state_province#5292]
   +- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, 
BuildRight, false
      :- Union
      :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
      :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
      :  :     +- *(1) ColumnarToRow
      :  :        +- FileScan parquet 
default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: 
[isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [], PushedFilters: [IsNotNull(date_id), 
GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
      :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
      :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
      :        +- *(2) ColumnarToRow
      :           +- FileScan parquet 
default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: 
[isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [], PushedFilters: [IsNotNull(date_id), 
LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, 
true] as bigint)),false), [id=#326]
         +- *(3) Project [store_id#5291, state_province#5292]
            +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) 
AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND 
isnotnull(store_id#5291))
               +- *(3) ColumnarToRow
                  +- FileScan parquet 
default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: 
true, DataFilters: [isnotnull(country#5293), (country#5293 = US), 
((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, 
Location: InMemoryFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache....,
 PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), 
Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: 
struct<store_id:int,state_province:string,country:string>
   ```
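   The misclassification can be sketched outside of Spark. Below is a minimal, hedged Python model (not Spark's actual code; `references` and `only_references_partition_cols` are illustrative names) of why an empty reference set lets a `Literal` pass a "references only partition columns" check:
   
   ```python
   # Hedged model of the bug described above (not Spark's real code):
   # Literal.references is an empty AttributeSet, and the empty set is a
   # subset of every set, so a subset-based "is this a partition key?"
   # check vacuously accepts literals.
   
   def references(expr: dict) -> set:
       """Columns an expression refers to (toy Catalyst model)."""
       if expr["kind"] == "literal":
           return set()  # mirrors Literal.references being empty
       if expr["kind"] == "attribute":
           return {expr["name"]}
       return set().union(*(references(c) for c in expr.get("children", [])))
   
   def only_references_partition_cols(expr: dict, partition_cols: set) -> bool:
       # Naive check: every referenced column is a partition column.
       # For a literal, references(expr) is empty, so this is always True.
       return references(expr) <= partition_cols
   
   lit4 = {"kind": "literal", "value": 4}
   store_id = {"kind": "attribute", "name": "store_id"}
   
   assert only_references_partition_cols(store_id, {"store_id"})  # expected
   assert only_references_partition_cols(lit4, {"store_id"})      # the bug
   ```
   
   Under this model, a fix along the lines of this PR would additionally require the reference set to be non-empty (i.e., the join key actually refers to a column) before planning a dynamic pruning subquery for it.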
   
   ### Why are the changes needed?
   Execution performance improvement: the useless dynamic pruning subqueries triggered by literal join keys (visible in the first plan above) are no longer planned or executed.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Added a unit test.
   

