你好,整个程序有反压吗
在 2023-07-10 15:32:44,"jiaot...@mail.jj.cn" <jiaot...@mail.jj.cn> 写道: >Hello, > 我定义了一个pattern (a->b->c->d->e->f->g)在10分钟内匹配,通过在WebUI上查看任务很快在cep节点 > busy(max)100%,我发现通过增加cep节点的并发度并不能解决问题,且checkpoint随着时间的推移状态大小越来越大,数据应该存在大量堆积。数据源同时消费4个kafka > topic > (setTopics),采用默认水位线间隔时间,我发现4个topic的数据流量存在比较大的差异,因此我增加了水位线触发间隔时间,同时水位线时间戳来自于四个topic中最小的数值。但是问题依然没有解决,cep节点在几个小时后依然busy。 >@Override >public void onEvent(LobbyPathData lobbyPathData, long l, WatermarkOutput >watermarkOutput) { > String key = lobbyPathData.getProject() + lobbyPathData.getEvent_name(); > if (!maxTimePerTopic.containsKey(key) || l > maxTimePerTopic.get(key)) { > maxTimePerTopic.put(key, l); > } >} > >@Override >public void onPeriodicEmit(WatermarkOutput watermarkOutput) { > Optional<Long> min = > maxTimePerTopic.values().stream().min(Comparator.comparingLong(Long::valueOf)); > min.ifPresent(t -> watermarkOutput.emitWatermark(new Watermark(t - > outOfOrdernessMillis - 1))); >}