cloud-fan commented on code in PR #38888:
URL: https://github.com/apache/spark/pull/38888#discussion_r1040779582
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1591,12 +1620,129 @@ class Analyzer(override val catalogManager:
CatalogManager)
notMatchedBySourceActions = newNotMatchedBySourceActions)
}
- // Skip the having clause here, this will be handled in
ResolveAggregateFunctions.
- case h: UnresolvedHaving => h
+ // For the 3 operators below, they can host grouping expressions and
aggregate functions.
+ // We should resolve columns with `agg.output` and the rule
`ResolveAggregateFunctions` will
+ // push them down to Aggregate later.
+ case u @ UnresolvedHaving(cond, agg: Aggregate) if !cond.resolved =>
+ u.mapExpressions { e =>
+ // Columns in HAVING should be resolved with `agg.child.output`
first, to follow the SQL
+ // standard. See more details in SPARK-31519.
+ resolveExpressionByPlanOutput(resolveColWithAgg(e, agg), agg,
allowOuter = true)
+ }
+ case f @ Filter(cond, agg: Aggregate) if !cond.resolved =>
+ f.mapExpressions { e =>
+ val resolvedNoOuter = resolveExpressionByPlanOutput(e, agg)
+ // Outer reference has lower priority than this. See the doc of
`ResolveReferences`.
+ resolveOuterRef(resolveColWithAgg(resolvedNoOuter, agg))
+ }
+ case s @ Sort(orders, _, agg: Aggregate) if !orders.forall(_.resolved) =>
+ s.mapExpressions { e =>
+ val resolvedNoOuter = resolveExpressionByPlanOutput(e, agg)
+ // Outer reference has lower priority than this. See the doc of
`ResolveReferences`.
+ resolveOuterRef(resolveColWithAgg(resolvedNoOuter, agg))
+ }
+
+ // For the 3 operators below, they can host missing attributes that are
from descendant nodes.
+ // For example, `SELECT a FROM t ORDER BY b`. We can resolve `b` with
table `t` even if there
+ // is a Project node between the table scan node and Sort node. We also
need to propagate
+ // the missing attributes from the descendant node to the current node,
and project them way
+ // at the end via an extra Project.
+ case s @ Sort(order, _, child) if !s.resolved || s.missingInput.nonEmpty
=>
+ val resolvedNoOuter = order.map(resolveExpressionByPlanOutput(_,
child))
+ val (newOrder, newChild) =
resolveExprsAndAddMissingAttrs(resolvedNoOuter, child)
+ // Outer reference has lower priority than this. See the doc of
`ResolveReferences`.
+ val ordering =
newOrder.map(resolveOuterRef).map(_.asInstanceOf[SortOrder])
+ if (child.output == newChild.output) {
+ s.copy(order = ordering)
+ } else {
+ // Add missing attributes and then project them away.
+ val newSort = s.copy(order = ordering, child = newChild)
+ Project(child.output, newSort)
+ }
+
+ case f @ Filter(cond, child) if !f.resolved || f.missingInput.nonEmpty =>
+ val resolvedNoOuter = resolveExpressionByPlanChildren(cond, f)
+ val (newCond, newChild) =
resolveExprsAndAddMissingAttrs(Seq(resolvedNoOuter), child)
+ // Outer reference has lower priority than this. See the doc of
`ResolveReferences`.
+ val finalCond = resolveOuterRef(newCond.head)
+ if (child.output == newChild.output) {
+ f.copy(condition = finalCond)
+ } else {
+ // Add missing attributes and then project them away.
+ val newFilter = Filter(finalCond, newChild)
+ Project(child.output, newFilter)
+ }
+
+ case r @ RepartitionByExpression(partitionExprs, child, _)
+ if !r.resolved || r.missingInput.nonEmpty =>
+ val resolvedNoOuter =
partitionExprs.map(resolveExpressionByPlanChildren(_, r))
+ val (newPartitionExprs, newChild) =
resolveExprsAndAddMissingAttrs(resolvedNoOuter, child)
+ // Outer reference has lower priority than this. See the doc of
`ResolveReferences`.
+ val finalPartitionExprs = newPartitionExprs.map(resolveOuterRef)
+ if (child.output == newChild.output) {
+ r.copy(finalPartitionExprs, newChild)
+ } else {
+ Project(child.output, r.copy(finalPartitionExprs, newChild))
+ }
case q: LogicalPlan =>
logTrace(s"Attempting to resolve
${q.simpleString(conf.maxToStringFields)}")
- q.mapExpressions(resolveExpressionByPlanChildren(_, q))
+ q.mapExpressions(resolveExpressionByPlanChildren(_, q, allowOuter =
true))
+ }
+
+ /**
+ * This method tries to resolve expressions and find missing attributes
recursively.
+ * Specifically, when the expressions used in `Sort` or `Filter` contain
unresolved attributes
+ * or resolved attributes which are missing from child output. This method
tries to find the
+ * missing attributes and add them into the projection.
+ */
+ private def resolveExprsAndAddMissingAttrs(
Review Comment:
moved from
https://github.com/apache/spark/pull/38888/files#diff-ed19f376a63eba52eea59ca71f3355d4495fad4fad4db9a3324aade0d4986a47L2063
, no change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]