This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2e73d8291a3 [SPARK-39172][SQL] Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique 2e73d8291a3 is described below commit 2e73d8291a32636942564450ff37bd0e33dea941 Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Tue May 17 01:07:45 2022 +0800 [SPARK-39172][SQL] Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique ### What changes were proposed in this pull request? Improve `EliminateOuterJoin` to support removing a left/right outer join if only left/right side columns are selected and the join keys on the other side are unique. ### Why are the changes needed? Improve the optimization case using the distinct keys framework. For example: ```sql SELECT t1.* FROM t1 LEFT JOIN (SELECT distinct c1 as c1 FROM t) t2 ON t1.c1 = t2.c1 ==> SELECT t1.* FROM t1 ``` ### Does this PR introduce _any_ user-facing change? No, it only improves performance. ### How was this patch tested? Added tests. Closes #36530 from ulysses-you/unique-outer-join.
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/catalyst/dsl/package.scala | 1 + .../spark/sql/catalyst/optimizer/joins.scala | 22 +++++++++- .../optimizer/OuterJoinEliminationSuite.scala | 47 ++++++++++++++++++++++ 3 files changed, 69 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 840a38cc869..05c6bdd5311 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -462,6 +462,7 @@ package object dsl { Window(windowExpressions, partitionSpec, orderSpec, logicalPlan) def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) + def subquery(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan) def except(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan = Except(logicalPlan, otherPlan, isAll) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index e5e91acf865..796e7d8f89d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -22,7 +22,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction -import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins +import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractFiltersAndInnerJoins} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -139,6 +139,17 @@ object 
ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { * SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1 * }}} * + * 3. Remove outer join if: + * - For a left outer join with only left-side columns being selected and the right side join + * keys are unique. + * - For a right outer join with only right-side columns being selected and the left side join + * keys are unique. + * + * {{{ + * SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 as c1 FROM t) t2 ON t1.c1 = t2.c1 ==> + * SELECT t1.* FROM t1 + * }}} + * * This rule should be executed before pushing down the Filter */ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { @@ -211,6 +222,15 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if projectList.forall(_.deterministic) && p.references.subsetOf(right.outputSet) && allDuplicateAgnostic(aggExprs) => a.copy(child = p.copy(child = right)) + + case p @ Project(_, ExtractEquiJoinKeys(LeftOuter, _, rightKeys, _, _, left, right, _)) + if right.distinctKeys.exists(_.subsetOf(ExpressionSet(rightKeys))) && + p.references.subsetOf(left.outputSet) => + p.copy(child = left) + case p @ Project(_, ExtractEquiJoinKeys(RightOuter, leftKeys, _, _, _, left, right, _)) + if left.distinctKeys.exists(_.subsetOf(ExpressionSet(leftKeys))) && + p.references.subsetOf(right.outputSet) => + p.copy(child = right) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala index 2530cfded9e..15e4abc13c2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala @@ -268,4 +268,51 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove 
left/right outer join if only left/right side columns are selected " + + "and the join keys on the other side are unique") { + val x = testRelation.subquery("x") + val y = testRelation1.subquery("y") + comparePlans(Optimize.execute( + x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" === $"d")) + .select($"a", $"b", $"c").analyze), + x.select($"a", $"b", $"c").analyze + ) + + comparePlans(Optimize.execute( + x.join(y.groupBy($"d")($"d", count($"d").as("x")), LeftOuter, + Some($"a" === $"d" && $"b" === $"x")) + .select($"a", $"b", $"c").analyze), + x.select($"a", $"b", $"c").analyze + ) + + comparePlans(Optimize.execute( + x.groupBy($"a")($"a").join(y, RightOuter, Some($"a" === $"d")) + .select($"d", $"e", $"f").analyze), + y.select($"d", $"e", $"f").analyze + ) + + comparePlans(Optimize.execute( + x.groupBy($"a")($"a", count($"a").as("x")).join(y, RightOuter, + Some($"a" === $"d" && $"x" === $"e")) + .select($"d", $"e", $"f").analyze), + y.select($"d", $"e", $"f").analyze + ) + + // negative cases + // not a equi-join + val p1 = x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" > $"d")) + .select($"a").analyze + comparePlans(Optimize.execute(p1), p1) + + // do not exist unique key + val p2 = x.join(y.groupBy($"d", $"e")($"d", $"e"), LeftOuter, Some($"a" === $"d")) + .select($"a").analyze + comparePlans(Optimize.execute(p2), p2) + + // output comes from the right side of a left outer join + val p3 = x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" === $"d")) + .select($"a", $"d").analyze + comparePlans(Optimize.execute(p3), p3) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org