你好,整个程序有反压吗
















在 2023-07-10 15:32:44,"jiaot...@mail.jj.cn" <jiaot...@mail.jj.cn> 写道:
>Hello,
>         我定义了一个pattern (a->b->c->d->e->f->g)在10分钟内匹配,通过在WebUI上查看任务很快在cep节点 
> busy(max)100%,我发现通过增加cep节点的并发度并不能解决问题,且checkpoint随着时间的推移状态大小越来越大,数据应该存在大量堆积。数据源同时消费4个kafka
>  topic 
> (setTopics),采用默认水位线间隔时间,我发现4个topic的数据流量存在比较大的差异,因此我增加了水位线触发间隔时间,同时水位线时间戳来自于四个topic中最小的数值。但是问题依然没有解决,cep节点在几个小时后依然busy。
>@Override
>public void onEvent(LobbyPathData lobbyPathData, long l, WatermarkOutput 
>watermarkOutput) {
>    String key = lobbyPathData.getProject() + lobbyPathData.getEvent_name();
>    if (!maxTimePerTopic.containsKey(key) || l > maxTimePerTopic.get(key)) {
>        maxTimePerTopic.put(key, l);
>    }
>}
>
>@Override
>public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>    Optional<Long> min = 
> maxTimePerTopic.values().stream().min(Comparator.comparingLong(Long::valueOf));
>    min.ifPresent(t -> watermarkOutput.emitWatermark(new Watermark(t - 
> outOfOrdernessMillis - 1)));
>}

回复