flink版本 1.11.1

实现了一个UDAF聚集函数,将窗口内某些字段合并成一个字符串。代码如下:

public class AggDistinctDetail extends AggregateFunction<String,
AggDistinctDetail.Details> {
    private static final Logger logger =
LoggerFactory.getLogger(AggDistinctDetail.class);

    public static class Details {
        public Set<String> set;
    }

    @Override
    public Details createAccumulator() {
        return new Details();
    }

    @Override
    public String getValue(Details acc) {
        return JSON.toJSONString(acc.set);
    }
    
    public void accumulate(Details acc, String val) {
        if (acc.set == null) {
            acc.set = new HashSet<>();
        }
        acc.set.add(val);
    }

    public void retract(Details acc, String val) {
        //now, agg detail don't need support retraction
    }

    public void merge(Details acc, Iterable<Details> it) {
        Iterator<Details> iter = it.iterator();
        if (acc.set == null) {
            acc.set = new HashSet<>();
        }
        while (iter.hasNext()) {
            Details a = iter.next();
            acc.set.addAll(a.set);
        }
    }

    public void resetAccumulator(Details acc) {
        acc.set = null;
    }
}

将此UDAF使用在over窗口上,此窗口按realIp分区,以消息中事件时间(EventTime)
requestDateTime向前推24小时作为窗口,统计窗口内realIp对应的所有userId,作为明细输出userId聚集后的字符串。

drop function if exists UDF_InfoDistinctMerge;
create function UDF_InfoDistinctMerge AS
'com.binance.risk.flink.udf.AggDistinctDetail';

select
realIp ,
UDF_InfoDistinctMerge(userId) over w1 as userSet
from source_table
window w1 as (partition by realIp order by requestDateTime asc RANGE BETWEEN
INTERVAL '24' hour preceding AND CURRENT ROW) ;

实际测试下来,发现聚集后的字符串userSet是一直在增长,即使窗口时间已经超过24小时,依然被聚集到userSet这个结果中,这和预期不符。

问题:
是上面UDAF的实现有啥问题么?还是UDAF在over窗口上有bug?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复