Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/18113#discussion_r155551168
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---
@@ -263,6 +262,25 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
("a", 4), ("b", 3))
}
+ test("typed aggregate: min, max") {
+ val ds = Seq("a" -> 1, "a" -> 3, "b" -> 4, "b" -> -4, "b" -> 0).toDS()
+ checkDataset(
+ ds.groupByKey(_._1).agg(
+ typed.min(_._2), typed.minLong(_._2), typed.max(_._2), typed.maxLong(_._2)),
+ ("a", Some(1.0), Some(1L), Some(3.0), Some(3L)),
+ ("b", Some(-4.0), Some(-4L), Some(4.0), Some(4L)))
+ }
+
+ test("typed aggregate: empty") {
+ val empty = Seq.empty[(Double, Double)].toDS
+ val f = (x: (Double, Double)) => x._2
+ val g = (x: (Long, Long)) => x._2
+ checkDataset(
+ empty.agg(typed.sum(f), typed.sumLong(g), typed.avg(f),
--- End diff ---
The problem is that `empty.agg` is a relational aggregation: it actually
calls `groupBy().agg`, so the returned type is `Row` instead of Scala
objects. I just noticed there is no global aggregate for typed
aggregation; maybe we need to add that first.
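
For reference, a minimal sketch of the distinction (my own illustration,
not code from this PR; it assumes a local `SparkSession`, e.g. in
`spark-shell`, and the `typed` helpers from
`org.apache.spark.sql.expressions.scalalang`):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.expressions.scalalang.typed

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val empty = Seq.empty[(Double, Double)].toDS()
val f = (x: (Double, Double)) => x._2

// Relational path: Dataset.agg delegates to groupBy().agg, so even a
// TypedColumn comes back inside a DataFrame of generic Rows.
val relational: DataFrame = empty.agg(typed.sum(f))

// Typed path: KeyValueGroupedDataset.agg preserves Scala types, but it
// always requires a grouping key; there is currently no typed global
// aggregate that would return, say, Dataset[Double] directly.
val grouped: Dataset[(Double, Double)] =
  empty.groupByKey(_._1).agg(typed.sum(f))
```

Note that on an empty dataset the typed path simply produces no rows
(no keys, no groups), so checking empty-input defaults needs either
the relational path or a new typed global aggregate.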
---