GitHub user hvanhovell opened a pull request:
https://github.com/apache/spark/pull/12874
[SPARK-10605][SQL] Create native collect_list/collect_set aggregates
## What changes were proposed in this pull request?
We currently use the Hive implementations for the collect_list/collect_set
aggregate functions. This has two major drawbacks: the use of HiveUDAF (which
has quite a bit of overhead) and the lack of support for struct datatypes. This
PR adds native implementations of these functions to Spark.
The size of the collected list/set varies, which means we cannot use the fast
Tungsten aggregation path to perform the aggregation and have to fall back to
the slower sort-based path. Another big issue with these operators is that when
the collected list/set grows too large, we can start experiencing long GC
pauses and OOMEs.
The `collect*` aggregates implemented in this PR rely on the sort-based
aggregate path for correctness. They maintain their own internal buffer, which
holds the rows for one group at a time. The sort-based aggregation path is
triggered by disabling `partialAggregation` for the aggregates (which is kinda
funny); this technique is also employed in
`org.apache.spark.sql.hive.HiveUDAFFunction`.
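To illustrate why a single buffer per group is enough once the input is sorted by the grouping key, here is a minimal sketch in plain Scala. It is not the code in this PR and does not use any Spark API; `SortBasedCollect` and `collectSorted` are hypothetical names made up for this example, and the input is assumed to be already sorted by key.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of Spark: collects the values of each group
// from input that is ALREADY sorted by key, buffering one group at a time.
object SortBasedCollect {
  def collectSorted[K, V](sorted: Iterator[(K, V)]): Iterator[(K, List[V])] =
    new Iterator[(K, List[V])] {
      private val in = sorted.buffered

      def hasNext: Boolean = in.hasNext

      def next(): (K, List[V]) = {
        val key = in.head._1
        // The buffer only ever holds the values of the current group.
        val buffer = ArrayBuffer.empty[V]
        while (in.hasNext && in.head._1 == key) {
          buffer += in.next()._2
        }
        (key, buffer.toList)
      }
    }
}
```

For example, `SortBasedCollect.collectSorted(Seq((1, 10L), (1, 11L), (2, 20L)).iterator).toList` yields one list per key. Because all rows of a group are adjacent, nothing has to be kept around for other groups, but a single very large group can still blow up the buffer.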
I have done some performance testing:
```scala
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions._

// Register the Hive implementation under a different name for comparison.
sql("create function collect_list2 as " +
  "'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList'")

val df = range(0, 10000000)
  .select($"id", (rand(213123L) * 100000).cast("int").as("grp"))
df.select(countDistinct($"grp")).show

def benchmark(name: String, plan: Dataset[Row], maxItr: Int = 5): Unit = {
  // Do not measure planning.
  plan.queryExecution.executedPlan

  // Execute the plan a number of times and average the result.
  val start = System.nanoTime
  var i = 0
  while (i < maxItr) {
    plan.rdd.foreach(_ => ())
    i += 1
  }
  val time = (System.nanoTime - start) / (maxItr * 1000000L)
  println(s"[$name] $maxItr iterations completed in an average time of $time ms.")
}

val plan1 = df.groupBy($"grp").agg(collect_list($"id"))
val plan2 = df.groupBy($"grp").agg(callUDF("collect_list2", $"id"))

benchmark("Spark collect_list", plan1)
...
> [Spark collect_list] 5 iterations completed in an average time of 3371 ms.

benchmark("Hive collect_list", plan2)
...
> [Hive collect_list] 5 iterations completed in an average time of 9109 ms.
```
Performance is improved by a factor of 2-3.
## How was this patch tested?
Added tests to `DataFrameAggregateSuite`.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/hvanhovell/spark implode
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/12874.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #12874
----
commit 9d8aeed7b18d58258097526f85f07c37a10c28d8
Author: Herman van Hovell <[email protected]>
Date: 2016-02-01T15:15:16Z
Add native collect_set/collect_list.
commit 8247d8eb6416b7b5d4eb61fa585c595d87930d63
Author: Herman van Hovell <[email protected]>
Date: 2016-02-01T15:43:24Z
Add test for struct types.
commit 326a213dc014403aef1033e9d39206f5a873b7a6
Author: Herman van Hovell <[email protected]>
Date: 2016-02-01T18:14:10Z
Add pretty names for SQL generation.
commit 9d3205d211405a598a4f24b2b5fef62cf0538f6e
Author: Herman van Hovell <[email protected]>
Date: 2016-05-03T16:32:37Z
Merge remote-tracking branch 'apache-github/master' into implode
# Conflicts:
# sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
# sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
commit da6a13151e01d1789a18090a734ad512c56aecc5
Author: Herman van Hovell <[email protected]>
Date: 2016-05-03T16:34:01Z
Merge remote-tracking branch 'apache-github/master' into implode
commit 8a4e7827d6b1f4c150ec29c35185e63c974762dd
Author: Herman van Hovell <[email protected]>
Date: 2016-05-03T18:32:22Z
Merge remote-tracking branch 'apache-github/master' into implode
# Conflicts:
# sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
# sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
commit 597f76bb350126bb3360b692720426a6d41c3c18
Author: Herman van Hovell <[email protected]>
Date: 2016-05-03T18:48:39Z
Remove hardcoded Hive references.
----