Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/17520#discussion_r109604178
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---
@@ -1032,6 +1251,109 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
}
/**
+ * Pushes down a subquery, in the form of [[Join LeftSemi/LeftAnti]] operator
+ * to the left or right side of a join below.
+ */
+object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
+ /**
+ * Define an enumeration to identify whether an Exists/In subquery,
+ * in the form of a LeftSemi/LeftAnti, can be pushed down to
+ * the left table or the right table.
+ */
+ object subqueryPushdown extends Enumeration {
+ val toRightTable, toLeftTable, none = Value
+ }
+
+ /**
+ * Determine which side of the join an Exists/In subquery (in the form of
+ * LeftSemi/LeftAnti join) can be pushed down to.
+ */
+ private def pushTo(child: Join, subquery: LogicalPlan, joinCond: Option[Expression]) = {
+ val left = child.left
+ val right = child.right
+ val joinType = child.joinType
+ val subqueryOutput = subquery.outputSet
+
+ if (joinCond.nonEmpty) {
+ /**
+ * Note: In order to ensure correctness, it's important to not change the relative ordering of
+ * any deterministic expression that follows a non-deterministic expression. To achieve this,
+ * we only consider pushing down those expressions that precede the first non-deterministic
+ * expression in the condition.
+ */
+ val noPushdown = (subqueryPushdown.none, None)
+ val conditions = splitConjunctivePredicates(joinCond.get)
+ val (candidates, containingNonDeterministic) = conditions.span(_.deterministic)
+ lazy val (pushDownCandidates, subquery) =
+ candidates.partition { cond =>
+ !SubqueryExpression.hasCorrelatedSubquery(cond) &&
+ !SubExprUtils.containsOuter(cond)
+ }
+ lazy val (leftConditions, rest) =
+ pushDownCandidates.partition(_.references.subsetOf(left.outputSet ++ subqueryOutput))
+ lazy val (rightConditions, commonConditions) =
+ rest.partition(_.references.subsetOf(right.outputSet ++ subqueryOutput))
+
+ if (containingNonDeterministic.nonEmpty || subquery.nonEmpty) {
+ noPushdown
+ } else {
+ if (rest.isEmpty && leftConditions.nonEmpty) {
+ // When all the join conditions are only between left table and the subquery
+ // push the subquery to the left table.
+ (subqueryPushdown.toLeftTable, leftConditions.reduceLeftOption(And))
+ } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) {
+ // When all the join conditions are only between right table and the subquery
+ // push the subquery to the right table.
+ (subqueryPushdown.toRightTable, rightConditions.reduceLeftOption(And))
+ } else {
+ noPushdown
+ }
+ }
+ } else {
+ /**
+ * When there is no correlated predicate,
+ * 1) if this is a left outer join, push the subquery down to the left table
+ * 2) if a right outer join, to the right table,
+ * 3) if an inner join, push to either side.
+ */
+ val action = joinType match {
+ case RightOuter =>
+ subqueryPushdown.toRightTable
+ case _: InnerLike | LeftOuter =>
+ subqueryPushdown.toLeftTable
+ case _ =>
+ subqueryPushdown.none
+ }
+ (action, None)
+ }
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ // push LeftSemi/LeftAnti down into the join below
+ case j @ Join(child @ Join(left, right, _ : InnerLike | LeftOuter | RightOuter, belowJoinCond),
+ subquery, LeftSemiOrAnti(joinType), joinCond) =>
--- End diff --
I am afraid that this pushdown will interfere with join reordering rules.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]