dilipbiswal commented on a change in pull request #24331: [SPARK-19712][SQL] Pushdown LeftSemi/LeftAnti below join
URL: https://github.com/apache/spark/pull/24331#discussion_r275464579
 
 

 ##########
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
 ##########
 @@ -159,3 +159,102 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 }
+
+/**
+ * This rule is a variant of [[PushPredicateThroughJoin]] which can handle
+ * pushing down Left semi and Left Anti joins below a join operator. The
+ * allowable join types are:
+ *  1) Inner
+ *  2) Cross
+ *  3) LeftOuter
+ *  4) RightOuter
+ */
+object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Define an enumeration to identify whether a LeftSemi/LeftAnti join can be pushed down to
+   * the left leg or the right leg of the join.
+   */
+  object PushdownDirection extends Enumeration {
+    val TO_LEFT_BRANCH, TO_RIGHT_BRANCH, NONE = Value
+  }
+
+  object AllowedJoin {
+    def unapply(join: Join): Option[Join] = join.joinType match {
+      case Inner | Cross | LeftOuter | RightOuter => Some(join)
+      case _ => None
+    }
+  }
+
+  /**
+   * Determine which side of the join a LeftSemi/LeftAnti join can be pushed to.
+   */
+  private def pushTo(leftChild: Join, rightChild: LogicalPlan, joinCond: Option[Expression]) = {
+    val left = leftChild.left
+    val right = leftChild.right
+    val joinType = leftChild.joinType
+    val rightOutput = rightChild.outputSet
+
+    if (joinCond.nonEmpty) {
+      val noPushdown = (PushdownDirection.NONE, None)
+      val conditions = splitConjunctivePredicates(joinCond.get)
+      val (leftConditions, rest) =
+        conditions.partition(_.references.subsetOf(left.outputSet ++ rightOutput))
+      val (rightConditions, commonConditions) =
+        rest.partition(_.references.subsetOf(right.outputSet ++ rightOutput))
+
+      if (rest.isEmpty && leftConditions.nonEmpty) {
+        // When the join conditions can be computed based on the left leg of
+        // leftsemi/anti join then push the leftsemi/anti join to the left side.
+        (PushdownDirection.TO_LEFT_BRANCH, leftConditions.reduceLeftOption(And))
+      } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) {
+        // When the join conditions can be computed based on the attributes from right leg of
+        // leftsemi/anti join then push the leftsemi/anti join to the right side.
+        (PushdownDirection.TO_RIGHT_BRANCH, rightConditions.reduceLeftOption(And))
+      } else {
+        noPushdown
+      }
+    } else {
+      /**
+       * When the join condition is empty,
+       * 1) if this is a left outer join or inner join, push leftsemi/anti join down
 
 Review comment:
   @cloud-fan Perhaps it's possible. In this PR, I focused on what
`PushPredicateThroughJoin` does today and on keeping the behaviour the same. We
can look into improving both this rule and `PushPredicateThroughJoin` as a
follow-up. The reason I say this is that we probably need to test more and
prove that pushing down to both sides doesn't create any side effects or cause
wrong results.
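
To make the current behaviour concrete, here is a minimal, Spark-free sketch of the decision that `pushTo` makes when the join condition is non-empty. It is only an illustration of the hunk above, not the rule itself: `PushToSketch`, the `Direction` objects and the string-based attribute sets are hypothetical stand-ins for `PushdownDirection`, `AttributeSet` and `Expression.references`. The empty-condition branch is cut off in the hunk, so it is deliberately not modelled.

```scala
// Hypothetical model of pushTo for a non-empty join condition.
// Each conjunct is represented only by the set of attribute names it references.
object PushToSketch {
  sealed trait Direction
  case object ToLeftBranch extends Direction
  case object ToRightBranch extends Direction
  case object NoPushdown extends Direction

  type Conjunct = Set[String]

  def pushTo(
      leftOutput: Set[String],      // output of the left leg of the lower join
      rightOutput: Set[String],     // output of the right leg of the lower join
      semiRightOutput: Set[String], // output of the right child of the semi/anti join
      conjuncts: Seq[Conjunct]): Direction = {
    // Assumption: only the non-empty case is modelled here.
    require(conjuncts.nonEmpty, "empty join condition is not modelled in this sketch")

    // Conjuncts that need only the left leg plus the semi/anti join's right child.
    val (leftConditions, rest) =
      conjuncts.partition(_.subsetOf(leftOutput ++ semiRightOutput))
    // Of the remainder, conjuncts that need only the right leg plus that child.
    val (rightConditions, commonConditions) =
      rest.partition(_.subsetOf(rightOutput ++ semiRightOutput))

    if (rest.isEmpty && leftConditions.nonEmpty) {
      ToLeftBranch
    } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) {
      ToRightBranch
    } else {
      // Mixed conjuncts, or a conjunct spanning both legs: keep the join where it is.
      NoPushdown
    }
  }
}
```

With `a = Set("a.id")`, `b = Set("b.id")` and `c = Set("c.id")`, a condition made only of `Set("a.id", "c.id")` selects the left branch, one made only of `Set("b.id", "c.id")` selects the right branch, and a condition containing both conjuncts is not pushed at all. That last case is the one the "both sides" question is about.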

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
