GitHub user liancheng opened a pull request:
https://github.com/apache/spark/pull/15590
[SPARK-17949][SQL] A Java object based aggregate operator
## What changes were proposed in this pull request?
This PR adds a new hash-based aggregate operator named
`ObjectHashAggregateExec` that supports `TypedImperativeAggregate`, which may
use arbitrary Java objects as aggregation states. Please refer to the [design
doc][1] attached in [SPARK-17949][2] for more details about it.
[1]: https://issues.apache.org/jira/secure/attachment/12834260/%5BDesign%20Doc%5D%20Support%20for%20Arbitrary%20Aggregation%20States.pdf
[2]: https://issues.apache.org/jira/browse/SPARK-17949
The major benefit of this operator is better performance when evaluating
`TypedImperativeAggregate` functions, especially when there are relatively few
distinct groups. Functions like Hive UDAFs, `collect_list`, and `collect_set`
may also benefit from this operator once they are migrated to
`TypedImperativeAggregate`.
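To make the idea of an object-valued aggregation state concrete, here is a minimal Java sketch. `ObjectAggregate` and `CollectSetSketch` are hypothetical names, not Spark's API: Spark's actual `TypedImperativeAggregate` is a Scala abstract class that consumes `InternalRow` inputs and, among other differences, must also serialize and deserialize its states, which this sketch omits.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified contract loosely modeled on the idea behind
// TypedImperativeAggregate: the aggregation state is an arbitrary Java object
// of type S instead of a row of fixed-width fields.
interface ObjectAggregate<IN, S, OUT> {
    S createState();                 // fresh, empty state for a new group
    S update(S state, IN input);     // fold one input value into the state
    S merge(S left, S right);        // combine two partial states
    OUT eval(S state);               // produce the final result
}

// A collect_set-like aggregate whose state is a plain java.util.HashSet.
final class CollectSetSketch implements ObjectAggregate<Integer, Set<Integer>, Set<Integer>> {
    public Set<Integer> createState() {
        return new HashSet<>();
    }

    public Set<Integer> update(Set<Integer> state, Integer input) {
        if (input != null) {
            state.add(input);        // mutate the state in place (imperative style)
        }
        return state;
    }

    public Set<Integer> merge(Set<Integer> left, Set<Integer> right) {
        left.addAll(right);          // union of two partial states
        return left;
    }

    public Set<Integer> eval(Set<Integer> state) {
        return state;
    }
}
```

A state like this `HashSet` does not fit into a row of fixed-width fields, which is the motivation for an operator that keeps Java objects as aggregation buffers.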
The following feature flag is introduced to enable or disable the new
aggregate operator:
- Name: `spark.sql.execution.useObjectHashAggregateExec`
- Default value: `true`
The fallback threshold can be configured using the following SQL option:
- Name: `spark.sql.objectHashAggregate.sortBased.fallbackThreshold`
- Default value: 128
The operator falls back to sort-based aggregation when more than 128 (the
default threshold) distinct groups are accumulated in the aggregation hash map.
This number is intentionally small to avoid GC problems, since aggregation
buffers of this operator may contain arbitrary Java objects.
This could be improved by implementing size tracking for this operator. Both
size tracking and code generation are planned for follow-up PRs.
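The fallback can be sketched in plain Java as follows. This is a simplified illustration, not the operator's actual implementation: the class name `FallbackAggSketch`, the counting aggregate, and the exact switch-over point (as soon as the hash map holds more than `fallbackThreshold` groups) are assumptions. After the switch, the partial states already in the hash map and the remaining input rows are sorted by key and merged group by group. A real operator would use an external, spillable sort; here everything stays in memory for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: hash-based aggregation over object states that switches
// to sort-based aggregation once too many distinct groups have been seen.
final class FallbackAggSketch {
    // A (group key, partial state) pair; the state is a mutable long[1] box
    // standing in for an arbitrary Java object used as an aggregation buffer.
    private static final class KeyedState {
        final String key;
        final long[] state;
        KeyedState(String key, long[] state) { this.key = key; this.state = state; }
    }

    // Counts rows per key, falling back to sort-based aggregation when the
    // hash map holds more than fallbackThreshold groups.
    static Map<String, Long> countByKey(List<String> rows, int fallbackThreshold) {
        Map<String, long[]> hashMap = new HashMap<>();
        Iterator<String> it = rows.iterator();
        boolean sortBased = false;

        // Phase 1: hash-based aggregation.
        while (it.hasNext() && !sortBased) {
            hashMap.computeIfAbsent(it.next(), k -> new long[1])[0] += 1;
            if (hashMap.size() > fallbackThreshold) sortBased = true;
        }

        Map<String, Long> result = new TreeMap<>();
        if (!sortBased) {
            hashMap.forEach((k, s) -> result.put(k, s[0]));
            return result;
        }

        // Phase 2: sort-based fallback. Partial states from the hash map plus
        // the remaining raw rows are sorted by key, then adjacent equal keys merged.
        List<KeyedState> entries = new ArrayList<>();
        hashMap.forEach((k, s) -> entries.add(new KeyedState(k, s)));
        hashMap.clear();
        while (it.hasNext()) entries.add(new KeyedState(it.next(), new long[] {1}));
        entries.sort((a, b) -> a.key.compareTo(b.key));

        String currentKey = null;
        long acc = 0;
        for (KeyedState e : entries) {
            if (!e.key.equals(currentKey)) {
                if (currentKey != null) result.put(currentKey, acc);
                currentKey = e.key;
                acc = 0;
            }
            acc += e.state[0]; // merge this partial state into the current group
        }
        if (currentKey != null) result.put(currentKey, acc);
        return result;
    }
}
```

Whatever the threshold, the result is the same; only the execution strategy changes, which is what makes a small, GC-friendly threshold safe.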
## Benchmark results
### `ObjectHashAggregateExec` vs `SortAggregateExec`
The first benchmark compares `ObjectHashAggregateExec` and
`SortAggregateExec` by evaluating `typed_count`, a testing
`TypedImperativeAggregate` version of the SQL `count` function.
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort agg w/ group by                        31251 / 31908          3.4         298.0       1.0X
object agg w/ group by w/o fallback          6903 / 7141          15.2          65.8       4.5X
object agg w/ group by w/ fallback          20945 / 21613          5.0         199.7       1.5X
sort agg w/o group by                        4734 / 5463          22.1          45.2       6.6X
object agg w/o group by w/o fallback         4310 / 4529          24.3          41.1       7.3X
```
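The `Relative` column reads as the best time of the first row divided by the best time of each row; for example, 31251 / 6903 ≈ 4.5. Assuming that is how the column is derived, the tiny hypothetical helper below reproduces the values of the table above. Because the printed times are rounded to whole milliseconds, recomputing from them can differ in the last digit when times are small (as in the Hive UDAF table further down).

```java
import java.util.Locale;

// Hypothetical helper: derives a "Relative" value from two best times (ms).
final class RelativeColumnSketch {
    static String relative(long baselineBestMs, long bestMs) {
        // Speedup of this row relative to the first (baseline) row.
        return String.format(Locale.ROOT, "%.1fX", (double) baselineBestMs / bestMs);
    }
}
```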
The next benchmark compares `ObjectHashAggregateExec` and
`SortAggregateExec` by evaluating the Spark native version of
`percentile_approx`.
Note that `percentile_approx` is such a heavy aggregate function that the
bottleneck of this benchmark is evaluating the aggregate function itself rather
than the aggregate operator. That's why the results are so close and look
counter-intuitive (aggregation without group by is even slower than
aggregation with group by).
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort agg w/ group by                          3418 / 3530          0.6        1630.0       1.0X
object agg w/ group by w/o fallback           3210 / 3314          0.7        1530.7       1.1X
object agg w/ group by w/ fallback            3419 / 3511          0.6        1630.1       1.0X
sort agg w/o group by                         4336 / 4499          0.5        2067.3       0.8X
object agg w/o group by w/o fallback          4271 / 4372          0.5        2036.7       0.8X
```
### Hive UDAF vs Spark AF
This benchmark compares the following two kinds of aggregate functions:
- "hive udaf": Hive implementation of `percentile_approx`, without partial
aggregation support, evaluated using `SortAggregateExec`.
- "spark af": Spark native implementation of `percentile_approx`, with
partial aggregation support, evaluated using `ObjectHashAggregateExec`.
The performance difference is mostly due to the faster implementation and
partial aggregation support in the Spark native version of `percentile_approx`.
This benchmark basically shows the performance difference between the
worst case, where an aggregate function without partial aggregation support is
evaluated using `SortAggregateExec`, and the best case, where a
`TypedImperativeAggregate` with partial aggregation support is evaluated using
`ObjectHashAggregateExec`.
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
hive udaf vs spark af:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hive udaf w/o group by                        5326 / 5408          0.0       81264.2       1.0X
spark af w/o group by                            93 /  111         0.7        1415.6      57.4X
hive udaf w/ group by                         3804 / 3946          0.0       58050.1       1.4X
spark af w/ group by w/o fallback                71 /   90         0.9        1085.7      74.8X
spark af w/ group by w/ fallback                 98 /  111         0.7        1501.6      54.1X
```
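The effect of partial aggregation can be illustrated with a small two-phase sketch. It uses a simple average instead of `percentile_approx` (whose state is a quantile summary) and hypothetical class and method names; it only shows the general pattern of reducing each partition to a compact state before a final merge, not Spark's actual code path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-phase (partial + final) aggregation of an average.
final class PartialAggSketch {
    // Aggregation state: a small mutable Java object rather than the raw rows.
    static final class AvgState {
        double sum;
        long count;
        void update(double v) { sum += v; count++; }
        void merge(AvgState other) { sum += other.sum; count += other.count; }
        double eval() { return count == 0 ? Double.NaN : sum / count; }
    }

    // Partial phase: runs locally on each partition, before any shuffle.
    static AvgState partial(double[] partition) {
        AvgState s = new AvgState();
        for (double v : partition) s.update(v);
        return s;
    }

    // Final phase: merges one partial state per partition.
    static double finalMerge(List<AvgState> partials) {
        AvgState total = new AvgState();
        for (AvgState p : partials) total.merge(p);
        return total.eval();
    }

    static double average(List<double[]> partitions) {
        List<AvgState> partials = new ArrayList<>();
        for (double[] part : partitions) partials.add(partial(part));
        return finalMerge(partials);
    }
}
```

In the final phase only one `AvgState` per partition has to be merged instead of every input row. An aggregate function without partial aggregation support has to see all rows in a single phase, which corresponds to the worst case measured above.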
## How was this patch tested?
New unit tests and randomized test cases are added in
`ObjectAggregateFunctionSuite`.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/liancheng/spark obj-hash-agg
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/15590.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #15590
----
commit c8ec18b37410cfeaad226f57e66b88b3d465f09a
Author: Cheng Lian
Date: 2016-08-04T07:54:32Z
Initial commit for the new ObjectHashAggregateExec operator
----