Github user aokolnychyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/19193#discussion_r156493072
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
---
@@ -1920,7 +1927,34 @@ class Analyzer(
case p: LogicalPlan if !p.childrenResolved => p
- // Aggregate without Having clause.
+ // Extract window expressions from aggregate functions. There might be an aggregate whose
+ // aggregate function contains a window expression as a child, which we need to extract.
+ // e.g., df.groupBy().agg(max(rank().over(window))
+ case a @ Aggregate(groupingExprs, aggregateExprs, child)
+ if containsAggregateFunctionWithWindowExpression(aggregateExprs) &&
+ a.expressions.forall(_.resolved) =>
+
+ val windowExprAliases = new ArrayBuffer[NamedExpression]()
+ val newAggregateExprs = aggregateExprs.map { expr =>
+ expr.transform {
--- End diff --
Thanks for looking into this. I am not sure I fully understood the statement
"it will push the regular aggregate into the underlying window". Could you
please elaborate?
This is what I tried:
```
// Assumes a spark-shell session, where spark.implicits._ is already in scope.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, rank, sum}

val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
val window1 = Window.orderBy('a)
val window2 = Window.orderBy('a.desc)
val result = df.groupBy('a).agg(
  max(rank().over(window1)),
  sum('b),
  sum(sum('b)).over(window2))
result.explain(true)
result.show(false)
```
It produced the following plans:
```
== Analyzed Logical Plan ==
a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int, sum(b): bigint, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$()): bigint
Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(b)#20L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L]
+- Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(b)#20L, _w0#40L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L]
   +- Window [sum(_w0#40L) windowspecdefinition(a#5 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L], [a#5 DESC NULLS LAST]
      +- Aggregate [a#5], [a#5, max(_we0#36) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(cast(b#6 as bigint)) AS sum(b)#20L, sum(cast(b#6 as bigint)) AS _w0#40L]
         +- Project [a#5, b#6, _we0#36, _we0#36]
            +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#36], [a#5 ASC NULLS FIRST]
               +- Project [_1#2 AS a#5, _2#3 AS b#6]
                  +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(b)#20L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L]
+- Window [sum(_w0#40L) windowspecdefinition(a#5 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L], [a#5 DESC NULLS LAST]
   +- Aggregate [a#5], [a#5, max(_we0#36) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(cast(b#6 as bigint)) AS sum(b)#20L, sum(cast(b#6 as bigint)) AS _w0#40L]
      +- Project [a#5, b#6, _we0#36, _we0#36]
         +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#36], [a#5 ASC NULLS FIRST]
            +- LocalRelation [a#5, b#6]
```
The result was:
```
+---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
|a  |max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|sum(b)|sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())|
+---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
|5  |4                                                                |5     |5                                                                |
|2  |3                                                                |4     |9                                                                |
|1  |1                                                                |5     |14                                                               |
+---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
```
So, we have a window expression on top of a regular aggregate in
``sum(sum('b)).over(window2)``, right? This expression is handled by the
existing logic and is not touched by the new case clause.
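
To make the distinction concrete, here is a toy sketch of the two shapes in the query above: an aggregate function with a window expression beneath it, and a window expression wrapping a regular aggregate. The case classes and the `hasAggregateOverWindow` helper are hypothetical stand-ins written for illustration; they are not Catalyst's classes and not the PR's `containsAggregateFunctionWithWindowExpression` implementation.

```scala
// Toy expression tree. These classes are illustrative stand-ins only,
// not Catalyst's real Expression hierarchy.
sealed trait Expr { def children: Seq[Expr] }
case class Col(name: String) extends Expr { def children: Seq[Expr] = Nil }
case class Rank() extends Expr { def children: Seq[Expr] = Nil }
case class AggFn(name: String, child: Expr) extends Expr {
  def children: Seq[Expr] = Seq(child)
}
case class WindowExpr(fn: Expr, spec: String) extends Expr {
  def children: Seq[Expr] = Seq(fn)
}

object WindowInAggregateSketch {
  // True if `e` itself or any of its descendants satisfies `p`.
  def exists(e: Expr)(p: Expr => Boolean): Boolean =
    p(e) || e.children.exists(c => exists(c)(p))

  // Hypothetical predicate: is there an aggregate function anywhere in `e`
  // that has a window expression somewhere beneath it?
  def hasAggregateOverWindow(e: Expr): Boolean =
    exists(e) {
      case AggFn(_, child) => exists(child)(_.isInstanceOf[WindowExpr])
      case _ => false
    }

  // Shape of max(rank().over(window1)): aggregate on top of a window.
  val maxOfRank: Expr = AggFn("max", WindowExpr(Rank(), "window1"))

  // Shape of sum(sum('b)).over(window2): window on top of an aggregate.
  val sumOverWindow: Expr =
    WindowExpr(AggFn("sum", AggFn("sum", Col("b"))), "window2")

  def main(args: Array[String]): Unit = {
    println(hasAggregateOverWindow(maxOfRank))     // true
    println(hasAggregateOverWindow(sumOverWindow)) // false
  }
}
```

Under this toy model only the first shape matches, which is consistent with the second expression being left to the existing handling of window expressions over aggregates.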