mcdull-zhang opened a new pull request #35878:
URL: https://github.com/apache/spark/pull/35878
### What changes were proposed in this pull request?
`Literal.references` returns an empty `AttributeSet`, so a literal join key is mistaken for a partition column and dynamic partition pruning filters are inserted for it.
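To make the misclassification concrete, here is a minimal Catalyst sketch (the `partCol` and `partitionColumns` names below are illustrative only, not taken from the patch): an empty reference set is vacuously a subset of the partition columns.
```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Literal}
import org.apache.spark.sql.types.StringType

// A literal carries no attribute references, so its reference set is empty.
val lit = Literal("type1")
assert(lit.references.isEmpty)

// An empty set is a subset of every set, so a check like
// expr.references.subsetOf(partitionColumns) passes for a bare literal,
// and the literal join key is treated as if it were a partition column.
val partCol = AttributeReference("partCol", StringType)()
val partitionColumns = AttributeSet(partCol :: Nil)
assert(lit.references.subsetOf(partitionColumns)) // true: the misclassification
```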
For example, the SQL in the test case produces the following physical plan when adaptive query execution is disabled:
```text
*(4) Project [type#5352, joinCol#5354, otherCol#5355, score#5362]
+- *(4) BroadcastHashJoin [type#5352, joinCol#5354], [type#5360, joinCol#5361], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [type1 AS type#5352, joinCol#5354, otherCol#5355]
   :  :  +- *(1) Filter (isnotnull(joinCol#5354) AND dynamicpruningexpression(type1 IN dynamicpruning#5370))
   :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5370, 0, [type#5360, joinCol#5361], [id=#381]
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact1[joinCol#5354,otherCol#5355,partCol#5356] Batched: true, DataFilters: [isnotnull(joinCol#5354)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [isnotnull(partCol#5356), (partCol#5356 = part1), dynamicpruningexpression(type1 IN dynamicprunin..., PushedFilters: [IsNotNull(joinCol)], ReadSchema: struct<joinCol:int,otherCol:string>
   :  :              +- SubqueryBroadcast dynamicpruning#5370, 0, [type#5360, joinCol#5361], [id=#381]
   :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false], input[1, int, false]),false), [id=#380]
   :  :                    +- *(1) Filter ((((type#5360 <=> type1) OR (type#5360 <=> type2)) AND isnotnull(type#5360)) AND isnotnull(joinCol#5361))
   :  :                       +- *(1) ColumnarToRow
   :  :                          +- FileScan parquet default.dim[type#5360,joinCol#5361,score#5362] Batched: true, DataFilters: [((type#5360 <=> type1) OR (type#5360 <=> type2)), isnotnull(type#5360), isnotnull(joinCol#5361)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [Or(EqualNullSafe(type,type1),EqualNullSafe(type,type2)), IsNotNull(type), IsNotNull(joinCol)], ReadSchema: struct<type:string,joinCol:int,score:int>
   :  +- *(2) Project [type2 AS type#5353, joinCol#5357, otherCol#5358]
   :     +- *(2) Filter (isnotnull(joinCol#5357) AND dynamicpruningexpression(type2 IN dynamicpruning#5370))
   :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5370, 0, [type#5360, joinCol#5361], [id=#381]
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact2[joinCol#5357,otherCol#5358,partCol#5359] Batched: true, DataFilters: [isnotnull(joinCol#5357)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [isnotnull(partCol#5359), (partCol#5359 = part1), dynamicpruningexpression(type2 IN dynamicprunin..., PushedFilters: [IsNotNull(joinCol)], ReadSchema: struct<joinCol:int,otherCol:string>
   :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#5370, 0, [type#5360, joinCol#5361], [id=#381]
   +- ReusedExchange [type#5360, joinCol#5361, score#5362], BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false], input[1, int, false]),false), [id=#380]
```
After this PR:
```text
*(4) Project [type#5352, joinCol#5354, otherCol#5355, score#5362]
+- *(4) BroadcastHashJoin [type#5352, joinCol#5354], [type#5360, joinCol#5361], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [type1 AS type#5352, joinCol#5354, otherCol#5355]
   :  :  +- *(1) Filter isnotnull(joinCol#5354)
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact1[joinCol#5354,otherCol#5355,partCol#5356] Batched: true, DataFilters: [isnotnull(joinCol#5354)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [isnotnull(partCol#5356), (partCol#5356 = part1)], PushedFilters: [IsNotNull(joinCol)], ReadSchema: struct<joinCol:int,otherCol:string>
   :  +- *(2) Project [type2 AS type#5353, joinCol#5357, otherCol#5358]
   :     +- *(2) Filter isnotnull(joinCol#5357)
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact2[joinCol#5357,otherCol#5358,partCol#5359] Batched: true, DataFilters: [isnotnull(joinCol#5357)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [isnotnull(partCol#5359), (partCol#5359 = part1)], PushedFilters: [IsNotNull(joinCol)], ReadSchema: struct<joinCol:int,otherCol:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false], input[1, int, false]),false), [id=#375]
      +- *(3) Filter ((((type#5360 <=> type1) OR (type#5360 <=> type2)) AND isnotnull(type#5360)) AND isnotnull(joinCol#5361))
         +- *(3) ColumnarToRow
            +- FileScan parquet default.dim[type#5360,joinCol#5361,score#5362] Batched: true, DataFilters: [((type#5360 <=> type1) OR (type#5360 <=> type2)), isnotnull(type#5360), isnotnull(joinCol#5361)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [Or(EqualNullSafe(type,type1),EqualNullSafe(type,type2)), IsNotNull(type), IsNotNull(joinCol)], ReadSchema: struct<type:string,joinCol:int,score:int>
```
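Conceptually, the fix amounts to refusing to treat an expression with no attribute references as a partition-column key. A hedged sketch of such a guard (the helper name is ours; the actual patch may express the check differently):
```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}

// Hypothetical helper: an expression qualifies as a partition-pruning key
// only if it actually references at least one partition column, which
// excludes literals whose reference set is empty.
def referencesPartitionColumns(expr: Expression, partitionColumns: AttributeSet): Boolean =
  expr.references.nonEmpty && expr.references.subsetOf(partitionColumns)
```
With the empty-reference case excluded, the literal keys from the `Union` branches no longer trigger dynamic pruning subqueries on the fact-table scans, which is exactly the difference between the two plans above.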
### Why are the changes needed?
Execution performance improvement: the redundant dynamic pruning subqueries and filters on literal join keys are no longer planned or evaluated.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test.