Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13570#discussion_r66697790
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
    @@ -1715,31 +1715,52 @@ object RewritePredicateSubquery extends 
Rule[LogicalPlan] with PredicateHelper {
           // Filter the plan by applying left semi and left anti joins.
           withSubquery.foldLeft(newFilter) {
             case (p, PredicateSubquery(sub, conditions, _, _)) =>
    -          Join(p, sub, LeftSemi, conditions.reduceOption(And))
    +          val (joinCond, outerPlan) = 
rewriteExistentialExpr(conditions.reduceOption(And), p)
    +          Join(outerPlan, sub, LeftSemi, joinCond)
             case (p, Not(PredicateSubquery(sub, conditions, false, _))) =>
    -          Join(p, sub, LeftAnti, conditions.reduceOption(And))
    +          val (joinCond, outerPlan) = 
rewriteExistentialExpr(conditions.reduceOption(And), p)
    +          Join(outerPlan, sub, LeftAnti, joinCond)
             case (p, Not(PredicateSubquery(sub, conditions, true, _))) =>
    -          // This is a NULL-aware (left) anti join (NAAJ).
    +          // This is a NULL-aware (left) anti join (NAAJ) e.g. col NOT IN 
expr
               // Construct the condition. A NULL in one of the conditions is 
regarded as a positive
               // result; such a row will be filtered out by the Anti-Join 
operator.
    -          val anyNull = conditions.map(IsNull).reduceLeft(Or)
    -          val condition = conditions.reduceLeft(And)
     
    -          // Note that will almost certainly be planned as a Broadcast 
Nested Loop join. Use EXISTS
    -          // if performance matters to you.
    -          Join(p, sub, LeftAnti, Option(Or(anyNull, condition)))
    +          // Note that will almost certainly be planned as a Broadcast 
Nested Loop join.
    +          // Use EXISTS if performance matters to you.
    +          val (joinCond, outerPlan) = 
rewriteExistentialExpr(conditions.reduceLeftOption(And), p)
    +          val anyNull = 
splitConjunctivePredicates(joinCond.get).map(IsNull).reduceLeft(Or)
    +          Join(outerPlan, sub, LeftAnti, Option(Or(anyNull, joinCond.get)))
             case (p, predicate) =>
    -          var joined = p
    -          val replaced = predicate transformUp {
    -            case PredicateSubquery(sub, conditions, nullAware, _) =>
    -              // TODO: support null-aware join
    -              val exists = AttributeReference("exists", BooleanType, 
nullable = false)()
    -              joined = Join(joined, sub, ExistenceJoin(exists), 
conditions.reduceLeftOption(And))
    -              exists
    -          }
    -          Project(p.output, Filter(replaced, joined))
    +          val (newCond, inputPlan) = 
rewriteExistentialExpr(Option(predicate), p)
    +          Project(p.output, Filter(newCond.get, inputPlan))
           }
       }
    +
    +  /**
    +   * Given a predicate expression and an input plan, it rewrites
    +   * any embedded existential sub-query into an existential join.
    +   * It returns the rewritten expression together with the updated plan.
    +   * Currently, it does not support null-aware joins. Embedded NOT IN 
predicates
    +   * are blocked in the Analyzer.
    +   */
    +  private def rewriteExistentialExpr(
    +      expr: Option[Expression],
    --- End diff --
    
    Let's just pass a sequence of expressions. Predicate subqueries are 
guaranteed to have one or more conditions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to