自定义的trigger,实现满足maxCount或者到达窗口结束时间时输出结果;问题:同一个窗口,在窗口结束时onProcessingTime会触发多次,理论上每个窗口只应该在到达窗口结束时间时触发一次,是什么原因?主类代码片段: SingleOutputStreamOperator<List<String>> windowMap =
afterMap.timeWindowAll(Time.seconds(5))
        .trigger(new CountAndProcessingTimeTrigger(100))
        .process(simpleConfig.getWindowFunction().newInstance())

触发器代码:

/**
 * Window trigger that fires either when the number of elements counted for a
 * window reaches {@code maxCount}, or when the processing-time timer registered
 * at the window's max timestamp goes off.
 *
 * <p>Both paths return {@link TriggerResult#FIRE_AND_PURGE} and reset the counter.
 *
 * <p>NOTE(review): {@code log} is not declared anywhere in this snippet; presumably it is
 * supplied elsewhere (e.g. by a logging annotation on the class) -- confirm.
 */
public class CountAndProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

    /** Maximum number of elements counted for a window before a count-based fire. */
    private final long maxCount;

    /** Descriptor of the per-window element counter (a summing reducing state). */
    private final ReducingStateDescriptor<Long> stateDesc;

    /**
     * Creates the trigger.
     *
     * @param maxCount element count at which the window is fired and purged
     */
    public CountAndProcessingTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
        this.stateDesc =
                new ReducingStateDescriptor<>("count_time", new Sum(), LongSerializer.INSTANCE);
    }

    /**
     * Called for every element added to the window.
     *
     * <p>Meaning of the possible results:
     * <ul>
     *   <li>CONTINUE: do nothing.</li>
     *   <li>FIRE: trigger the computation and keep the window contents.</li>
     *   <li>PURGE: drop the window contents, keeping any meta-information about the
     *       window and any trigger state.</li>
     *   <li>FIRE_AND_PURGE: trigger the computation, then clear the window elements.</li>
     * </ul>
     *
     * @param o              the element
     * @param timestamp      timestamp of the element
     * @param window         window the element was assigned to
     * @param triggerContext context used to register timers and access state
     * @return FIRE_AND_PURGE once the counter reaches {@code maxCount}, otherwise CONTINUE
     * @throws Exception if state access fails
     */
    @Override
    public TriggerResult onElement(
            Object o, long timestamp, TimeWindow window, TriggerContext triggerContext)
            throws Exception {
        // The timer is (re-)registered on every element, always for the window's max timestamp.
        // NOTE(review): the count branch below does not delete this timer, so the time-based
        // callback is presumably still invoked for the window after a count-based purge.
        triggerContext.registerProcessingTimeTimer(window.maxTimestamp());

        ReducingState<Long> counter = triggerContext.getPartitionedState(stateDesc);
        counter.add(1L);

        long seen = counter.get();
        if (seen < maxCount) {
            return TriggerResult.CONTINUE;
        }

        log.info("countTrigger: {}", seen);
        counter.clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    /**
     * Called when a processing-time timer registered by this trigger fires.
     * Logs the current counter value, resets it and fires the window unconditionally.
     *
     * @param timestamp      timestamp at which the timer fired
     * @param window         window the timer belongs to
     * @param triggerContext context used to access state
     * @return always FIRE_AND_PURGE
     * @throws Exception if state access fails
     */
    @Override
    public TriggerResult onProcessingTime(
            long timestamp, TimeWindow window, TriggerContext triggerContext) throws Exception {
        ReducingState<Long> counter = triggerContext.getPartitionedState(stateDesc);

        // Kept boxed on purpose: the state may hold no value at this point (e.g. right after
        // a count-based reset), in which case the value is logged as-is.
        Long pending = counter.get();
        // Note: the second placeholder is fed with the window's max timestamp.
        log.info("timeTrigger: {}, currentProcessingTime:{}", pending, window.maxTimestamp());

        counter.clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    /**
     * Event-time timers are not used by this trigger.
     *
     * @return always CONTINUE
     */
    @Override
    public TriggerResult onEventTime(
            long timestamp, TimeWindow window, TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /**
     * Declares this trigger as non-mergeable.
     *
     * @return always {@code false}
     */
    @Override
    public boolean canMerge() {
        return false;
    }

    /**
     * Merges the counter state and, if the merged window has not yet ended in processing
     * time, registers a timer for its max timestamp.
     *
     * <p>NOTE(review): {@link #canMerge()} returns {@code false}, so this method is presumably
     * never invoked by the framework -- confirm whether merging is intended.
     *
     * @param window the merged window
     * @param ctx    merge context used to merge state and register timers
     */
    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.mergePartitionedState(stateDesc);

        long windowEnd = window.maxTimestamp();
        if (ctx.getCurrentProcessingTime() < windowEnd) {
            ctx.registerProcessingTimeTimer(windowEnd);
        }
    }

    /**
     * Called when the window is removed: deletes the processing-time timer registered for
     * the window's max timestamp and clears the counter state.
     *
     * @param window         window being removed
     * @param triggerContext context used to delete timers and access state
     * @throws Exception if state access fails
     */
    @Override
    public void clear(TimeWindow window, TriggerContext triggerContext) throws Exception {
        triggerContext.deleteProcessingTimeTimer(window.maxTimestamp());
        triggerContext.getPartitionedState(stateDesc).clear();
    }

    /** Reduce function that adds two counter values. */
    private static class Sum implements ReduceFunction<Long> {

        private static final long serialVersionUID = 1L;

        private Sum() {
        }

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Long.sum(value1, value2);
        }
    }
}

| | 吴先生 | | 15951914...@163.com |