Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/20174#discussion_r160611832
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---
@@ -1221,7 +1221,12 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
}
}
- Aggregate(keys, aggCols, child)
+ // SPARK-22951: the implementation of the aggregate operator treats the cases with and without
+ // grouping keys differently when there are no input rows. For the aggregation after
+ // `dropDuplicates()` on an empty data frame, a grouping key is added here to make sure the
+ // aggregate operator can work correctly (returning an empty iterator).
+ val newKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
--- End diff ---
Nit: Maybe rename `newKeys` to `nonemptyKeys`.
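
For context, the behavioral difference the diff comment relies on can be sketched in plain Scala, without Spark: a global aggregate over empty input still produces one result row, while a grouped aggregate produces none. This is only an illustrative sketch of the SQL semantics; `EmptyAggSketch`, `rows`, `globalCount`, and `groupedCounts` are hypothetical names, not Spark code.

```scala
object EmptyAggSketch {
  def main(args: Array[String]): Unit = {
    val rows = Seq.empty[Int]

    // Global aggregation (no grouping keys): always yields exactly one row,
    // e.g. SELECT count(*) FROM empty_table returns a single row containing 0.
    val globalCount = Seq(rows.size)

    // Grouped aggregation: no input rows means no groups, hence no output rows,
    // which is what `dropDuplicates()` on an empty data frame should produce.
    val groupedCounts = rows.groupBy(_ => 1).values.map(_.size)

    assert(globalCount.size == 1)
    assert(groupedCounts.isEmpty)
    println("ok")
  }
}
```

Adding a constant grouping key such as `Literal(1)` therefore shifts the rewritten plan from the first behavior to the second on empty input.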
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]