Github user liancheng commented on the issue:
https://github.com/apache/spark/pull/20174
@liufengdb I wrote a summary according to our offline discussion to explain
the subtle change made in this PR. Please feel free to use it in the PR
description if it looks good to you :)
----
Like all other SQL query engines, Spark SQL supports both grouping
aggregation, e.g.:
```sql
SELECT count(*) FROM a_table GROUP BY key
```
and global aggregation, e.g.:
```sql
SELECT count(*) FROM a_table
```
Essentially, global aggregation is a special case of grouping aggregation
where the whole table is treated as a single group. A key difference, though,
exists in the case where a group contains zero rows:
- Grouping aggregation
```sql
SELECT count(*) AS c FROM range(3) WHERE id < 0 GROUP BY id
-- +---+
-- | c |
-- +---+
-- +---+
```
The above query returns zero rows.
- Global aggregation
```sql
SELECT count(*) AS c FROM range(3) WHERE id < 0
-- +---+
-- | c |
-- +---+
-- | 0 |
-- +---+
```
The above query returns one row. More specifically, a global aggregation
over zero input rows always returns a single row holding the initial
aggregation state as the output.
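The same distinction can be sketched in plain Scala (a simplified analogy, not Spark internals), counting over an empty input with and without grouping:

```scala
// Plain-Scala analogy of the two aggregation modes over empty input.
val rows = Seq.empty[Long] // plays the role of range(3) filtered by id < 0

// Grouping aggregation: one output row per group, so zero groups -> zero rows.
val grouped: Map[Long, Int] =
  rows.groupBy(identity).map { case (k, g) => k -> g.size }

// Global aggregation: the whole table is one group, so there is always
// exactly one output value -- here the initial count, 0.
val global: Int = rows.size

println(grouped.isEmpty) // true: zero output rows
println(global)          // 0: a single output row
```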
To tell whether an `Aggregate` operator `A` is a global aggregation or not,
Spark SQL simply checks the number of grouping keys, and `A` is a global
aggregation if it has zero grouping keys.
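As a rough sketch of that check (a hypothetical, simplified model, not the real Spark `Aggregate` class):

```scala
// Hypothetical, simplified model of an Aggregate operator -- just enough
// to illustrate the "zero grouping keys" check described above.
case class Aggregate(groupingKeys: Seq[String], aggregateExprs: Seq[String])

// An aggregation is global if and only if it has no grouping keys.
def isGlobalAggregation(agg: Aggregate): Boolean = agg.groupingKeys.isEmpty

println(isGlobalAggregation(Aggregate(Seq("key"), Seq("count(*)")))) // false
println(isGlobalAggregation(Aggregate(Seq.empty, Seq("count(*)"))))  // true
```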
However, this simple principle drops the ball in the following case:
```scala
spark.emptyDataFrame.dropDuplicates().agg(count($"*") as "c").show()
// +---+
// | c |
// +---+
// | 1 |
// +---+
```
The reason is that:
1. `df.dropDuplicates()` is roughly translated into something equivalent
to:
```scala
val allColumns = df.columns.map { col }
df.groupBy(allColumns: _*).agg(allColumns.head, allColumns.tail: _*)
```
This translation is implemented in the rule
`ReplaceDeduplicateWithAggregate`.
2. `spark.emptyDataFrame` contains zero columns and zero rows.
Therefore, rule `ReplaceDeduplicateWithAggregate` translates
`spark.emptyDataFrame.dropDuplicates()` into something equivalent to:
```scala
spark.emptyDataFrame.groupBy().agg(Map.empty[String, String])
```
This confuses Spark SQL: because the resulting aggregate operator contains
no grouping keys, it is recognized as a global aggregation. As a result, Spark
SQL allocates a single row filled with the initial aggregation state, uses it
as the output, and returns a wrong result.
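The root of the bug can be sketched as follows (a hypothetical simplification of the rule's key derivation, not the actual implementation):

```scala
// Hypothetical sketch of the pre-fix behavior of
// ReplaceDeduplicateWithAggregate: deduplication groups on all output
// columns, so a zero-column input yields zero grouping keys.
def dedupGroupingKeys(columns: Seq[String]): Seq[String] = columns

// spark.emptyDataFrame has no columns...
val keys = dedupGroupingKeys(Seq.empty)
println(keys.isEmpty) // true -> wrongly classified as a global aggregation
```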
To fix this issue, this PR tweaks `ReplaceDeduplicateWithAggregate` by
appending a literal `1` to the grouping key list of the resulting `Aggregate`
operator when the input plan contains zero output columns. In this way,
`spark.emptyDataFrame.dropDuplicates()` is now translated into:
```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy(lit(1)).agg(Map.empty[String, String])
```
The plan is now properly treated as a grouping aggregation and returns the
correct answer.
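The fixed key derivation can be sketched in the same simplified style (hypothetical names; the string `"1"` stands in for the `lit(1)` literal expression):

```scala
// Hypothetical sketch of the fixed rule: when the input has no output
// columns, group on a literal 1 so the resulting Aggregate keeps at least
// one grouping key and is never mistaken for a global aggregation.
def fixedDedupGroupingKeys(columns: Seq[String]): Seq[String] =
  if (columns.isEmpty) Seq("1") else columns // "1" stands in for lit(1)

println(fixedDedupGroupingKeys(Seq.empty))     // List(1)
println(fixedDedupGroupingKeys(Seq("a", "b"))) // List(a, b)
```

With at least one grouping key present, an empty input produces zero groups and hence zero output rows, which matches the deduplication semantics.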
---