Github user liancheng commented on the issue:

    https://github.com/apache/spark/pull/20174
  
    @liufengdb I wrote a summary based on our offline discussion to explain the subtle change made in this PR. Please feel free to use it in the PR description if it looks good to you :)
    
    ----
    
    Like all other SQL query engines, Spark SQL supports both grouping aggregation, e.g.:
    
    ```sql
    SELECT count(*) FROM a_table GROUP BY key
    ```
    
    and global aggregation, e.g.:
    
    ```sql
    SELECT count(*) FROM a_table
    ```
    
    Essentially, global aggregation is a special case of grouping aggregation where the whole table is treated as a single group. A key difference, though, shows up when a group contains zero rows:
    
    - Grouping aggregation
    
      ```sql
      SELECT count(*) AS c FROM range(3) WHERE id < 0 GROUP BY id
      -- +---+
      -- | c |
      -- +---+
      -- +---+
      ```
    
      The above query returns zero rows.
    
    - Global aggregation
    
      ```sql
      SELECT count(*) AS c FROM range(3) WHERE id < 0
      -- +---+
      -- | c |
      -- +---+
      -- | 0 |
      -- +---+
      ```
    
      The above query returns one row. To be more specific, a global aggregation over zero input rows always returns a single row holding the initial aggregation state as the output.
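
    Both behaviors can also be reproduced through the DataFrame API. Here is a quick sketch (assuming a running `spark` session with `spark.implicits._` imported):

    ```scala
    import org.apache.spark.sql.functions.count

    // Grouping aggregation over zero input rows: zero output rows.
    spark.range(3).where($"id" < 0).groupBy($"id").agg(count($"*") as "c").show()

    // Global aggregation over zero input rows: one output row holding the
    // initial aggregation state, i.e. c = 0.
    spark.range(3).where($"id" < 0).agg(count($"*") as "c").show()
    ```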
    
    To tell whether an `Aggregate` operator `A` is a global aggregation or not, 
Spark SQL simply checks the number of grouping keys, and `A` is a global 
aggregation if it has zero grouping keys.
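
    In Catalyst terms, this boils down to checking whether the logical `Aggregate` node's `groupingExpressions` list is empty. A minimal sketch of the check (the real logic lives in the physical planning code and is more involved):

    ```scala
    import org.apache.spark.sql.catalyst.plans.logical.Aggregate

    // Sketch only: an Aggregate with no grouping expressions is treated as a
    // global aggregation.
    def isGlobalAggregation(agg: Aggregate): Boolean =
      agg.groupingExpressions.isEmpty
    ```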
    
    However, this simple principle drops the ball in the following case:
    
    ```scala
    spark.emptyDataFrame.dropDuplicates().agg(count($"*") as "c").show()
    // +---+
    // | c |
    // +---+
    // | 1 |
    // +---+
    ```
    
    The reason is that:
    
    1.  `df.dropDuplicates()` is roughly translated into something equivalent 
to:
    
        ```scala
        val allColumns = df.columns.map(col)
        df.groupBy(allColumns: _*).agg(allColumns.head, allColumns.tail: _*)
        ```
    
        This translation is implemented in the rule 
`ReplaceDeduplicateWithAggregate`.
    
    2.  `spark.emptyDataFrame` contains zero columns and zero rows.
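
        Both properties are easy to verify from a REPL (assuming a running `spark` session):

        ```scala
        spark.emptyDataFrame.columns.length  // 0 -- zero columns
        spark.emptyDataFrame.count()         // 0 -- zero rows
        ```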
    
    Therefore, rule `ReplaceDeduplicateWithAggregate` translates 
`spark.emptyDataFrame.dropDuplicates()` into something equivalent to:
    
    ```scala
    spark.emptyDataFrame.groupBy().agg(Map.empty[String, String])
    ```
    
    This confuses Spark SQL: since the aggregate operator contains no grouping keys, it is recognized as a global aggregation. As a result, Spark SQL allocates a single row filled with the initial aggregation state, uses it as the output, and returns a wrong result.
    
    To fix this issue, this PR tweaks `ReplaceDeduplicateWithAggregate` by 
appending a literal `1` to the grouping key list of the resulting `Aggregate` 
operator when the input plan contains zero output columns. In this way, 
`spark.emptyDataFrame.dropDuplicates()` is now translated into:
    
    ```scala
    spark.emptyDataFrame.dropDuplicates()
    // => spark.emptyDataFrame.groupBy(lit(1)).agg(Map.empty[String, String])
    ```
    
    This form is now properly treated as a grouping aggregation and returns the correct answer.
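
    The fix works because the constant grouping key keeps the aggregation a grouping aggregation without affecting deduplication semantics: zero input rows now produce zero groups and hence zero output rows, so the downstream global `count` sees an empty input and correctly returns `0`. A quick sketch of this reasoning from the DataFrame API (assuming a running `spark` session with `spark.implicits._` imported):

    ```scala
    import org.apache.spark.sql.functions.{count, lit}

    // With a constant grouping key, zero input rows yield zero groups and
    // hence zero output rows.
    spark.emptyDataFrame.groupBy(lit(1)).count().show()  // empty result set

    // The downstream global aggregation then counts zero rows and returns 0.
    spark.emptyDataFrame.groupBy(lit(1)).count().agg(count($"*") as "c").show()  // c = 0
    ```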


