你好,如果是这样的需求:“按一天统计某一个key上有多少条数据,统计结果每五分钟输出更新一次”的话,
我认为可以这样:
在一个一天的windows中做Tupel2数据的reduce,然后在下游接一个五分钟的ProcessTimeWindow,在这个五分钟的windwos中做evictor(CountEvictor.of(1)),然后输出。
比如这样:
streamOperator
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventItem>() {
@Override
public long extractAscendingTimestamp(EventItem eventItem) {
return eventItem.getWindowEnd();
}
})
.map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
.keyBy(1)
// 东八区零点到23:59:59:999的滑动事件时间窗口
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
// 在window中key上的消息条数
.reduce((x1,x2)->new Tuple2<>(x2._1(),x1._1()+x2._2()))
// 在5分钟的ProcessTime滑动窗口里,只取最后一条输出
.keyBy(1)
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.evictor(CountEvictor.of(1))
.reduce((ReduceFunction) (value1, value2) -> value2)
.addSink(textLongSink);
这是我在使用过程中实时刷新每天统计数据的方法。