Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/8371#discussion_r37798474
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
---
@@ -559,21 +529,73 @@ class Analyzer(
}
/**
- * This rule finds expressions in HAVING clause filters that depend on
- * unresolved attributes. It pushes these expressions down to the
underlying
- * aggregates and then projects them away above the filter.
+ * This rule finds aggregate expressions that are not in an aggregate
operator. For example,
+ * those in a HAVING clause or ORDER BY clause. These expressions are
pushed down to the
+ * underlying aggregate operator and then projected away after the
original operator.
*/
- object UnresolvedHavingClauseAttributes extends Rule[LogicalPlan] {
+ object ResolveAggregateFunctions extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case filter @ Filter(havingCondition, aggregate @ Aggregate(_,
originalAggExprs, _))
- if aggregate.resolved && containsAggregate(havingCondition) =>
-
- val evaluatedCondition = Alias(havingCondition,
"havingCondition")()
- val aggExprsWithHaving = evaluatedCondition +: originalAggExprs
+ case filter @ Filter(havingCondition,
+ aggregate @ Aggregate(grouping, originalAggExprs, child))
+ if aggregate.resolved && !filter.resolved =>
+
+ // Try resolving the condition of the filter as though it is in
the aggregate clause
+ val aggregatedCondition =
+ Aggregate(grouping, Alias(havingCondition, "havingCondition")()
:: Nil, child)
+ val resolvedOperator = execute(aggregatedCondition)
+ def resolvedAggregateFilter =
+ resolvedOperator
+ .asInstanceOf[Aggregate]
+ .aggregateExpressions.head
+
+ // If resolution was successful and we see the filter has an
aggregate in it, add it to
+ // the original aggregate operator.
+ if (resolvedOperator.resolved &&
containsAggregate(resolvedAggregateFilter)) {
+ val aggExprsWithHaving = resolvedAggregateFilter +:
originalAggExprs
+
+ Project(aggregate.output,
+ Filter(resolvedAggregateFilter.toAttribute,
+ aggregate.copy(aggregateExpressions = aggExprsWithHaving)))
+ } else {
+ filter
+ }
- Project(aggregate.output,
- Filter(evaluatedCondition.toAttribute,
- aggregate.copy(aggregateExpressions = aggExprsWithHaving)))
+ case sort @ Sort(sortOrder, global,
+ aggregate @ Aggregate(grouping, originalAggExprs, child))
+ if aggregate.resolved && !sort.resolved =>
+
+ // Try resolving the ordering as though it is in the aggregate
clause.
+ try {
+ val aliasedOrder = sortOrder.map(o => Alias(o.child,
"aggOrder")())
+ val aggregatedCondition = Aggregate(grouping, aliasedOrder,
child)
+ val resolvedOperator: Aggregate =
execute(aggregatedCondition).asInstanceOf[Aggregate]
+ def resolvedAggregateOrdering =
resolvedOperator.aggregateExpressions
+
+ val needsAggregate =
resolvedAggregateOrdering.exists(containsAggregate)
+ val requiredAttributes =
resolvedAggregateOrdering.map(_.references).reduce(_ ++ _)
+ val missingAttributes = (requiredAttributes --
aggregate.outputSet).nonEmpty
+
+ // If resolution was successful and we see the ordering either
has an aggregate in it or
+ // it is missing something that is projected away by the
aggregate, add the ordering
+ // to the original aggregate operator.
+ if (resolvedOperator.resolved && (needsAggregate ||
missingAttributes)) {
+ val evaluatedOrderings: Seq[SortOrder] =
sortOrder.zip(resolvedAggregateOrdering).map {
+ case (order, evaluated) => order.copy(child =
evaluated.toAttribute)
+ }
+ val aggExprsWithHaving: Seq[NamedExpression] =
--- End diff --
Wrong variable name `aggExprsWithHaving`? This is the `Sort` (ORDER BY) case, not the HAVING case.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]