Hi Jark:
Thanks for your replay!
1. 是基于哪个版本,哪个 planner 进行的测试?
Flink 1.9.0 Blink Planner
2. 流计算模式还是批计算模式?
流计算模式
3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗?
注册的名字为 red_sum
Best forideal
在 2020-04-28 11:13:50,"Jark Wu" <[email protected]> 写道:
>Hi,
>
>看了你的 UDAF 的实现,理论上是不可能比内置 count/sum 快的。可能是哪里有 bug 或是测试方式不对。
>我想先问几个问题:
>1. 是基于哪个版本,哪个 planner 进行的测试?
>2. 流计算模式还是批计算模式?
>3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗?
>
>Best,
>Jark
>
>On Tue, 28 Apr 2020 at 10:46, forideal <[email protected]> wrote:
>
>> 大家好:
>>
>>
>> 我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG
>> 等等。
>> 我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误)
>>
>>
>> SQL:
>>
>>
>> select
>> query_nor,
>> sum(cast (1asbigint))as query_nor_counter
>> from ods_search_track
>> groupby
>> query_nor,
>> HOP(
>> event_time,interval'30'SECOND,interval'30'MINUTE)
>> sum:
>> public class Sum extends AggregateFunction<Long, AtomicLong> {
>>
>> @Override
>> public boolean isDeterministic() {
>> return false;
>> }
>>
>> @Override
>> public AtomicLong createAccumulator() {
>> return new AtomicLong();
>> }
>>
>> @Override
>> public void open(FunctionContext context) throws Exception {
>>
>> }
>>
>> @Override
>> public Long getValue(AtomicLong acc) {
>> return acc.get();
>> }
>>
>> @Override
>> public TypeInformation getResultType() {
>> return Types.LONG;
>> }
>>
>> public void merge(AtomicLong acc, Iterable<AtomicLong> it) {
>> Iterator<AtomicLong> iter = it.iterator();
>> while (iter.hasNext()) {
>> AtomicLong a = iter.next();
>> acc.addAndGet(a.get());
>> }
>> }
>>
>> public void accumulate(AtomicLong datas, Long data) {
>> datas.addAndGet(data);
>> }
>> }
>>
>>
>> 使用 Flink buildin COUNT
>>
>>
>> select
>> query_nor,
>> count(1) as query_nor_counter
>> from ods_search_track
>> groupby
>> query_nor,
>> HOP(
>> event_time,interval'30'SECOND,interval'30'MINUTE)