cloud-fan commented on code in PR #38888:
URL: https://github.com/apache/spark/pull/38888#discussion_r1040779582


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1591,12 +1620,129 @@ class Analyzer(override val catalogManager: 
CatalogManager)
               notMatchedBySourceActions = newNotMatchedBySourceActions)
         }
 
-      // Skip the having clause here, this will be handled in 
ResolveAggregateFunctions.
-      case h: UnresolvedHaving => h
+      // For the 3 operators below, they can host grouping expressions and 
aggregate functions.
+      // We should resolve columns with `agg.output` and the rule 
`ResolveAggregateFunctions` will
+      // push them down to Aggregate later.
+      case u @ UnresolvedHaving(cond, agg: Aggregate) if !cond.resolved =>
+        u.mapExpressions { e =>
+          // Columns in HAVING should be resolved with `agg.child.output` 
first, to follow the SQL
+          // standard. See more details in SPARK-31519.
+          resolveExpressionByPlanOutput(resolveColWithAgg(e, agg), agg, 
allowOuter = true)
+        }
+      case f @ Filter(cond, agg: Aggregate) if !cond.resolved =>
+        f.mapExpressions { e =>
+          val resolvedNoOuter = resolveExpressionByPlanOutput(e, agg)
+          // Outer reference has lower priority than this. See the doc of 
`ResolveReferences`.
+          resolveOuterRef(resolveColWithAgg(resolvedNoOuter, agg))
+        }
+      case s @ Sort(orders, _, agg: Aggregate) if !orders.forall(_.resolved) =>
+        s.mapExpressions { e =>
+          val resolvedNoOuter = resolveExpressionByPlanOutput(e, agg)
+          // Outer reference has lower priority than this. See the doc of 
`ResolveReferences`.
+          resolveOuterRef(resolveColWithAgg(resolvedNoOuter, agg))
+        }
+
+      // For the 3 operators below, they can host missing attributes that are 
from descendant nodes.
+      // For example, `SELECT a FROM t ORDER BY b`. We can resolve `b` with 
table `t` even if there
+      // is a Project node between the table scan node and Sort node. We also 
need to propagate
+      // the missing attributes from the descendant node to the current node, 
and project them way
+      // at the end via an extra Project.
+      case s @ Sort(order, _, child) if !s.resolved || s.missingInput.nonEmpty 
=>
+        val resolvedNoOuter = order.map(resolveExpressionByPlanOutput(_, 
child))
+        val (newOrder, newChild) = 
resolveExprsAndAddMissingAttrs(resolvedNoOuter, child)
+        // Outer reference has lower priority than this. See the doc of 
`ResolveReferences`.
+        val ordering = 
newOrder.map(resolveOuterRef).map(_.asInstanceOf[SortOrder])
+        if (child.output == newChild.output) {
+          s.copy(order = ordering)
+        } else {
+          // Add missing attributes and then project them away.
+          val newSort = s.copy(order = ordering, child = newChild)
+          Project(child.output, newSort)
+        }
+
+      case f @ Filter(cond, child) if !f.resolved || f.missingInput.nonEmpty =>
+        val resolvedNoOuter = resolveExpressionByPlanChildren(cond, f)
+        val (newCond, newChild) = 
resolveExprsAndAddMissingAttrs(Seq(resolvedNoOuter), child)
+        // Outer reference has lower priority than this. See the doc of 
`ResolveReferences`.
+        val finalCond = resolveOuterRef(newCond.head)
+        if (child.output == newChild.output) {
+          f.copy(condition = finalCond)
+        } else {
+          // Add missing attributes and then project them away.
+          val newFilter = Filter(finalCond, newChild)
+          Project(child.output, newFilter)
+        }
+
+      case r @ RepartitionByExpression(partitionExprs, child, _)
+          if !r.resolved || r.missingInput.nonEmpty =>
+        val resolvedNoOuter = 
partitionExprs.map(resolveExpressionByPlanChildren(_, r))
+        val (newPartitionExprs, newChild) = 
resolveExprsAndAddMissingAttrs(resolvedNoOuter, child)
+        // Outer reference has lower priority than this. See the doc of 
`ResolveReferences`.
+        val finalPartitionExprs = newPartitionExprs.map(resolveOuterRef)
+        if (child.output == newChild.output) {
+          r.copy(finalPartitionExprs, newChild)
+        } else {
+          Project(child.output, r.copy(finalPartitionExprs, newChild))
+        }
 
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve 
${q.simpleString(conf.maxToStringFields)}")
-        q.mapExpressions(resolveExpressionByPlanChildren(_, q))
+        q.mapExpressions(resolveExpressionByPlanChildren(_, q, allowOuter = 
true))
+    }
+
+    /**
+     * This method tries to resolve expressions and find missing attributes 
recursively.
+     * Specifically, when the expressions used in `Sort` or `Filter` contain 
unresolved attributes
+     * or resolved attributes which are missing from child output. This method 
tries to find the
+     * missing attributes and add them into the projection.
+     */
+    private def resolveExprsAndAddMissingAttrs(

Review Comment:
   moved from 
https://github.com/apache/spark/pull/38888/files#diff-ed19f376a63eba52eea59ca71f3355d4495fad4fad4db9a3324aade0d4986a47L2063
 , no change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to