自定义的trigger,实现满足maxcount或者到达窗口结束时间时输出结果;
问题:同一个窗口,在代码窗口结束时onProcessingTime会触发多次,理论上每个爽口只应该在到达窗口结束时间触发一次,是什么原因
主类代码片段:
SingleOutputStreamOperator<List<String>> windowMap =

                afterMap.timeWindowAll(Time.seconds(5))
                        .trigger(new CountAndProcessingTimeTrigger(
                                100))
                        .process(simpleConfig.getWindowFunction().newInstance())
触发器代码:


public class CountAndProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
    //窗口最大个数
    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc;
    public CountAndProcessingTimeTrigger(long maxCount) {
        this.stateDesc = new ReducingStateDescriptor<>("count_time", new 
CountAndProcessingTimeTrigger.Sum(),
                LongSerializer.INSTANCE);
        this.maxCount = maxCount;
    }
    /**
     * 元素添加
     *
     * @param o 元素
     * @param timestamp timestamp
     * @param window window
     * @param triggerContext triggerContext
     * @return TriggerResult
     * CONTINUE:表示啥都不做。
     * FIRE:表示触发计算,同时保留窗口中的数据
     * PURGE:简单地删除窗口的内容,并保留关于窗口和任何触发器状态的任何潜在元信息。
     * FIRE_AND_PURGE:触发计算,然后清除窗口中的元素。
     * @throws Exception Exception
     */
    @Override
    public TriggerResult onElement(Object o, long timestamp, TimeWindow window, 
TriggerContext triggerContext)
            throws Exception {
        triggerContext.registerProcessingTimeTimer(window.maxTimestamp());
        ReducingState<Long> countState = 
triggerContext.getPartitionedState(stateDesc);
        countState.add(1L);
        if (countState.get() >= maxCount) {
            log.info("countTrigger: {}", countState.get());
            countState.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }
    /**
     * 窗口关闭
     *
     * @param timestamp timestamp
     * @param window window
     * @param triggerContext triggerContext
     * @return TriggerResult
     * @throws Exception Exception
     */
    @Override
    public TriggerResult onProcessingTime(long timestamp, TimeWindow window, 
TriggerContext triggerContext)
            throws Exception {
        ReducingState<Long> countState = 
triggerContext.getPartitionedState(stateDesc);
        log.info("timeTrigger: {}, currentProcessingTime:{}", countState.get(),
                window.maxTimestamp());
        countState.clear();
        return TriggerResult.FIRE_AND_PURGE;
    }
    @Override
    public TriggerResult onEventTime(long timestamp, TimeWindow window, 
TriggerContext triggerContext)
            throws Exception {
        return TriggerResult.CONTINUE;
    }
    @Override
    public boolean canMerge() {
        return false;
    }
    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.mergePartitionedState(stateDesc);
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }
    /**
     * 窗口删除
     *
     * @param window window
     * @param triggerContext triggerContext
     * @throws Exception Exception
     */
    @Override
    public void clear(TimeWindow window, TriggerContext triggerContext) throws 
Exception {
        triggerContext.deleteProcessingTimeTimer(window.maxTimestamp());
        triggerContext.getPartitionedState(stateDesc).clear();
    }
    /**
     * 计数方法
     */
    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;
        private Sum() {
        }
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}


| |
吴先生
|
|
15951914...@163.com
|

回复