Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/19193#discussion_r156058341
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
---
@@ -1920,7 +1927,34 @@ class Analyzer(
case p: LogicalPlan if !p.childrenResolved => p
- // Aggregate without Having clause.
+ // Extract window expressions from aggregate functions. There might be an aggregate whose
+ // aggregate function contains a window expression as a child, which we need to extract.
+ // e.g., df.groupBy().agg(max(rank().over(window)))
+ case a @ Aggregate(groupingExprs, aggregateExprs, child)
+ if containsAggregateFunctionWithWindowExpression(aggregateExprs) &&
+ a.expressions.forall(_.resolved) =>
+
+ val windowExprAliases = new ArrayBuffer[NamedExpression]()
+ val newAggregateExprs = aggregateExprs.map { expr =>
+ expr.transform {
+ case aggExpr @ AggregateExpression(func, _, _, _) if
hasWindowFunction(func.children) =>
+ val newFuncChildren = func.children.map { funcExpr =>
+ funcExpr.transform {
+ case we: WindowExpression =>
+ // Replace window expressions with aliases to them
+ val windowExprAlias = Alias(we,
s"_we${windowExprAliases.length}")()
+ windowExprAliases += windowExprAlias
+ windowExprAlias.toAttribute
+ }
+ }
+ val newFunc =
func.withNewChildren(newFuncChildren).asInstanceOf[AggregateFunction]
+ aggExpr.copy(aggregateFunction = newFunc)
+ }.asInstanceOf[NamedExpression]
+ }
+ val window = addWindow(windowExprAliases, child)
+ // TODO do we also need a projection here?
+ Aggregate(groupingExprs, newAggregateExprs, window)
--- End diff --
No, you don't need a Project.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]