This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d5021f  [SPARK-34533][SQL] Eliminate LEFT ANTI join to empty relation 
in AQE
7d5021f is described below

commit 7d5021f5eed2b9c48bd02b92cce1535edc46d0e4
Author: Cheng Su <chen...@fb.com>
AuthorDate: Fri Feb 26 11:46:27 2021 +0000

    [SPARK-34533][SQL] Eliminate LEFT ANTI join to empty relation in AQE
    
    ### What changes were proposed in this pull request?
    
    From the review discussion at 
https://github.com/apache/spark/pull/31630#discussion_r581774000 , I found that 
we can eliminate a LEFT ANTI join (with no join condition) to an empty relation 
if the right side is known to be non-empty. With AQE, this is doable in a way 
similar to https://github.com/apache/spark/pull/29484 .
    
    ### Why are the changes needed?
    
    This can help eliminate the join operator during logical plan optimization.
    Before this PR, the [left side physical plan's `execute()` will be 
called](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala#L192),
 so if the left side is complicated (e.g. contains a broadcast exchange 
operator), then some computation would happen. However, after this PR, the join 
operator will be removed during logical plan optimization, and nothing will be 
computed from the left side. Potentially it can save resources for the [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added unit tests for positive and negative queries in 
`AdaptiveQueryExecSuite.scala`.
    
    Closes #31641 from c21/left-anti-aqe.
    
    Authored-by: Cheng Su <chen...@fb.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../adaptive/EliminateJoinToEmptyRelation.scala    | 16 +++++++++++++++-
 .../adaptive/AdaptiveQueryExecSuite.scala          | 22 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
index cfdd20e..d6df522 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.adaptive
 
 import 
org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, 
HashedRelation, HashedRelationWithAllNullKeys}
@@ -33,6 +33,8 @@ import 
org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation
  *    This applies to all Joins (sort merge join, shuffled hash join, and 
broadcast hash join),
  *    because sort merge join and shuffled hash join will be changed to 
broadcast hash join with AQE
  *    at the first place.
+ *
+ * 3. Join is left anti join without condition, and join right side is 
non-empty.
  */
 object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
 
@@ -53,5 +55,17 @@ object EliminateJoinToEmptyRelation extends 
Rule[LogicalPlan] {
 
     case j @ Join(_, _, LeftSemi, _, _) if canEliminate(j.right, 
EmptyHashedRelation) =>
       LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
+
+    case j @ Join(_, _, LeftAnti, None, _) =>
+      val isNonEmptyRightSide = j.right match {
+        case LogicalQueryStage(_, stage: QueryStageExec) if 
stage.resultOption.get().isDefined =>
+          stage.getRuntimeStatistics.rowCount.exists(_ > 0)
+        case _ => false
+      }
+      if (isNonEmptyRightSide) {
+        LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
+      } else {
+        j
+      }
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 122bc2d..d7a1d5d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1230,6 +1230,28 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-34533: Eliminate left anti join to empty relation") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      withTable("emptyTestData") {
+        spark.range(0).write.saveAsTable("emptyTestData")
+        Seq(
+          // broadcast non-empty right side
+          ("SELECT /*+ broadcast(testData3) */ * FROM testData LEFT ANTI JOIN 
testData3", true),
+          // broadcast empty right side
+          ("SELECT /*+ broadcast(emptyTestData) */ * FROM testData LEFT ANTI 
JOIN emptyTestData",
+            false),
+          // broadcast left side
+          ("SELECT /*+ broadcast(testData) */ * FROM testData LEFT ANTI JOIN 
testData3", false)
+        ).foreach { case (query, isEliminated) =>
+          val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
+          assert(findTopLevelBaseJoin(plan).size == 1)
+          assert(findTopLevelBaseJoin(adaptivePlan).isEmpty == isEliminated)
+        }
+      }
+    }
+  }
+
   test("SPARK-32753: Only copy tags to node with no tags") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       withTempView("v1") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to