Github user aokolnychyi commented on the issue:
https://github.com/apache/spark/pull/19193
@hvanhovell, here is a summary of the scenarios I tried:
```
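import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
val window1 = Window.orderBy('a)      // ascending
val window2 = Window.orderBy('a.desc) // descending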
```
**Scenario 1: An aggregate on top of a window expression (did not work before, looks OK now)**
```
df.groupBy().agg(max(rank().over(window1))).explain(true)
df.groupBy().agg(max(rank().over(window1))).show(false)

== Analyzed Logical Plan ==
max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int
Aggregate [max(_we0#27) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#16]
+- Project [a#5, b#6, _we0#27, _we0#27]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#27], [a#5 ASC NULLS FIRST]
      +- Project [_1#2 AS a#5, _2#3 AS b#6]
         +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Aggregate [max(_we0#27) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#16]
+- Project [_we0#27, _we0#27]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#27], [a#5 ASC NULLS FIRST]
      +- LocalRelation [a#5]

+-----------------------------------------------------------------+
|max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|
+-----------------------------------------------------------------+
|4 |
+-----------------------------------------------------------------+
```
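To make the value `4` concrete, here is a plain-Scala sketch (no Spark involved) that mimics the plan above: the `Window` node computes `rank()` ordered by `a` for every input row, and the `Aggregate` node, which has no grouping keys, then takes a single maximum. It assumes the same four rows as `df` and no nulls in `a`, so `NULLS FIRST` plays no role; the names `rows`, `ranks`, and `scenario1` are illustrative only.

```scala
// Same rows as `df`: (a, b)
val rows = Seq((1, 2), (1, 3), (2, 4), (5, 5))

// rank() OVER (ORDER BY a ASC): 1 + the number of rows whose `a` is strictly smaller.
// Tied rows share a rank and the next distinct value skips ahead, as SQL RANK does.
val ranks: Seq[Int] = rows.map { case (a, _) => 1 + rows.count(_._1 < a) }

// Aggregate without grouping keys: one max over all ranks.
val scenario1: Int = ranks.max

println(ranks.mkString(", ")) // 1, 1, 3, 4
println(scenario1)            // 4
```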
**Scenario 2: An aggregate with grouping expressions on top of a window expression**
TODO: Is the result wrong? What is expected?
```
df.groupBy('a).agg(max(rank().over(window1))).explain(true)
df.groupBy('a).agg(max(rank().over(window1))).show(false)

== Analyzed Logical Plan ==
a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int
Aggregate [a#5], [a#5, max(_we0#75) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#63]
+- Project [a#5, b#6, _we0#75, _we0#75]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#75], [a#5 ASC NULLS FIRST]
      +- Project [_1#2 AS a#5, _2#3 AS b#6]
         +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Aggregate [a#5], [a#5, max(_we0#75) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#63]
+- Project [a#5, _we0#75, _we0#75]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#75], [a#5 ASC NULLS FIRST]
      +- LocalRelation [a#5]

+---+-----------------------------------------------------------------+
|a |max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|
+---+-----------------------------------------------------------------+
|1 |1 |
|2 |3 |
|5 |4 |
+---+-----------------------------------------------------------------+
```
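A plain-Scala sketch of the same evaluation order helps read this result. Because the `Window` sits below the `Aggregate` in the plan, `rank()` is computed once over the whole input (there is no `PARTITION BY`), not separately within each group of `a`; the grouping then only picks the maximum of those global ranks. The sketch reproduces what the plan computes under that reading; whether this is the desired semantics is exactly the TODO above. Names are illustrative only.

```scala
val rows = Seq((1, 2), (1, 3), (2, 4), (5, 5))

// Window node: rank() OVER (ORDER BY a ASC) over ALL rows, paired with the row's `a`.
val ranked: Seq[(Int, Int)] = rows.map { case (a, _) => (a, 1 + rows.count(_._1 < a)) }

// Aggregate node: GROUP BY a, then max of the globally computed rank.
val scenario2: Seq[(Int, Int)] =
  ranked.groupBy(_._1)
    .map { case (a, group) => (a, group.map(_._2).max) }
    .toSeq
    .sortBy(_._1)

println(scenario2.mkString(", ")) // (1,1), (2,3), (5,4)
```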
**Scenario 3: A normal aggregate, an aggregate on top of a window expression, and a window expression on top of an aggregate in one query**
This is resolved in two steps.
```
df.groupBy('a).agg(max(rank().over(window1)), sum('b), sum(sum('b)).over(window2)).explain(true)
df.groupBy('a).agg(max(rank().over(window1)), sum('b), sum(sum('b)).over(window2)).show(false)

== Analyzed Logical Plan ==
a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int, sum(b): bigint, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$()): bigint
Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(b)#117L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L]
+- Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(b)#117L, _w0#137L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L]
   +- Window [sum(_w0#137L) windowspecdefinition(a#5 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L], [a#5 DESC NULLS LAST]
      +- Aggregate [a#5], [a#5, max(_we0#133) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(cast(b#6 as bigint)) AS sum(b)#117L, sum(cast(b#6 as bigint)) AS _w0#137L]
         +- Project [a#5, b#6, _we0#133, _we0#133]
            +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#133], [a#5 ASC NULLS FIRST]
               +- Project [_1#2 AS a#5, _2#3 AS b#6]
                  +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(b)#117L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L]
+- Window [sum(_w0#137L) windowspecdefinition(a#5 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L], [a#5 DESC NULLS LAST]
   +- Aggregate [a#5], [a#5, max(_we0#133) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(cast(b#6 as bigint)) AS sum(b)#117L, sum(cast(b#6 as bigint)) AS _w0#137L]
      +- Project [a#5, b#6, _we0#133, _we0#133]
         +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#133], [a#5 ASC NULLS FIRST]
            +- LocalRelation [a#5, b#6]

+---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
|a |max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|sum(b)|sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())|
+---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
|5 |4 |5 |5 |
|2 |3 |4 |9 |
|1 |1 |5 |14 |
+---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
```
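The two steps can be traced in plain Scala, again as a sketch without Spark over the same rows. First, the inner `Window` plus `Aggregate [a]` produce `max(rank)` and `sum(b)` per group (with the sum widened to `Long`, matching `sum(cast(b as bigint))`). Then the outer `Window` computes `sum(sum(b))` ordered by `a` descending with the `RangeFrame` from unbounded preceding to the current row, which amounts to a running total over all groups whose `a` is greater than or equal to the current one. Names are illustrative only.

```scala
val rows = Seq((1, 2), (1, 3), (2, 4), (5, 5))

// Step 1a, inner Window: rank() OVER (ORDER BY a ASC) on the raw rows -> (a, b, rank).
val ranked = rows.map { case (a, b) => (a, b, 1 + rows.count(_._1 < a)) }

// Step 1b, Aggregate [a]: (a, max(rank), sum(b) as Long) per group.
val aggregated: Seq[(Int, Int, Long)] =
  ranked.groupBy(_._1).toSeq.map { case (a, group) =>
    (a, group.map(_._3).max, group.map(_._2.toLong).sum)
  }

// Step 2, outer Window: sum(sum(b)) OVER (ORDER BY a DESC), RANGE UNBOUNDED PRECEDING
// to CURRENT ROW, i.e. the total of sum(b) over groups with a key >= the current `a`.
val result: Seq[(Int, Int, Long, Long)] =
  aggregated.sortBy(t => -t._1).map { case (a, maxRank, sumB) =>
    (a, maxRank, sumB, aggregated.filter(_._1 >= a).map(_._3).sum)
  }

result.foreach(println)
// (5,4,5,5)
// (2,3,4,9)
// (1,1,5,14)
```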
**Scenario 4: An aggregate on top of a window expression, which is defined on top of another aggregate**
TODO: Not supported now. What is expected? Should we support this, or should an exception be thrown?
```
df.groupBy('a).agg(max(sum(sum('b)).over(window1))).show(false)
```
**Open Questions**
1. Does it make sense to support window expressions inside aggregate functions, or should they be placed into a subquery instead? For example, `CheckAnalysis` prohibits aggregates on top of other aggregates. Maybe the same restriction should apply here.
2. What is the expected behavior in Scenarios 2 and 4?