wangyum opened a new pull request, #38069:
URL: https://github.com/apache/spark/pull/38069
### What changes were proposed in this pull request?
This PR makes `PushDownLeftSemiAntiJoin` not push complex left semi/anti
join conditions through a `Project`.
### Why are the changes needed?
Pushing the condition down hurts performance because, when the join is
executed as a `SortMergeJoin`, the complex expression is evaluated three
times: in the `Exchange` (hash partitioning), in the `Sort`, and in the
join comparison. For example:
```sql
CREATE TABLE t1 (item_id BIGINT, event_type STRING, dt STRING) USING parquet
PARTITIONED BY (dt);
CREATE TABLE t2 (item_id BIGINT, cal_dt DATE) USING parquet;
set spark.sql.autoBroadcastJoinThreshold=-1;
SELECT t1.item_id, t1.event_type
FROM t1 LEFT SEMI JOIN t2
  ON t1.item_id = t2.item_id AND To_date(t1.dt, 'yyyyMMdd') = t2.cal_dt;
```
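The intent of the fix can be sketched with a tiny, self-contained expression model. This is not Spark's actual Catalyst API (the names `Expr`, `Attr`, `Func`, and `isCheap` are illustrative): a condition is safe to push only when it is built from attribute references and literals, while anything containing a function call such as `To_date` counts as complex and stays put.

```scala
// Minimal stand-in for a Catalyst-style expression tree (illustrative only).
sealed trait Expr
case class Attr(name: String) extends Expr                  // column reference
case class Lit(value: Any) extends Expr                     // constant
case class Func(name: String, args: Seq[Expr]) extends Expr // e.g. To_date(...)
case class Eq(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

object PushdownGuard {
  // A condition is cheap to push through a Project when every operand is an
  // attribute or literal; any function call makes the condition "complex".
  def isCheap(e: Expr): Boolean = e match {
    case Attr(_) | Lit(_) => true
    case Eq(l, r)         => isCheap(l) && isCheap(r)
    case And(l, r)        => isCheap(l) && isCheap(r)
    case Func(_, _)       => false
  }
}
```

Under this model, `t1.item_id = t2.item_id` alone would be pushable, but the full condition from the example query is not, because of the `To_date` call.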
Before this PR, `cast(gettimestamp(dt#30, yyyyMMdd, TimestampType,
Some(America/Los_Angeles), false) as date)` is evaluated three times:
```
AdaptiveSparkPlan isFinalPlan=false
+- Project [item_id#28L, event_type#29]
   +- SortMergeJoin [item_id#28L, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date)], [item_id#31L, cal_dt#32], LeftSemi
      :- Sort [item_id#28L ASC NULLS FIRST, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date) ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(item_id#28L, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date), 5), ENSURE_REQUIREMENTS, [plan_id=110]
      :     +- Filter isnotnull(item_id#28L)
      :        +- FileScan parquet spark_catalog.default.t1[item_id#28L,event_type#29,dt#30]
      +- Sort [item_id#31L ASC NULLS FIRST, cal_dt#32 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(item_id#31L, cal_dt#32, 5), ENSURE_REQUIREMENTS, [plan_id=111]
            +- Filter (isnotnull(item_id#31L) AND isnotnull(cal_dt#32))
               +- FileScan parquet spark_catalog.default.t2[item_id#31L,cal_dt#32]
```
A task stack trace shows the date parsing running inside the generated
sort-ordering comparator, i.e. once per comparison:
```
[email protected]/java.text.DecimalFormat.parse(DecimalFormat.java:2149)
[email protected]/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1935)
[email protected]/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1545)
[email protected]/java.text.DateFormat.parse(DateFormat.java:397)
app//org.apache.spark.sql.catalyst.util.LegacySimpleTimestampFormatter.parse(TimestampFormatter.scala:237)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering.Cast_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering.compare(Unknown Source)
app//org.apache.spark.sql.catalyst.expressions.BaseOrdering.compare(ordering.scala:29)
...
```
After this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [item_id#28L, event_type#29]
   +- SortMergeJoin [item_id#28L, new_dt#37], [item_id#31L, cal_dt#32], LeftSemi
      :- Sort [item_id#28L ASC NULLS FIRST, new_dt#37 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(item_id#28L, new_dt#37, 5), ENSURE_REQUIREMENTS, [plan_id=110]
      :     +- Project [item_id#28L, event_type#29, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date) AS new_dt#37]
      :        +- Filter isnotnull(item_id#28L)
      :           +- FileScan parquet spark_catalog.default.t1[item_id#28L,event_type#29,dt#30]
      +- Sort [item_id#31L ASC NULLS FIRST, cal_dt#32 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(item_id#31L, cal_dt#32, 5), ENSURE_REQUIREMENTS, [plan_id=111]
            +- Filter (isnotnull(item_id#31L) AND isnotnull(cal_dt#32))
               +- FileScan parquet spark_catalog.default.t2[item_id#31L,cal_dt#32]
```
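The performance difference the plans illustrate can be modeled in plain Scala, independent of Spark (the names `Row`, `expensiveKey`, `sortRecomputing`, and `sortPrecomputed` are illustrative, not Spark APIs). In the "before" plan the expensive cast sits in the sort key, so it is re-run on every comparison; in the "after" plan a `Project` materializes it once per row as `new_dt#37` and the sort reuses the alias:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

case class Row(itemId: Long, eventType: String, dt: String)

object PrecomputeKey {
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")

  // Expensive stand-in for cast(gettimestamp(dt, 'yyyyMMdd') as date);
  // returns the epoch day so plain Long ordering applies.
  def expensiveKey(dt: String): Long = LocalDate.parse(dt, fmt).toEpochDay

  // "Before": sortBy wraps the key function in an Ordering, so the parse
  // runs inside compare() -- once per comparison, O(n log n) evaluations.
  def sortRecomputing(rows: Seq[Row]): Seq[Row] =
    rows.sortBy(r => (r.itemId, expensiveKey(r.dt)))

  // "After": compute the key once per row (the Project step), then sort on
  // the precomputed value -- n evaluations total.
  def sortPrecomputed(rows: Seq[Row]): Seq[Row] =
    rows.map(r => (r, expensiveKey(r.dt)))
        .sortBy { case (r, key) => (r.itemId, key) }
        .map(_._1)
}
```

Both functions produce the same order; only the number of `expensiveKey` evaluations differs, which is exactly the effect of keeping the complex expression out of the pushed-down join keys.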
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.