cloud-fan commented on code in PR #38888: URL: https://github.com/apache/spark/pull/38888#discussion_r1040779582
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ########## @@ -1591,12 +1620,129 @@ class Analyzer(override val catalogManager: CatalogManager) notMatchedBySourceActions = newNotMatchedBySourceActions) } - // Skip the having clause here, this will be handled in ResolveAggregateFunctions. - case h: UnresolvedHaving => h + // For the 3 operators below, they can host grouping expressions and aggregate functions. + // We should resolve columns with `agg.output` and the rule `ResolveAggregateFunctions` will + // push them down to Aggregate later. + case u @ UnresolvedHaving(cond, agg: Aggregate) if !cond.resolved => + u.mapExpressions { e => + // Columns in HAVING should be resolved with `agg.child.output` first, to follow the SQL + // standard. See more details in SPARK-31519. + resolveExpressionByPlanOutput(resolveColWithAgg(e, agg), agg, allowOuter = true) + } + case f @ Filter(cond, agg: Aggregate) if !cond.resolved => + f.mapExpressions { e => + val resolvedNoOuter = resolveExpressionByPlanOutput(e, agg) + // Outer reference has lower priority than this. See the doc of `ResolveReferences`. + resolveOuterRef(resolveColWithAgg(resolvedNoOuter, agg)) + } + case s @ Sort(orders, _, agg: Aggregate) if !orders.forall(_.resolved) => + s.mapExpressions { e => + val resolvedNoOuter = resolveExpressionByPlanOutput(e, agg) + // Outer reference has lower priority than this. See the doc of `ResolveReferences`. + resolveOuterRef(resolveColWithAgg(resolvedNoOuter, agg)) + } + + // For the 3 operators below, they can host missing attributes that are from descendant nodes. + // For example, `SELECT a FROM t ORDER BY b`. We can resolve `b` with table `t` even if there + // is a Project node between the table scan node and Sort node. We also need to propagate + // the missing attributes from the descendant node to the current node, and project them away + // at the end via an extra Project. 
+ case s @ Sort(order, _, child) if !s.resolved || s.missingInput.nonEmpty => + val resolvedNoOuter = order.map(resolveExpressionByPlanOutput(_, child)) + val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(resolvedNoOuter, child) + // Outer reference has lower priority than this. See the doc of `ResolveReferences`. + val ordering = newOrder.map(resolveOuterRef).map(_.asInstanceOf[SortOrder]) + if (child.output == newChild.output) { + s.copy(order = ordering) + } else { + // Add missing attributes and then project them away. + val newSort = s.copy(order = ordering, child = newChild) + Project(child.output, newSort) + } + + case f @ Filter(cond, child) if !f.resolved || f.missingInput.nonEmpty => + val resolvedNoOuter = resolveExpressionByPlanChildren(cond, f) + val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(resolvedNoOuter), child) + // Outer reference has lower priority than this. See the doc of `ResolveReferences`. + val finalCond = resolveOuterRef(newCond.head) + if (child.output == newChild.output) { + f.copy(condition = finalCond) + } else { + // Add missing attributes and then project them away. + val newFilter = Filter(finalCond, newChild) + Project(child.output, newFilter) + } + + case r @ RepartitionByExpression(partitionExprs, child, _) + if !r.resolved || r.missingInput.nonEmpty => + val resolvedNoOuter = partitionExprs.map(resolveExpressionByPlanChildren(_, r)) + val (newPartitionExprs, newChild) = resolveExprsAndAddMissingAttrs(resolvedNoOuter, child) + // Outer reference has lower priority than this. See the doc of `ResolveReferences`. 
+ val finalPartitionExprs = newPartitionExprs.map(resolveOuterRef) + if (child.output == newChild.output) { + r.copy(finalPartitionExprs, newChild) + } else { + Project(child.output, r.copy(finalPartitionExprs, newChild)) + } case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString(conf.maxToStringFields)}") - q.mapExpressions(resolveExpressionByPlanChildren(_, q)) + q.mapExpressions(resolveExpressionByPlanChildren(_, q, allowOuter = true)) + } + + /** + * This method tries to resolve expressions and find missing attributes recursively. + * Specifically, when the expressions used in `Sort` or `Filter` contain unresolved attributes + * or resolved attributes which are missing from child output, this method tries to find the + * missing attributes and add them into the projection. + */ + private def resolveExprsAndAddMissingAttrs( Review Comment: moved from https://github.com/apache/spark/pull/38888/files#diff-ed19f376a63eba52eea59ca71f3355d4495fad4fad4db9a3324aade0d4986a47L2063 , no change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org