我写了一个基于时间和数量的trigger,我希望实现的效果是: 1.在5秒内,如果数量达到了5000条,则提前触发; 2.如果到达5秒,则不管有多少条都触发. 现在的情况是同一个时间点会触发多次基于时间的触发. 我记得连续注册同一个定时器时,只有一个定时器会触发,但是实际好像不是这样? 请大佬们帮我看一下下面的代码有什么问题. import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window;
/** * Description:基于时间和数量的触发器 * * @Author:dinghaohao * @Create:2024-09-09-10:30 */ @Slf4j public class CountWithTimeoutTrigger<T, W extends Window> extends Trigger<T, W> { private static final long serialVersionUID = 1L; private final long maxCount; private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE); private CountWithTimeoutTrigger(long maxCount) { this.maxCount = maxCount; } public static <T, W extends Window> CountWithTimeoutTrigger<T, W> of(long maxCount) { return new CountWithTimeoutTrigger<>(maxCount); } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { return TriggerResult.CONTINUE; } private void registerNextTimer(TriggerContext ctx, W window) { // 在注册新定时器之前,先清除可能存在的旧定时器 ctx.deleteProcessingTimeTimer(window.maxTimestamp()); ctx.registerProcessingTimeTimer(window.maxTimestamp()); } @Override public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception { ReducingState<Long> count = ctx.getPartitionedState(stateDesc); count.add(1L); // 只在第一个元素到达时注册定时器 if (count.get() == 1L) { registerNextTimer(ctx, window); } if (count.get() >= maxCount) { log.info("base on count trigger,count:{}", count.get()); count.clear(); // 清除旧的定时器 ctx.deleteProcessingTimeTimer(window.maxTimestamp()); // 注册新的定时器 registerNextTimer(ctx, window); return TriggerResult.FIRE_AND_PURGE; } return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { ReducingState<Long> count = ctx.getPartitionedState(stateDesc); if (count.get() == null || count.get() == 0L) { return TriggerResult.CONTINUE; } log.info("base on timeout trigger,count:{},time:{}", count.get(), window.maxTimestamp()); count.clear(); return TriggerResult.FIRE_AND_PURGE; } @Override public void clear(W window, TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); 
ctx.getPartitionedState(stateDesc).clear(); } @Override public boolean canMerge() { return true; } @Override public void onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); // only register a timer if the time is not yet past the end of the merged // window // this is in line with the logic in onElement(). If the time is past the end of // the window onElement() will fire and setting a timer here would fire the // window twice. long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } @Override public String toString() { return "CountTrigger(" + maxCount + ")"; } private static class Sum implements ReduceFunction<Long> { private static final long serialVersionUID = 1L; @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } } }