Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12720#discussion_r61521519
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
    @@ -866,71 +867,233 @@ class Analyzer(
        * Note: CTEs are handled in CTESubstitution.
        */
       object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
    -
         /**
    -     * Resolve the correlated predicates in the clauses (e.g. WHERE or 
HAVING) of a
    -     * sub-query by using the plan the predicates should be correlated to.
    +     * Resolve the correlated expressions in a subquery by using the an 
outer plans' references. All
    +     * resolved outer references are wrapped in an [[OuterReference]]
          */
    -    private def resolveCorrelatedSubquery(
    -        sub: LogicalPlan, outer: LogicalPlan,
    -        aliases: scala.collection.mutable.Map[Attribute, Alias]): 
LogicalPlan = {
    -      // First resolve as much of the sub-query as possible
    -      val analyzed = execute(sub)
    -      if (analyzed.resolved) {
    -        analyzed
    -      } else {
    -        // Only resolve the lowest plan that is not resolved by outer 
plan, otherwise it could be
    -        // resolved by itself
    -        val resolvedByOuter = analyzed transformDown {
    -          case q: LogicalPlan if q.childrenResolved && !q.resolved =>
    -            q transformExpressions {
    -              case u @ UnresolvedAttribute(nameParts) =>
    -                withPosition(u) {
    -                  try {
    -                    val outerAttrOpt = outer.resolve(nameParts, resolver)
    -                    if (outerAttrOpt.isDefined) {
    -                      val outerAttr = outerAttrOpt.get
    -                      if (q.inputSet.contains(outerAttr)) {
    -                        // Got a conflict, create an alias for the 
attribute come from outer table
    -                        val alias = Alias(outerAttr, outerAttr.toString)()
    -                        val attr = alias.toAttribute
    -                        aliases += attr -> alias
    -                        attr
    -                      } else {
    -                        outerAttr
    -                      }
    -                    } else {
    -                      u
    -                    }
    -                  } catch {
    -                    case a: AnalysisException => u
    +    private def resolveOuterReferences(plan: LogicalPlan, outer: 
LogicalPlan): LogicalPlan = {
    +      plan resolveOperators {
    +        case q: LogicalPlan if q.childrenResolved && !q.resolved =>
    +          q transformExpressions {
    +            case u @ UnresolvedAttribute(nameParts) =>
    +              withPosition(u) {
    +                try {
    +                  outer.resolve(nameParts, resolver) match {
    +                    case Some(outerAttr) => OuterReference(outerAttr)
    +                    case None => u
                       }
    +                } catch {
    +                  case _: AnalysisException => u
                     }
    -            }
    -        }
    -        if (resolvedByOuter fastEquals analyzed) {
    -          analyzed
    -        } else {
    -          resolveCorrelatedSubquery(resolvedByOuter, outer, aliases)
    -        }
    +              }
    +          }
           }
         }
     
    +    /**
    +     * Resolve the subqueries in a single LogicalPlan using the given 
outer plans. This method
    +     * alternates between applying the regular analyzer on the subquery 
and applying the
    +     * resolveOuterReferences rule.
    +     */
    +    private def resolveSubQueries(
    +        plan: LogicalPlan,
    +        outerPlans: Seq[LogicalPlan]): LogicalPlan = plan 
transformExpressions {
    +      case e: SubqueryExpression if !e.query.resolved =>
    +        var previous: LogicalPlan = null
    +        var current = e.query
    +        do {
    +          previous = current
    +          // Try to resolve the subquery plan using the regular analyzer.
    +          current = execute(current)
    +
    +          // Use the outer references to resolve the subquery plan if it 
isn't resolved yet.
    +          val i = outerPlans.iterator
    +          while (!current.resolved && i.hasNext) {
    +            current = resolveOuterReferences(current, i.next())
    +          }
    +        } while (!current.resolved && !current.fastEquals(previous))
    +        e.withNewPlan(current)
    +    }
    +
    +    /**
    +     * Resolve a subquery using the outer plan. This rule creates a 
dedicated analyzer which can
    +     * also resolve outer plan references.
    +     */
         def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    -      // Only a few unary node (Project/Filter/Aggregate/Having) could 
have subquery
    +      // In case of HAVING (a filter after an aggregate) we use both the 
aggregate and its child for
    +      // resolution.
    +      case f @ Filter(_, a: Aggregate) if f.childrenResolved =>
    +        resolveSubQueries(f, Seq(a, a.child))
    +      // Only a few unary nodes (Project/Filter/Aggregate) can contain 
subqueries.
           case q: UnaryNode if q.childrenResolved =>
    -        val aliases = scala.collection.mutable.Map[Attribute, Alias]()
    -        val newPlan = q transformExpressions {
    -          case e: SubqueryExpression if !e.query.resolved =>
    -            e.withNewPlan(resolveCorrelatedSubquery(e.query, q.child, 
aliases))
    +        resolveSubQueries(q, Seq(q.child))
    +    }
    +  }
    +
    +  /**
    +   * This rule pulls out correlated predicates out of a subquery and 
finalize the resolution of
    +   * subqueries. The rule transforms [[Exists]] and [[ListQuery]] 
expressions into
    +   * [[PredicateSubquery]] expressions.
    +   */
    +  object PullOutCorrelatedPredicates extends Rule[LogicalPlan] {
    +    /**
    +     * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the
    +     * correlated predicates from subquery [[Filter]]s and adds the 
references of these predicates
    +     * to all intermediate [[Project]] and [[Aggregate]] clauses (if they 
are missing) in order to
    +     * be able to evaluate the predicates at the top level.
    +     *
    +     * This method returns the rewritten subquery and correlated 
predicates.
    +     */
    +    private def pullOutCorrelatedPredicates(sub: LogicalPlan): 
(LogicalPlan, Seq[Expression]) = {
    +      val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, 
Seq[Expression]]
    +
    +      /** Make sure a plans' subtree does not contain a tagged predicate. 
*/
    +      def failOnOuterReferenceInSubTree(p: LogicalPlan, msg: String): Unit 
= {
    +        if (p.collect(predicateMap).nonEmpty) {
    +          failAnalysis(s"Accessing outer query column is not allowed in 
$msg: $p")
             }
    -        if (aliases.nonEmpty) {
    -          val projs = q.child.output ++ aliases.values
    -          Project(q.child.output,
    -            newPlan.withNewChildren(Seq(Project(projs, q.child))))
    -        } else {
    -          newPlan
    +      }
    +
    +      /** Helper function for locating outer references. */
    +      def containsOuter(e: Expression): Boolean = {
    +        e.find(_.isInstanceOf[OuterReference]).isDefined
    +      }
    +
    +      /** Make sure a plans' expressions do not contain a tagged 
predicate. */
    +      def failOnOuterReference(p: LogicalPlan): Unit = {
    +        if (p.expressions.exists(containsOuter)) {
    +          failAnalysis(
    +            s"Correlated predicates are not supported outside of 
WHERE/HAVING clauses: $p")
             }
    +      }
    +
    +      /** Determine which correlated predicate references are missing from 
this plan. */
    +      def missingReferences(p: LogicalPlan): AttributeSet = {
    +        val localPredicateReferences = p.collect(predicateMap)
    +          .flatten
    +          .map(_.references)
    +          .reduceOption(_ ++ _)
    +          .getOrElse(AttributeSet.empty)
    +        localPredicateReferences -- p.outputSet
    +      }
    +
    +      val transformed = sub transformUp {
    +        case f @ Filter(cond, child) =>
    +          // Find all predicates with an outer reference.
    +          val (correlated, local) = 
splitConjunctivePredicates(cond).partition(containsOuter)
    +
    +          // Rewrite the filter without the correlated predicates if any.
    +          correlated match {
    +            case Nil => f
    +            case xs if local.nonEmpty =>
    +              val newFilter = Filter(local.reduce(And), child)
    +              predicateMap += newFilter -> xs
    +              newFilter
    +            case xs =>
    +              predicateMap += child -> xs
    +              child
    +          }
    +        case p @ Project(expressions, child) =>
    +          failOnOuterReference(p)
    +          val referencesToAdd = missingReferences(p)
    +          if (referencesToAdd.nonEmpty) {
    +            Project(expressions ++ referencesToAdd, child)
    +          } else {
    +            p
    +          }
    +        case a @ Aggregate(grouping, expressions, child) =>
    +          failOnOuterReference(a)
    +          val referencesToAdd = missingReferences(a)
    +          if (referencesToAdd.nonEmpty) {
    +            Aggregate(grouping ++ referencesToAdd, expressions ++ 
referencesToAdd, child)
    +          } else {
    +            a
    +          }
    +        case j @ Join(left, _, RightOuter, _) =>
    +          failOnOuterReference(j)
    +          failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN")
    +          j
    +        case j @ Join(_, right, jt, _) if jt != Inner =>
    +          failOnOuterReference(j)
    +          failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN")
    +          j
    +        case u: Union =>
    +          failOnOuterReferenceInSubTree(u, "a UNION")
    +          u
    +        case s: SetOperation =>
    +          failOnOuterReferenceInSubTree(s.right, "an INTERSECT/EXCEPT")
    +          s
    +        case p =>
    +          failOnOuterReference(p)
    +          p
    +      }
    +      (transformed, predicateMap.values.flatten.toSeq)
    +    }
    +
    +    /**
    +     * Rewrite the subquery is a safe way by preventing that the subquery 
and the outer use the same
    +     * attributes.
    +     */
    +    private def rewriteSubQuery(
    +        sub: LogicalPlan,
    +        outer: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
    +      // Pull out the tagged predicates and rewrite the subquery in the 
process.
    +      val (basePlan, baseConditions) = pullOutCorrelatedPredicates(sub)
    +
    +      // Make sure the inner and the outer query attributes do not collide.
    +      val duplicates = basePlan.outputSet.intersect(outer.outputSet)
    +      val (plan, deDuplicatedConditions) = if (duplicates.nonEmpty) {
    +        val aliasMap = AttributeMap(duplicates.map { dup =>
    +          dup -> Alias(dup, dup.toString)()
    +        }.toSeq)
    +        val aliasedExpressions = basePlan.output.map { ref =>
    +          aliasMap.getOrElse(ref, ref)
    +        }
    +        val aliasedProjection = Project(aliasedExpressions, basePlan)
    +        val aliasedConditions = baseConditions.map(_.transform {
    +          case ref: Attribute => aliasMap.getOrElse(ref, ref).toAttribute
    +        })
    +        (aliasedProjection, aliasedConditions)
    +      } else {
    +        (basePlan, baseConditions)
    +      }
    +      // Remove outer references from the correlated predicates. We wait 
with extracting
    +      // these until collisions between the inner and outer query 
attributes have been
    +      // solved.
    +      val conditions = deDuplicatedConditions.map(_.transform {
    +        case OuterReference(ref) => ref
    +      })
    +      (plan, conditions)
    +    }
    +
    +    /**
    +     * Rewrite a LogicalPlan containing [[ScalarSubquery]], [[ListQuery]] 
or [[Exists]] expressions.
    +     */
    +    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    +      case q: UnaryNode if q.childrenResolved =>
    +        q transformExpressions {
    +          case ScalarSubquery(sub, None, exprId) if sub.resolved && 
sub.output.nonEmpty =>
    +            val (rewrite, conditions) = rewriteSubQuery(sub, q.child)
    +            ScalarSubquery(rewrite, Some(conditions), exprId)
    +          case Exists(sub, exprId) if sub.resolved =>
    +            val (rewrite, conditions) = rewriteSubQuery(sub, q.child)
    +            PredicateSubquery(rewrite, conditions, nullAware = false, 
exprId)
    +          case In(e, Seq(ListQuery(sub, exprId))) if sub.resolved && 
e.resolved =>
    +            val (rewrite, conditions) = rewriteSubQuery(sub, q.child)
    +            // Get the left hand side expressions.
    +            val expressions = e match {
    +              case CreateStruct(exprs) => exprs
    +              case expr => Seq(expr)
    +            }
    +            // Make sure the number of arguments are equal.
    +            if (expressions.size != sub.output.size) {
    +              failAnalysis(s"The number of fields in the value 
(${expressions.size}) does not " +
    +                s"match with the number of columns in the subquery 
(${sub.output.size})")
    +            }
    +            // Construct the IN conditions.
    +            val inConditions = 
expressions.zip(rewrite.output).map(EqualTo.tupled)
    --- End diff --
    
    They may have the same exprId on both sides


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to