wangyum opened a new pull request #33522:
URL: https://github.com/apache/spark/pull/33522
### What changes were proposed in this pull request?
The expressions in a join condition may be evaluated up to three times (in
`ShuffleExchangeExec`, in `SortExec`, and in the join itself). This PR adds a new
rule (`PushDownJoinConditionEvaluation`) that pushes the evaluation of join
condition expressions down into the join's children, so that each expression is
evaluated only once. For example:
```sql
CREATE TABLE t1 using parquet AS select id as a, id as b from range(10);
CREATE TABLE t2 using parquet AS select id as a, id as b from range(20);
SELECT t1.* FROM t1 JOIN t2 ON translate(t1.a, '123', 'abc') =
translate(t2.a, '123', 'abc');
```
Before this PR:
```
== Optimized Logical Plan ==
Project [a#6L, b#7L]
+- Join Inner, (translate(cast(a#6L as string), 123, abc) = translate(cast(a#8L as string), 123, abc))
   :- Filter isnotnull(a#6L)
   :  +- Relation default.t1[a#6L,b#7L] parquet
   +- Project [a#8L]
      +- Filter isnotnull(a#8L)
         +- Relation default.t2[a#8L,b#9L] parquet
```
After this PR:
```
== Optimized Logical Plan ==
Project [a#6L, b#7L]
+- Join Inner, (translate(CAST(spark_catalog.default.t1.a AS STRING), '123', 'abc')#604 = translate(CAST(spark_catalog.default.t2.a AS STRING), '123', 'abc')#605)
   :- Project [a#6L, b#7L, translate(cast(a#6L as string), 123, abc) AS translate(CAST(spark_catalog.default.t1.a AS STRING), '123', 'abc')#604]
   :  +- Filter isnotnull(a#6L)
   :     +- Relation default.t1[a#6L,b#7L] parquet
   +- Project [translate(cast(a#8L as string), 123, abc) AS translate(CAST(spark_catalog.default.t2.a AS STRING), '123', 'abc')#605]
      +- Filter isnotnull(a#8L)
         +- Relation default.t2[a#8L,b#9L] parquet
```
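The shape of the rewrite in the plans above can be illustrated with a toy model. This is only a sketch of the idea, not the implementation in this PR: the `Expr`, `Attr`, `Call`, `EqualTo`, `Rewritten` and `PushDownSketch` names and the `_joinKeyLeft`/`_joinKeyRight` aliases are hypothetical and are not Catalyst classes. It assumes a single equality condition whose two sides each reference only one child.

```scala
// Toy model only: these types are hypothetical and are NOT Spark's Catalyst classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Call(fn: String, args: List[Expr]) extends Expr

// An equi-join condition: left-side expression = right-side expression.
case class EqualTo(left: Expr, right: Expr)

// Result of the rewrite: extra (expression, alias) projections for each child,
// plus the join condition expressed over those aliases.
case class Rewritten(
    leftProject: List[(Expr, String)],
    rightProject: List[(Expr, String)],
    condition: EqualTo)

object PushDownSketch {
  // Replace every non-trivial join key with an alias computed once in the child.
  def pushDown(cond: EqualTo): Rewritten = {
    def split(e: Expr, alias: String): (List[(Expr, String)], Expr) = e match {
      case a: Attr => (Nil, a) // plain column: nothing to push down
      case other   => (List(other -> alias), Attr(alias)) // computed by a child projection
    }
    val (leftProject, newLeft) = split(cond.left, "_joinKeyLeft")
    val (rightProject, newRight) = split(cond.right, "_joinKeyRight")
    Rewritten(leftProject, rightProject, EqualTo(newLeft, newRight))
  }
}
```

Applied to a condition shaped like the example query, `PushDownSketch.pushDown` returns one projection per side and a condition that compares only the two aliases, mirroring the `#604 = #605` comparison in the optimized plan.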
### Why are the changes needed?
Improve query performance by evaluating join condition expressions once instead of up to three times.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test and benchmark test:
```scala
val numRows = 1024 * 1024 * 15
spark.sql(s"CREATE TABLE t1 using parquet AS select id as a, id as b from range(${numRows}L)")
spark.sql(s"CREATE TABLE t2 using parquet AS select id as a, id as b from range(${numRows}L)")
val benchmark = new Benchmark("Benchmark push down join condition evaluation", numRows, minNumIters = 5)
// Run the same join twice: first with the rule disabled, then with it enabled.
Seq(false, true).foreach { pushDownEnabled =>
  val name = s"Join Condition Evaluation ${if (pushDownEnabled) "(Pushdown)" else ""}"
  benchmark.addCase(name) { _ =>
    withSQLConf("spark.sql.pushDownJoinConditionEvaluationevaluation" -> s"$pushDownEnabled") {
      // The noop sink forces the whole query to execute without writing any output.
      spark.sql("SELECT t1.* FROM t1 JOIN t2 ON translate(t1.a, '123', 'abc') = translate(t2.a, '123', 'abc')")
        .write.format("noop").mode("Overwrite").save()
    }
  }
}
benchmark.run()
```
Benchmark result:
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Benchmark push down join condition evaluation:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------
Join Condition Evaluation                               32459          34521        1465          0.5        2063.7       1.0X
Join Condition Evaluation (Pushdown)                    19483          20350         812          0.8        1238.7       1.7X
```
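The derived columns in the result appear consistent with being computed from the best time and the row count used in the benchmark (`1024 * 1024 * 15`). The following is a small sketch of that arithmetic; the `BenchmarkMath` object and its method names are hypothetical helpers introduced here for illustration, not part of Spark.

```scala
// Hypothetical helper, not part of Spark: recomputes the derived benchmark
// columns from the best time, assuming they are all based on Best Time(ms).
object BenchmarkMath {
  val numRows: Long = 1024L * 1024 * 15 // same row count as the benchmark above

  // Nanoseconds spent per row for a given best time in milliseconds.
  def perRowNs(bestMs: Double): Double = bestMs * 1e6 / numRows

  // Millions of rows processed per second.
  def rateMillionsPerSec(bestMs: Double): Double = numRows / (bestMs / 1000.0) / 1e6

  // Speedup of a case relative to the baseline case.
  def relative(baselineBestMs: Double, bestMs: Double): Double = baselineBestMs / bestMs
}
```

With the reported best times, `BenchmarkMath.perRowNs(32459)` and `BenchmarkMath.perRowNs(19483)` come out at roughly 2063.7 ns and 1238.7 ns, and `BenchmarkMath.relative(32459, 19483)` at roughly 1.67, which matches the 1.7X shown in the table.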