GitHub user aokolnychyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19193#discussion_r156493072
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -1920,7 +1927,34 @@ class Analyzer(
     
           case p: LogicalPlan if !p.childrenResolved => p
     
    -      // Aggregate without Having clause.
    +      // Extract window expressions from aggregate functions. There might be an aggregate whose
    +      // aggregate function contains a window expression as a child, which we need to extract.
    +      // e.g., df.groupBy().agg(max(rank().over(window))
    +      case a @ Aggregate(groupingExprs, aggregateExprs, child)
    +        if containsAggregateFunctionWithWindowExpression(aggregateExprs) &&
    +           a.expressions.forall(_.resolved) =>
    +
    +        val windowExprAliases = new ArrayBuffer[NamedExpression]()
    +        val newAggregateExprs = aggregateExprs.map { expr =>
    +          expr.transform {
    --- End diff --
    
    Thanks for looking into this. I am not sure I fully understood "it will push the regular aggregate into the underlying window". Could you please elaborate?
    
    This is what I tried:
    ```
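        // Assumes a SparkSession named `spark` is in scope (e.g. in spark-shell).
        import org.apache.spark.sql.expressions.Window
        import org.apache.spark.sql.functions.{max, rank, sum}
        import spark.implicits._

        val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
        val window1 = Window.orderBy('a)
        val window2 = Window.orderBy('a.desc)

        df.groupBy('a).agg(max(rank().over(window1)), sum('b), sum(sum('b)).over(window2)).explain(true)
        df.groupBy('a).agg(max(rank().over(window1)), sum('b), sum(sum('b)).over(window2)).show(false)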
    ```
    
    It produced the following plans:
    
    ```
    == Analyzed Logical Plan ==
    a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int, sum(b): bigint, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$()): bigint
    Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(b)#20L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L]
    +- Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(b)#20L, _w0#40L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L]
       +- Window [sum(_w0#40L) windowspecdefinition(a#5 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L], [a#5 DESC NULLS LAST]
          +- Aggregate [a#5], [a#5, max(_we0#36) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(cast(b#6 as bigint)) AS sum(b)#20L, sum(cast(b#6 as bigint)) AS _w0#40L]
             +- Project [a#5, b#6, _we0#36, _we0#36]
                +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#36], [a#5 ASC NULLS FIRST]
                   +- Project [_1#2 AS a#5, _2#3 AS b#6]
                      +- LocalRelation [_1#2, _2#3]
    
    == Optimized Logical Plan ==
    Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(b)#20L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L]
    +- Window [sum(_w0#40L) windowspecdefinition(a#5 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L], [a#5 DESC NULLS LAST]
       +- Aggregate [a#5], [a#5, max(_we0#36) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(cast(b#6 as bigint)) AS sum(b)#20L, sum(cast(b#6 as bigint)) AS _w0#40L]
          +- Project [a#5, b#6, _we0#36, _we0#36]
             +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#36], [a#5 ASC NULLS FIRST]
                +- LocalRelation [a#5, b#6]
    
    ```
    
    The result was:
    
    ```
    +---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
    |a  |max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|sum(b)|sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())|
    +---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
    |5  |4                                                                |5     |5                                                                |
    |2  |3                                                                |4     |9                                                                |
    |1  |1                                                                |5     |14                                                               |
    +---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
    ```
    
    So, we have a window expression on top of a regular aggregate in ``sum(sum('b)).over(window2)``, right? This expression is handled by the existing logic and is not touched by the new case clause.
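    
    As a sanity check on the numbers, the three stages visible in the plan (lower Window, Aggregate, upper Window) can be modeled with plain Scala collections, without Spark. This is only a rough sketch: the object name `WindowThenAggregate` and the intermediate names are hypothetical, and it assumes standard `rank()` semantics (ties share a rank) and the default running frame for an ordered window.
    
    ```scala
    object WindowThenAggregate {
      // Same rows as in the snippet above: columns (a, b).
      val rows: Seq[(Int, Int)] = Seq((1, 2), (1, 3), (2, 4), (5, 5))

      // Stage 1 (lower Window): rank() over (order by a).
      // A row's rank is 1 + the number of rows with a strictly smaller key.
      val ranked: Seq[(Int, Int, Int)] = rows.map { case (a, b) =>
        (a, b, 1 + rows.count(_._1 < a))
      }

      // Stage 2 (Aggregate): group by a, computing max(rank) and sum(b).
      val aggregated: Seq[(Int, Int, Long)] = ranked
        .groupBy(_._1)
        .toSeq
        .map { case (a, grp) => (a, grp.map(_._3).max, grp.map(_._2.toLong).sum) }

      // Stage 3 (upper Window): running sum of sum(b), ordered by a descending.
      // Keys are unique after the group-by, so there are no ties to consider here.
      val result: Seq[(Int, Int, Long, Long)] = {
        val sorted = aggregated.sortBy(-_._1)
        val running = sorted.scanLeft(0L)(_ + _._3).tail
        sorted.zip(running).map { case ((a, maxRank, sumB), total) =>
          (a, maxRank, sumB, total)
        }
      }

      def main(args: Array[String]): Unit = result.foreach(println)
    }
    ```
    
    The rows in `result` are `(5,4,5,5)`, `(2,3,4,9)` and `(1,1,5,14)`, which agree with the output of `show(false)` above.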

