Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12720#discussion_r61520953
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
    @@ -866,71 +867,233 @@ class Analyzer(
        * Note: CTEs are handled in CTESubstitution.
        */
       object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
    -
         /**
    -     * Resolve the correlated predicates in the clauses (e.g. WHERE or 
HAVING) of a
    -     * sub-query by using the plan the predicates should be correlated to.
    +     * Resolve the correlated expressions in a subquery by using the an 
outer plans' references. All
    +     * resolved outer references are wrapped in an [[OuterReference]]
          */
    -    private def resolveCorrelatedSubquery(
    -        sub: LogicalPlan, outer: LogicalPlan,
    -        aliases: scala.collection.mutable.Map[Attribute, Alias]): 
LogicalPlan = {
    -      // First resolve as much of the sub-query as possible
    -      val analyzed = execute(sub)
    -      if (analyzed.resolved) {
    -        analyzed
    -      } else {
    -        // Only resolve the lowest plan that is not resolved by outer 
plan, otherwise it could be
    -        // resolved by itself
    -        val resolvedByOuter = analyzed transformDown {
    -          case q: LogicalPlan if q.childrenResolved && !q.resolved =>
    -            q transformExpressions {
    -              case u @ UnresolvedAttribute(nameParts) =>
    -                withPosition(u) {
    -                  try {
    -                    val outerAttrOpt = outer.resolve(nameParts, resolver)
    -                    if (outerAttrOpt.isDefined) {
    -                      val outerAttr = outerAttrOpt.get
    -                      if (q.inputSet.contains(outerAttr)) {
    -                        // Got a conflict, create an alias for the 
attribute come from outer table
    -                        val alias = Alias(outerAttr, outerAttr.toString)()
    -                        val attr = alias.toAttribute
    -                        aliases += attr -> alias
    -                        attr
    -                      } else {
    -                        outerAttr
    -                      }
    -                    } else {
    -                      u
    -                    }
    -                  } catch {
    -                    case a: AnalysisException => u
    +    private def resolveOuterReferences(plan: LogicalPlan, outer: 
LogicalPlan): LogicalPlan = {
    +      plan resolveOperators {
    +        case q: LogicalPlan if q.childrenResolved && !q.resolved =>
    +          q transformExpressions {
    +            case u @ UnresolvedAttribute(nameParts) =>
    +              withPosition(u) {
    +                try {
    +                  outer.resolve(nameParts, resolver) match {
    +                    case Some(outerAttr) => OuterReference(outerAttr)
    +                    case None => u
                       }
    +                } catch {
    +                  case _: AnalysisException => u
                     }
    -            }
    -        }
    -        if (resolvedByOuter fastEquals analyzed) {
    -          analyzed
    -        } else {
    -          resolveCorrelatedSubquery(resolvedByOuter, outer, aliases)
    -        }
    +              }
    +          }
           }
         }
     
    +    /**
    +     * Resolve the subqueries in a single LogicalPlan using the given 
outer plans. This method
    +     * alternates between applying the regular analyzer on the subquery 
and applying the
    +     * resolveOuterReferences rule.
    +     */
    +    private def resolveSubQueries(
    +        plan: LogicalPlan,
    +        outerPlans: Seq[LogicalPlan]): LogicalPlan = plan 
transformExpressions {
    +      case e: SubqueryExpression if !e.query.resolved =>
    +        var previous: LogicalPlan = null
    +        var current = e.query
    +        do {
    +          previous = current
    +          // Try to resolve the subquery plan using the regular analyzer.
    +          current = execute(current)
    +
    +          // Use the outer references to resolve the subquery plan if it 
isn't resolved yet.
    +          val i = outerPlans.iterator
    +          while (!current.resolved && i.hasNext) {
    +            current = resolveOuterReferences(current, i.next())
    --- End diff --
    
    ```
    do {
       current = execute(current)
       previous = current
       val i = outerPlans.iterator
       while (!current.resolved && current.fastEquals(previous) && i.hasNext) {
         current = resolveOuterReferences(current, i.next())
       }
    } while (!current.resolved && !current.fastEquals(previous))
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to