GitHub user dilipbiswal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16954#discussion_r101597685
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala ---
    @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       }
     
       /**
    -   * Given a predicate expression and an input plan, it rewrites
    -   * any embedded existential sub-query into an existential join.
    -   * It returns the rewritten expression together with the updated plan.
    -   * Currently, it does not support null-aware joins. Embedded NOT IN predicates
    -   * are blocked in the Analyzer.
    +   * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query
    +   * into an existential join. It returns the rewritten expression together with the updated plan.
    +   * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in
    +   * the Analyzer.
        */
       private def rewriteExistentialExpr(
           exprs: Seq[Expression],
           plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
         var newPlan = plan
         val newExprs = exprs.map { e =>
           e transformUp {
    -        case PredicateSubquery(sub, conditions, nullAware, _) =>
    -          // TODO: support null-aware join
    +        case Exists(sub, conditions, exprId) =>
               val exists = AttributeReference("exists", BooleanType, nullable = false)()
    -          newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
    +          newPlan = Join(newPlan, sub,
    +            ExistenceJoin(exists), conditions.reduceLeftOption(And))
               exists
    -        }
    +        case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) =>
    +          val exists = AttributeReference("exists", BooleanType, nullable = false)()
    +          val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled)
    +          newPlan = Join(newPlan, sub,
    +            ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And))
    +          exists
    +      }
         }
         (newExprs.reduceOption(And), newPlan)
       }
     }
     
    + /**
    +  * Pull out all (outer) correlated predicates from a given subquery. This method removes the
    +  * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
    +  * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
    +  * be able to evaluate the predicates at the top level.
    +  *
    +  * TODO: Look to merge this rule with RewritePredicateSubquery.
    +  */
    +object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper {
    --- End diff --
    
    This function is mostly copied from the Analyzer. The only difference is that,
    since validation is already done by the Analyzer, we only process Filter,
    Project and Aggregate here.
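
To illustrate what processing only Filter, Project and Aggregate amounts to, here is a toy sketch. It is not the code in this PR: `PullUpSketch`, `Pred`, `Plan`, `Relation` and `pullUp` are hypothetical stand-ins for Catalyst's expressions, logical plans and the rule body, and attributes are modelled as plain strings. It assumes, as the doc comment above describes, that correlated predicates are removed from Filters and that the attributes they reference are added to intermediate Project and Aggregate nodes.

```scala
// Toy model only; every name below is hypothetical and none of it is Spark API.
object PullUpSketch {
  // A predicate lists the attributes it references; `outer` marks a correlated predicate.
  final case class Pred(sql: String, refs: Set[String], outer: Boolean)

  sealed trait Plan { def output: Set[String] }
  final case class Relation(name: String, output: Set[String]) extends Plan
  final case class Filter(preds: Seq[Pred], child: Plan) extends Plan {
    def output: Set[String] = child.output
  }
  final case class Project(cols: Set[String], child: Plan) extends Plan {
    def output: Set[String] = cols
  }
  final case class Aggregate(groupBy: Set[String], aggs: Set[String], child: Plan) extends Plan {
    def output: Set[String] = groupBy ++ aggs
  }

  // Attributes of the subquery side that the pulled-up predicates still need.
  private def innerRefs(pulled: Seq[Pred], child: Plan): Set[String] = {
    val refs: Set[String] = pulled.flatMap(_.refs).toSet
    refs.intersect(child.output)
  }

  // Returns the rewritten subquery plus the correlated predicates removed from it.
  def pullUp(plan: Plan): (Plan, Seq[Pred]) = plan match {
    case r: Relation => (r, Nil)
    case Filter(preds, child) =>
      val (newChild, pulled) = pullUp(child)
      val (correlated, local) = preds.partition(_.outer)
      // Drop the Filter entirely if only correlated predicates were in it.
      val rewritten = if (local.isEmpty) newChild else Filter(local, newChild)
      (rewritten, pulled ++ correlated)
    case Project(cols, child) =>
      val (newChild, pulled) = pullUp(child)
      // Widen the projection so the referenced attributes stay visible above it.
      (Project(cols ++ innerRefs(pulled, newChild), newChild), pulled)
    case Aggregate(groupBy, aggs, child) =>
      val (newChild, pulled) = pullUp(child)
      // Referenced attributes become extra grouping keys so they survive the aggregate.
      (Aggregate(groupBy ++ innerRefs(pulled, newChild), aggs, newChild), pulled)
  }
}
```

In this model, calling `PullUpSketch.pullUp` on a `Project` over a `Filter` that holds one correlated and one local predicate returns the local predicate still in place, the projection widened by the inner attribute of the correlated predicate, and the correlated predicate handed back to the caller to use as a join condition.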

