This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e73d8291a3 [SPARK-39172][SQL] Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique
2e73d8291a3 is described below

commit 2e73d8291a32636942564450ff37bd0e33dea941
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Tue May 17 01:07:45 2022 +0800

    [SPARK-39172][SQL] Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique
    
    ### What changes were proposed in this pull request?
    
    Improve `EliminateOuterJoin` to support removing a left/right outer join when only the left/right side columns are selected and the join keys on the other side are unique.
    
    ### Why are the changes needed?
    
    Improve this optimization case by using the distinct keys framework.
    
    For example:
    ```sql
    SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 AS c1 FROM t) t2 ON t1.c1 = t2.c1
    ==>
    SELECT t1.* FROM t1
    ```
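
    The symmetric right outer join case (an illustrative sketch using the same hypothetical tables, not taken from the patch) is rewritten the same way:

    ```sql
    SELECT t1.* FROM (SELECT DISTINCT c1 AS c1 FROM t) t2 RIGHT JOIN t1 ON t2.c1 = t1.c1
    ==>
    SELECT t1.* FROM t1
    ```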
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, it only improves performance.
    
    ### How was this patch tested?
    
    Added unit tests to `OuterJoinEliminationSuite`.
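
    As a quick manual sanity check (a sketch, assuming the hypothetical tables `t1` and `t` from the example above exist), the optimized plan printed below should no longer contain a Join node:

    ```sql
    EXPLAIN EXTENDED
    SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 AS c1 FROM t) t2 ON t1.c1 = t2.c1
    ```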
    
    Closes #36530 from ulysses-you/unique-outer-join.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/catalyst/dsl/package.scala    |  1 +
 .../spark/sql/catalyst/optimizer/joins.scala       | 22 +++++++++-
 .../optimizer/OuterJoinEliminationSuite.scala      | 47 ++++++++++++++++++++++
 3 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 840a38cc869..05c6bdd5311 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -462,6 +462,7 @@ package object dsl {
         Window(windowExpressions, partitionSpec, orderSpec, logicalPlan)
 
       def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan)
+      def subquery(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan)
 
       def except(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan =
         Except(logicalPlan, otherPlan, isAll)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index e5e91acf865..796e7d8f89d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -22,7 +22,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractFiltersAndInnerJoins}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -139,6 +139,17 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
  *   SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1
  * }}}
  *
+ * 3. Remove outer join if:
+ *   - For a left outer join with only left-side columns being selected and the right side join
+ *     keys are unique.
+ *   - For a right outer join with only right-side columns being selected and the left side join
+ *     keys are unique.
+ *
+ * {{{
+ *   SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 as c1 FROM t) t2 ON t1.c1 = t2.c1  ==>
+ *   SELECT t1.* FROM t1
+ * }}}
+ *
  * This rule should be executed before pushing down the Filter
  */
 object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
@@ -211,6 +222,15 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
         if projectList.forall(_.deterministic) && p.references.subsetOf(right.outputSet) &&
           allDuplicateAgnostic(aggExprs) =>
       a.copy(child = p.copy(child = right))
+
+    case p @ Project(_, ExtractEquiJoinKeys(LeftOuter, _, rightKeys, _, _, left, right, _))
+        if right.distinctKeys.exists(_.subsetOf(ExpressionSet(rightKeys))) &&
+          p.references.subsetOf(left.outputSet) =>
+      p.copy(child = left)
+    case p @ Project(_, ExtractEquiJoinKeys(RightOuter, leftKeys, _, _, _, left, right, _))
+        if left.distinctKeys.exists(_.subsetOf(ExpressionSet(leftKeys))) &&
+          p.references.subsetOf(right.outputSet) =>
+      p.copy(child = right)
   }
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
index 2530cfded9e..15e4abc13c2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -268,4 +268,51 @@ class OuterJoinEliminationSuite extends PlanTest {
 
     comparePlans(optimized, originalQuery.analyze)
   }
+
+  test("SPARK-39172: Remove left/right outer join if only left/right side 
columns are selected " +
+    "and the join keys on the other side are unique") {
+    val x = testRelation.subquery("x")
+    val y = testRelation1.subquery("y")
+    comparePlans(Optimize.execute(
+      x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" === $"d"))
+        .select($"a", $"b", $"c").analyze),
+      x.select($"a", $"b", $"c").analyze
+    )
+
+    comparePlans(Optimize.execute(
+      x.join(y.groupBy($"d")($"d", count($"d").as("x")), LeftOuter,
+        Some($"a" === $"d" && $"b" === $"x"))
+        .select($"a", $"b", $"c").analyze),
+      x.select($"a", $"b", $"c").analyze
+    )
+
+    comparePlans(Optimize.execute(
+      x.groupBy($"a")($"a").join(y, RightOuter, Some($"a" === $"d"))
+        .select($"d", $"e", $"f").analyze),
+      y.select($"d", $"e", $"f").analyze
+    )
+
+    comparePlans(Optimize.execute(
+      x.groupBy($"a")($"a", count($"a").as("x")).join(y, RightOuter,
+        Some($"a" === $"d" && $"x" === $"e"))
+        .select($"d", $"e", $"f").analyze),
+      y.select($"d", $"e", $"f").analyze
+    )
+
+    // negative cases
+    // not an equi-join
+    val p1 = x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" > $"d"))
+      .select($"a").analyze
+    comparePlans(Optimize.execute(p1), p1)
+
+    // the join keys on the other side are not unique
+    val p2 = x.join(y.groupBy($"d", $"e")($"d", $"e"), LeftOuter, Some($"a" === $"d"))
+      .select($"a").analyze
+    comparePlans(Optimize.execute(p2), p2)
+
+    // output comes from the right side of a left outer join
+    val p3 = x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" === $"d"))
+      .select($"a", $"d").analyze
+    comparePlans(Optimize.execute(p3), p3)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
