wangyum opened a new pull request #33522:
URL: https://github.com/apache/spark/pull/33522


   ### What changes were proposed in this pull request?
   
   The expressions in a join condition may be evaluated three times (`ShuffleExchangeExec`, `SortExec`, and the join itself). This PR adds a new rule (`PushDownJoinConditionEvaluation`) that pushes join condition evaluation down into the join's children, so each expression is evaluated only once. For example:
   ```sql
   CREATE TABLE t1 using parquet AS select id as a, id as b from range(10);
   CREATE TABLE t2 using parquet AS select id as a, id as b from range(20);
   SELECT t1.* FROM t1 JOIN t2 ON translate(t1.a, '123', 'abc') = translate(t2.a, '123', 'abc');
   ```
   Before this PR:
   ```
   == Optimized Logical Plan ==
   Project [a#6L, b#7L]
   +- Join Inner, (translate(cast(a#6L as string), 123, abc) = translate(cast(a#8L as string), 123, abc))
      :- Filter isnotnull(a#6L)
      :  +- Relation default.t1[a#6L,b#7L] parquet
      +- Project [a#8L]
         +- Filter isnotnull(a#8L)
            +- Relation default.t2[a#8L,b#9L] parquet
   ```
   After this PR:
   ```
   == Optimized Logical Plan ==
   Project [a#6L, b#7L]
   +- Join Inner, (translate(CAST(spark_catalog.default.t1.a AS STRING), '123', 'abc')#604 = translate(CAST(spark_catalog.default.t2.a AS STRING), '123', 'abc')#605)
      :- Project [a#6L, b#7L, translate(cast(a#6L as string), 123, abc) AS translate(CAST(spark_catalog.default.t1.a AS STRING), '123', 'abc')#604]
      :  +- Filter isnotnull(a#6L)
      :     +- Relation default.t1[a#6L,b#7L] parquet
      +- Project [translate(cast(a#8L as string), 123, abc) AS translate(CAST(spark_catalog.default.t2.a AS STRING), '123', 'abc')#605]
         +- Filter isnotnull(a#8L)
            +- Relation default.t2[a#8L,b#9L] parquet
   ```
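
   The transformation can be sketched with a toy expression tree (a minimal sketch for illustration only; names like `_joinkey_0` are made up here and are not the rule's actual implementation):
   ```scala
   // Toy model of pushing join-key evaluation below the join (illustrative only).
   sealed trait Expr
   case class Attr(name: String) extends Expr               // plain column reference
   case class Func(name: String, child: Expr) extends Expr  // e.g. translate(...)
   case class Alias(child: Expr, name: String) extends Expr // child AS name

   // Replace each non-attribute join key with a reference to a freshly aliased
   // column, and collect the aliases the child projections must compute once.
   def pushDownKeys(keys: Seq[Expr]): (Seq[Expr], Seq[Alias]) = {
     val rewritten = keys.zipWithIndex.map {
       case (a: Attr, _) => (a: Expr, None)                 // already cheap: keep as-is
       case (e, i) =>
         val col = s"_joinkey_$i"                           // hypothetical generated name
         (Attr(col): Expr, Some(Alias(e, col)))
     }
     (rewritten.map(_._1), rewritten.flatMap(_._2))
   }

   val (newKeys, newProjections) =
     pushDownKeys(Seq(Func("translate", Attr("a")), Attr("b")))
   // The expensive key is now a plain attribute; its evaluation moved into a projection.
   ```
   After such a rewrite the join condition compares precomputed columns, so `ShuffleExchangeExec` and `SortExec` operate on already-evaluated values.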
   
   ### Why are the changes needed?
   
   Improve query performance.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   
   ### How was this patch tested?
   
   Unit tests and a benchmark:
   ```scala
   val numRows = 1024 * 1024 * 15
   spark.sql(s"CREATE TABLE t1 using parquet AS select id as a, id as b from range(${numRows}L)")
   spark.sql(s"CREATE TABLE t2 using parquet AS select id as a, id as b from range(${numRows}L)")
   val benchmark = new Benchmark("Benchmark push down join condition evaluation", numRows, minNumIters = 5)

   Seq(false, true).foreach { pushDownEnabled =>
     val name = s"Join Condition Evaluation ${if (pushDownEnabled) "(Pushdown)" else ""}"
     benchmark.addCase(name) { _ =>
       withSQLConf("spark.sql.pushDownJoinConditionEvaluation" -> s"$pushDownEnabled") {
         spark.sql("SELECT t1.* FROM t1 JOIN t2 ON translate(t1.a, '123', 'abc') = translate(t2.a, '123', 'abc')")
           .write.format("noop").mode("Overwrite").save()
       }
     }
   }
   benchmark.run()
   ```
   Benchmark result:
   ```
   Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.7
   Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
   Benchmark push down join condition evaluation:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   -----------------------------------------------------------------------------------------------------------------------------
   Join Condition Evaluation                              32459          34521        1465          0.5        2063.7       1.0X
   Join Condition Evaluation (Pushdown)                   19483          20350         812          0.8        1238.7       1.7X
   ```
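
   As a quick sanity check, the reported 1.7X and per-row figures follow directly from the best times and the row count:
   ```scala
   // Derive the table's relative speedup and per-row cost from its raw numbers.
   val numRows  = 1024 * 1024 * 15        // 15,728,640 rows, as in the benchmark
   val baseMs   = 32459.0                 // best time without pushdown
   val pushMs   = 19483.0                 // best time with pushdown
   val speedup  = baseMs / pushMs         // ~1.67, reported as 1.7X
   val perRowNs = baseMs * 1e6 / numRows  // ~2063.7 ns, matching Per Row(ns)
   ```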
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
