以下是我写的一段代码:在 processElement 中,每进来一条日志就注册一次 EventTime 定时器:
ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
如果 onTimer 触发,则说明窗口内的所有日志都已处理完,然后我要对窗口内的所有数据进行 CTR 计算。但是在 KeyedBroadcastProcessFunction 中 onTimer 不触发,
而改用 KeyedProcessFunction 就可以触发,这是怎么回事呢?

/**
 * Buffers the {@link ItemViewCount} records of the current key and, when the event-time
 * timer registered at {@code windowEnd + 1} fires, emits the records with the highest CTR
 * as a JSON array.
 *
 * <p>The alert threshold and the number of records to emit are read from broadcast state
 * under {@code Constant.REDIS_BROADCAST_TOPCTR_KEY}; a default-constructed
 * {@link TopCTRInfo} is used until a rule has been broadcast.
 *
 * <p>NOTE(review): event-time timers only fire once the operator's watermark passes the
 * timer timestamp. For a two-input operator the watermark is presumably the minimum of both
 * inputs, so a broadcast stream that never emits watermarks would keep {@code onTimer} from
 * firing -- confirm that the broadcast source assigns watermarks in the job setup.
 */
public class TopCTRProcessFunction
        extends KeyedBroadcastProcessFunction<Long, ItemViewCount, Tuple2<String, TopCTRInfo>, String> {

    /** Descriptor of the broadcast state that stores the rules by name. */
    private final MapStateDescriptor<String, TopCTRInfo> ruleMapStateDescr =
            new MapStateDescriptor<>("ruleMapState", String.class, TopCTRInfo.class);

    /** Keyed buffer of the records received since the last timer fired; created in open(). */
    private transient ListState<ItemViewCount> itemState;

    /** Obtains the keyed list state handle from the runtime context. */
    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<ItemViewCount> itemsStateDesc =
                new ListStateDescriptor<>("itemState-state", ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    /**
     * Releases the state handle.
     *
     * <p>The buffered records are cleared in {@link #onTimer}; the keyed state is deliberately
     * not cleared here because no element key is being processed when the function is closed.
     */
    @Override
    public void close() throws Exception {
        super.close();
        itemState = null;
    }

    /**
     * Computes the CTR of the incoming record, buffers it and registers an event-time timer
     * at {@code windowEnd + 1}.
     *
     * @param value the record to buffer; its CTR field is overwritten
     * @param ctx   read-only context giving access to the timer service
     * @param out   unused here; results are emitted from {@link #onTimer}
     */
    @Override
    public void processElement(ItemViewCount value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        value.setCtr(computeCtr(value));
        itemState.add(value);
        // When the timer at windowEnd + 1 fires, all records of that window have been collected.
        ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
    }

    /** Stores the broadcast rule {@code value.f1} under the name {@code value.f0}. */
    @Override
    public void processBroadcastElement(
            Tuple2<String, TopCTRInfo> value, Context ctx, Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleMapStateDescr).put(value.f0, value.f1);
    }

    /**
     * Emits the buffered records whose CTR reaches the alert threshold, sorted by descending
     * CTR and limited to the configured top size, as one JSON string; then the buffer for the
     * current key is cleared.
     *
     * @param timestamp the timestamp of the firing timer
     * @param ctx       context giving read-only access to the broadcast state
     * @param out       collector receiving the JSON array (possibly empty)
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        ReadOnlyBroadcastState<String, TopCTRInfo> topCTRMap =
                ctx.getBroadcastState(ruleMapStateDescr);
        TopCTRInfo topCTRInfo = topCTRMap.get(Constant.REDIS_BROADCAST_TOPCTR_KEY);
        if (topCTRInfo == null) {
            // No rule has been broadcast yet: fall back to the defaults of TopCTRInfo.
            topCTRInfo = new TopCTRInfo();
        }

        List<ItemViewCount> allItems = new ArrayList<>();
        Iterable<ItemViewCount> buffered = itemState.get();
        if (buffered != null) {
            for (ItemViewCount item : buffered) {
                if (item.getCtr() >= topCTRInfo.getCtrAlertThreshold()) {
                    allItems.add(item);
                }
            }
        }
        itemState.clear();

        // Descending by CTR. Double.compare keeps the full precision; casting a scaled
        // difference to int would treat CTRs that differ by less than 0.01 as equal.
        allItems.sort((left, right) -> Double.compare(right.getCtr(), left.getCtr()));

        int limit = Math.max(0, Math.min(topCTRInfo.getTopSize(), allItems.size()));
        List<ItemViewCount> jsonItems = new ArrayList<>(allItems.subList(0, limit));
        out.collect(JSON.toJSONString(jsonItems));
    }

    /**
     * Returns the click-through rate in percent, or 0 when there are no clicks or no
     * impressions (the latter avoids a division by zero that would produce Infinity).
     */
    private static double computeCtr(ItemViewCount value) {
        if (value.getClkCount() == 0 || value.getImpCount() == 0) {
            return 0;
        }
        return (double) value.getClkCount() / value.getImpCount() * 100;
    }
}
以下是可以触发 onTimer 的代码,使用的是 KeyedProcessFunction:
.process(new KeyedProcessFunction<Long, ItemViewCount, String>() {
    // Buffers the ItemViewCount records of the current key and, when the event-time timer
    // registered at windowEnd + 1 fires, emits the topSize records with the highest CTR as
    // a JSON array.

    /** Maximum number of records emitted per timer. */
    private final int topSize = 10;

    /** Keyed buffer of the records received since the last timer fired; created in open(). */
    private transient ListState<ItemViewCount> itemState;

    /** Obtains the keyed list state handle from the runtime context. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ListStateDescriptor<ItemViewCount> itemsStateDesc =
                new ListStateDescriptor<>("itemState-state", ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    /**
     * Computes the CTR of the incoming record, buffers it and registers an event-time timer
     * at {@code windowEnd + 1}.
     */
    @Override
    public void processElement(ItemViewCount value, Context ctx, Collector<String> out)
            throws Exception {
        value.setCtr(computeCtr(value));
        itemState.add(value);
        // When the timer at windowEnd + 1 fires, all records of that window have been collected.
        ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
    }

    /**
     * Emits the buffered records sorted by descending CTR and limited to {@code topSize} as
     * one JSON string, then clears the buffer for the current key.
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        super.onTimer(timestamp, ctx, out);

        List<ItemViewCount> allItems = new ArrayList<>();
        Iterable<ItemViewCount> buffered = itemState.get();
        if (buffered != null) {
            for (ItemViewCount item : buffered) {
                allItems.add(item);
            }
        }
        itemState.clear();

        // Descending by CTR. Double.compare keeps the full precision; casting a scaled
        // difference to int would treat CTRs that differ by less than 0.01 as equal.
        allItems.sort((left, right) -> Double.compare(right.getCtr(), left.getCtr()));

        int limit = Math.min(topSize, allItems.size());
        List<ItemViewCount> jsonItems = new ArrayList<>(allItems.subList(0, limit));
        out.collect(JSON.toJSONString(jsonItems));
    }

    /**
     * Returns the click-through rate in percent, or 0 when there are no clicks or no
     * impressions (the latter avoids a division by zero that would produce Infinity).
     */
    private double computeCtr(ItemViewCount value) {
        if (value.getClkCount() == 0 || value.getImpCount() == 0) {
            return 0;
        }
        return (double) value.getClkCount() / value.getImpCount() * 100;
    }

})


[email protected]

回复