caoxuewen created SPARK-23544:
---------------------------------

Summary: Remove repartition operation from join in the optimizer
Key: SPARK-23544
URL: https://issues.apache.org/jira/browse/SPARK-23544
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.4.0
Reporter: caoxuewen
Currently, when a child of a join is a {{Repartition}} or {{RepartitionByExpression}} node, the repartition operation is not necessary. I think we can remove it in the optimizer, and doing so is safe for the join operation. With the proposed change, the plan before and after the rule looks like:

{noformat}
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseRepartition ===
Input LogicalPlan:
Join Inner
:- Repartition 10, false
:  +- LocalRelation <empty>, [a#0, b#1]
+- Repartition 10, false
   +- LocalRelation <empty>, [c#2, d#3]

Output LogicalPlan:
Join Inner
:- LocalRelation <empty>, [a#0, b#1]
+- LocalRelation <empty>, [c#2, d#3]
{noformat}

h3. I have also added a test case:

{code:scala}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions.col

// runJoinBenchmark and sparkSession come from the surrounding benchmark harness (not shown).
val N = 2 << 20
runJoinBenchmark("sort merge join", N) {
  val df1 = sparkSession.range(N)
    .selectExpr(s"(id * 15485863) % ${N*10} as k1")
  val df2 = sparkSession.range(N)
    .selectExpr(s"(id * 15485867) % ${N*10} as k2")
  // Repartition only the right input before joining on k1 = k2.
  val df = df1.join(df2.repartition(20), col("k1") === col("k2"))
  // The physical plan should still use a sort merge join.
  assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
  df.count()
}
{code}

The benchmark results are:

{noformat}
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
sort merge join:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort merge join Repartition off          3520 / 4364          0.6        1678.5       1.0X
sort merge join Repartition on           1946 / 2203          1.1         927.9       1.8X
{noformat}
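To make the proposed plan rewrite concrete, here is a minimal, self-contained Scala sketch that reproduces the Input/Output LogicalPlan transformation from the trace. It assumes toy case classes that only borrow the operator names printed in the plan ({{Join}}, {{Repartition}}, {{RepartitionByExpression}}, {{LocalRelation}}). Their fields are simplified and do not match Catalyst's real definitions, and {{RemoveRepartitionUnderJoin}} is a hypothetical name. A real change would be a Catalyst {{Rule[LogicalPlan]}}, which this sketch does not attempt.

{code:scala}
// Toy logical-plan nodes named after the operators printed in the plan trace.
// Hypothetical stand-ins: fields are simplified and these are NOT Catalyst's classes.
sealed trait Plan
final case class LocalRelation(output: Seq[String]) extends Plan
final case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan
final case class RepartitionByExpression(partitionExprs: Seq[String], child: Plan, numPartitions: Int) extends Plan
final case class Join(left: Plan, right: Plan, joinType: String) extends Plan

// Hypothetical rule name; the issue shows the rewrite under CollapseRepartition.
object RemoveRepartitionUnderJoin {
  // Peel off any chain of repartition nodes sitting directly on top of a subtree.
  private def stripRepartition(p: Plan): Plan = p match {
    case Repartition(_, _, child)             => stripRepartition(child)
    case RepartitionByExpression(_, child, _) => stripRepartition(child)
    case other                                => other
  }

  // Rewrite the whole tree: at every Join, drop repartitions on both inputs,
  // then keep recursing so nested joins are rewritten too.
  def apply(plan: Plan): Plan = plan match {
    case Join(left, right, joinType) =>
      Join(apply(stripRepartition(left)), apply(stripRepartition(right)), joinType)
    case Repartition(n, shuffle, child)           => Repartition(n, shuffle, apply(child))
    case RepartitionByExpression(exprs, child, n) => RepartitionByExpression(exprs, apply(child), n)
    case leaf: LocalRelation                      => leaf
  }
}

object RemoveRepartitionUnderJoinDemo {
  def main(args: Array[String]): Unit = {
    // Input plan from the trace: "Repartition 10, false" on both join children.
    val input = Join(
      Repartition(10, false, LocalRelation(Seq("a#0", "b#1"))),
      Repartition(10, false, LocalRelation(Seq("c#2", "d#3"))),
      "Inner")
    // Prints the toy equivalent of the Output LogicalPlan.
    println(RemoveRepartitionUnderJoin(input))
  }
}
{code}

Only repartitions sitting directly under a {{Join}} are removed. A {{Repartition}} elsewhere in the tree is kept, which matches the scope described in this issue.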
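The Rate, Per Row and Relative columns in the benchmark table can be recomputed from the best time and the row count N = 2 << 20 = 2,097,152. The small Scala sketch below assumes the harness computes them as rows per second in millions, best time divided by N in nanoseconds, and baseline best time divided by each case's best time. These formulas are an assumption, but they reproduce every value printed in the table. {{BenchmarkColumns}} is a hypothetical helper.

{code:scala}
// Hypothetical helper that recomputes the benchmark table columns,
// assuming the formulas described above.
object BenchmarkColumns {
  // Row count used by the test case: 2 << 20 = 2,097,152.
  val N: Long = 2L << 20

  // Millions of rows processed per second, given the best time in ms.
  def rateMPerSec(bestMs: Double): Double = N / (bestMs / 1000.0) / 1e6

  // Nanoseconds spent per row, given the best time in ms.
  def nsPerRow(bestMs: Double): Double = bestMs * 1e6 / N

  // Speed-up of a case relative to the first (baseline) case.
  def relative(baselineBestMs: Double, bestMs: Double): Double = baselineBestMs / bestMs

  def main(args: Array[String]): Unit = {
    val baselineMs = 3520.0 // "Repartition off" best time from the table
    for ((name, best) <- Seq("Repartition off" -> 3520.0, "Repartition on" -> 1946.0)) {
      println(f"$name%-16s rate=${rateMPerSec(best)}%.1f M/s, per row=${nsPerRow(best)}%.1f ns, relative=${relative(baselineMs, best)}%.1fX")
    }
  }
}
{code}

Rounded to one decimal place, this gives 0.6 M/s, 1678.5 ns and 1.0X for the first row and 1.1 M/s, 927.9 ns and 1.8X for the second, matching the reported numbers.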