Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20965#discussion_r182673750
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
    @@ -266,28 +342,53 @@ case class HashAggregateExec(
               
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
           }
         }
    -    ctx.currentVars = bufVars ++ input
    +
    +    // We need to copy the aggregation buffer to local variables first 
because each aggregate
    +    // function directly updates the buffer when it finishes.
    +    val localBufVars = bufVars.zip(updateExpr).map { case (ev, e) =>
    --- End diff --
    
    If we don't have these copies, some queries return wrong answers, e.g., 
    ```
    import org.apache.spark.sql.execution.debug._
    sql("CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES (1, 
1), (1, 2), (2, 1) AS testData(a, b)")
    val df = sql("SELECT SKEWNESS(a) FROM testData")
    df.debugCodegen
    df.show
    scala> df.show (copy)
    +---------------------------+
    |skewness(CAST(a AS DOUBLE))|
    +---------------------------+
    |         0.7071067811865475|
    +---------------------------+
    
    scala> df.show (non-copy)
    +---------------------------+                                               
    
    |skewness(CAST(a AS DOUBLE))|
    +---------------------------+
    |         -1.368454115659954|
    +---------------------------+
    ```
    In the original gen'd code, aggregation buffer updates happen in the end of 
an `Aggregate` consume;
    
https://gist.github.com/maropu/ec5a322c40e9e7e0024c3074a261197b#file-codegen-d-skew-in-the-master-L807
    
    But, if we split aggregation functions into pieces, these updates happen in 
the end of each split function, e.g.,;
    
https://gist.github.com/maropu/1e9e7cb2377622549163261fc321a108#file-codegen-d-skew-in-spark-21870-2-local-copy-L822
    
    So, if we don't copy the previous state to local buffers, the aggregation 
wrongly references the updated state and then returns an incorrect answer.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to