allisonwang-db commented on a change in pull request #32470:
URL: https://github.com/apache/spark/pull/32470#discussion_r650734143



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -2457,164 +2451,133 @@ class Analyzer(override val catalogManager: 
CatalogManager)
       _.containsPattern(AGGREGATE), ruleId) {
       // Resolve aggregate with having clause to Filter(..., Aggregate()). 
Note, to avoid wrongly
       // resolve the having condition expression, here we skip resolving it in 
ResolveReferences
-      // and transform it to Filter after aggregate is resolved. See more 
details in SPARK-31519.
+      // and transform it to Filter after aggregate is resolved. Basically 
columns in HAVING should
+      // be resolved with `agg.child.output` first. See more details in 
SPARK-31519.
       case UnresolvedHaving(cond, agg: Aggregate) if agg.resolved =>
-        resolveHaving(Filter(cond, agg), agg)
-
-      case f @ Filter(_, agg: Aggregate) if agg.resolved =>
-        resolveHaving(f, agg)
-
-      case sort @ Sort(sortOrder, global, aggregate: Aggregate) if 
aggregate.resolved =>
-
-        // Try resolving the ordering as though it is in the aggregate clause.
-        try {
-          // If a sort order is unresolved, containing references not in 
aggregate, or containing
-          // `AggregateExpression`, we need to push down it to the underlying 
aggregate operator.
-          val unresolvedSortOrders = sortOrder.filter { s =>
-            !s.resolved || !s.references.subsetOf(aggregate.outputSet) || 
containsAggregate(s)
-          }
-          val aliasedOrdering = unresolvedSortOrders.map(o => Alias(o.child, 
"aggOrder")())
-
-          val aggregateWithExtraOrdering = aggregate.copy(
-            aggregateExpressions = aggregate.aggregateExpressions ++ 
aliasedOrdering)
-
-          val resolvedAggregate: Aggregate =
-            
executeSameContext(aggregateWithExtraOrdering).asInstanceOf[Aggregate]
-
-          val (reResolvedAggExprs, resolvedAliasedOrdering) =
-            
resolvedAggregate.aggregateExpressions.splitAt(aggregate.aggregateExpressions.length)
-
-          // If we pass the analysis check, then the ordering expressions 
should only reference to
-          // aggregate expressions or grouping expressions, and it's safe to 
push them down to
-          // Aggregate.
-          checkAnalysis(resolvedAggregate)
-
-          val originalAggExprs = 
aggregate.aggregateExpressions.map(trimNonTopLevelAliases)
-
-          // If the ordering expression is same with original aggregate 
expression, we don't need
-          // to push down this ordering expression and can reference the 
original aggregate
-          // expression instead.
-          val needsPushDown = ArrayBuffer.empty[NamedExpression]
-          val orderToAlias = unresolvedSortOrders.zip(aliasedOrdering)
-          val evaluatedOrderings =
-            
resolvedAliasedOrdering.asInstanceOf[Seq[Alias]].zip(orderToAlias).map {
-              case (evaluated, (order, aliasOrder)) =>
-                val index = reResolvedAggExprs.indexWhere {
-                  case Alias(child, _) => child semanticEquals evaluated.child
-                  case other => other semanticEquals evaluated.child
-                }
-
-                if (index == -1) {
-                  if (hasCharVarchar(evaluated)) {
-                    needsPushDown += aliasOrder
-                    order.copy(child = aliasOrder)
-                  } else {
-                    needsPushDown += evaluated
-                    order.copy(child = evaluated.toAttribute)
-                  }
-                } else {
-                  order.copy(child = originalAggExprs(index).toAttribute)
-                }
+        resolveOperatorWithAggregate(Seq(cond), agg, (newExprs, newChild) => {
+          Filter(newExprs.head, newChild)
+        })
+
+      case Filter(cond, agg: Aggregate) if agg.resolved =>
+        // We should resolve the references normally based on child.output 
first.
+        val maybeResolved = resolveExpressionByPlanOutput(cond, agg)
+        resolveOperatorWithAggregate(Seq(maybeResolved), agg, (newExprs, 
newChild) => {
+          Filter(newExprs.head, newChild)
+        })
+
+      case Sort(sortOrder, global, agg: Aggregate) if agg.resolved =>
+        // We should resolve the references normally based on child.output 
first.
+        val maybeResolved = 
sortOrder.map(_.child).map(resolveExpressionByPlanOutput(_, agg))
+        resolveOperatorWithAggregate(maybeResolved, agg, (newExprs, newChild) 
=> {
+          val newSortOrder = sortOrder.zip(newExprs).map {
+            case (sortOrder, expr) => sortOrder.copy(child = expr)
           }
+          Sort(newSortOrder, global, newChild)
+        })
+    }
 
-          val sortOrdersMap = unresolvedSortOrders
-            .map(new TreeNodeRef(_))
-            .zip(evaluatedOrderings)
-            .toMap
-          val finalSortOrders = sortOrder.map(s => sortOrdersMap.getOrElse(new 
TreeNodeRef(s), s))
+    /**
+     * Resolves the given expressions as if they are in the given Aggregate 
operator, which means
+     * the column can be resolved using `agg.child` and aggregate 
functions/grouping columns are
+     * allowed. It returns a list of named expressions that need to be 
appended to
+     * `agg.aggregateExpressions`, and the list of resolved expressions.
+     */
+    def resolveExprsWithAggregate(
+        exprs: Seq[Expression],
+        agg: Aggregate): (Seq[NamedExpression], Seq[Expression]) = {
+      def resolveCol(input: Expression): Expression = {
+        input.transform {
+          case u: UnresolvedAttribute =>
+            try {
+              // Resolve the column and wrap it with `TempResolvedColumn`. If 
the resolved column
+              // doesn't end up as aggregate function input or a grouping 
column, we should
+              // undo the column resolution to avoid confusing error message. 
For example, if
+              // a table `t` has two columns `c1` and `c2`, for query `SELECT 
... FROM t
+              // GROUP BY c1 HAVING c2 = 0`, even though we can resolve column 
`c2` here, we
+              // should undo it later and fail with "Column c2 not found".
+              agg.child.resolve(u.nameParts, 
resolver).map(TempResolvedColumn(_, u.nameParts))
+                .getOrElse(u)
+            } catch {
+              case _: AnalysisException => u
+            }
+        }
+      }
 
-          // Since we don't rely on sort.resolved as the stop condition for 
this rule,
-          // we need to check this and prevent applying this rule multiple 
times
-          if (sortOrder == finalSortOrders) {
-            sort
-          } else {
-            Project(aggregate.output,
-              Sort(finalSortOrders, global,
-                aggregate.copy(aggregateExpressions = originalAggExprs ++ 
needsPushDown)))
-          }
-        } catch {
-          // Attempting to resolve in the aggregate can result in ambiguity.  
When this happens,
-          // just return the original plan.
-          case ae: AnalysisException => sort
+      def resolveSubQuery(input: Expression): Expression = {
+        if (SubqueryExpression.hasSubquery(input)) {
+          val fake = Project(Alias(input, "fake")() :: Nil, agg.child)
+          
ResolveSubquery(fake).asInstanceOf[Project].projectList.head.asInstanceOf[Alias].child

Review comment:
       The children of the resolved subquery that are not aggregate expressions 
should also be wrapped in `TempResolvedColumn`. (It seems the current logic 
can't handle this case with proper error messages either; this can be a 
follow-up in the future.)
   ```sql
   select c1 from t1 group by c1 having (select sum(c2) from t2 where t1.c2 = 
t2.c1) > 0
   -- org.apache.spark.sql.AnalysisException: Resolved attribute(s) c2#10 
missing from c1#9 in operator
   -- !Filter (scalar-subquery#8 [c2#10] > cast(0 as bigint)).;
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to