Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12306#discussion_r59798418
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
    @@ -852,26 +852,142 @@ class Analyzer(
       }
     
       /**
    -   * This rule resolve subqueries inside expressions.
    +   * This rule resolves subqueries inside expressions and it rewrites 
correlated & scalar
    +   * subqueries.
        *
    -   * Note: CTE are handled in CTESubstitution.
    +   * It works as follows:
    +   * 1. Find all subquery expressions in a logical plan. Try to resolve 
them and update the
    +   *    SubQueryExpression with the resolved logical plan.
    +   * 2. For Filter (i.e. WHERE/HAVING clauses) the condition will be split 
by AND. Each part of the
    +   *    condition that contains a [[CorrelatedSubqueryExpression]] will be 
rewritten:
    +   *    a. EXISTS/NOT EXISTS will be rewritten as semi/anti join, 
unresolved conditions in Filter
    +   *       will be pulled out as the join conditions.
    +   *    b. IN/NOT IN will be rewritten as semi/anti join, unresolved 
conditions in the Filter will
    +   *       be pulled out as join conditions, value = selected column will 
also be used as join
    +   *       condition.
    +   *
    +   * Note: CTEs are handled in CTESubstitution.
        */
       object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
     
    -    private def hasSubquery(e: Expression): Boolean = {
    -      e.find(_.isInstanceOf[SubqueryExpression]).isDefined
    +    private def hasCorrelatedSubquery(e: Expression): Boolean = {
    +      e.find(_.isInstanceOf[CorrelatedSubqueryExpression]).isDefined
         }
     
         private def hasSubquery(q: LogicalPlan): Boolean = {
    -      q.expressions.exists(hasSubquery)
    +      
q.expressions.exists(_.find(_.isInstanceOf[SubqueryExpression]).isDefined)
    +    }
    +
    +    /**
    +     * Removes the conjunctive predicates of Filter that can't be resolved 
in this logical plan,
    +     * returns the resolved new logical plan and removed predicates.
    +     */
    +    private def removeUnresolvedPredicates(q: LogicalPlan): (LogicalPlan, 
Option[Expression]) = {
    +      val unresolvedConditions = ArrayBuffer[Expression]()
    +      var hasOuterJoin = false
    +      val removed = q transform {
    +        case j: Join if j.joinType != Inner =>
    +          hasOuterJoin = true
    +          j
    +        case f @ Filter(cond, child) if child.resolved && !f.resolved =>
    +          if (hasOuterJoin) {
    +            // The predicates inside a outer/semi join can't be pulled 
over join safely.
    +            throw new AnalysisException("accessing columns of outer query 
inside join is not" +
    +              " supported")
    +          } else {
    +            val (resolved, unresolved) = 
splitConjunctivePredicates(cond).partition(_.resolved)
    +            unresolvedConditions ++= unresolved
    +            if (resolved.nonEmpty) {
    +              Filter(resolved.reduceLeft(And), child)
    +            } else {
    +              child
    +            }
    +          }
    +      }
    +      if (unresolvedConditions.nonEmpty) {
    +        // try to resolve new logical plan and remove unresolved 
predicated again
    +        val (removedAgain, moreConditions) = 
removeUnresolvedPredicates(execute(removed))
    +        (removedAgain, (unresolvedConditions ++ 
moreConditions.toSeq).reduceLeftOption(And))
    +      } else {
    +        if (!q.resolved) {
    +          throw new AnalysisException(s"subquery can't be resolved: 
${q.treeString}")
    +        }
    +        (q, None)
    +      }
    +    }
    +
    +    /**
    +     * Returns a resolved subquery and predicate that will be used to 
rewrite the IN subquery as
    +     * semi join (predicate will be used as join condition).
    +     */
    +    private def rewriteInSubquery(
    +        value: Expression,
    +        subquery: LogicalPlan): (LogicalPlan, Expression) = {
    +      val (resolved, joinCondition) = 
removeUnresolvedPredicates(execute(subquery))
    --- End diff --
    
    Has anyone ever used a correlated IN subquery? E.g.:
    
        SELECT   *
        FROM      a
        WHERE   a.id IN (SELECT id FROM b WHERE b.value < a.value)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to