是要在.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
后面加一个自定义Trigger,对每一个元素触发。我自定义的Trigger如下:
public class WindowTrigger extends Trigger
@Override
public TriggerResult onElement(final T element, final long timestamp, final
TimeWindow window, final TriggerContext ctx) {
return TriggerResult.FIRE;
}
@Override
public TriggerResult onProcessingTime(final long time, final TimeWindow window,
final TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(final long time, final TimeWindow window,
final TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public void clear(final TimeWindow window, final TriggerContext ctx) {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
}
然后可以这样用:
… ...
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
.tirgger(new WindowTrigger<>())
… ...
*********************
在 2019年3月6日 +0800 11:52,王涛@深瞳云 <[email protected]>,写道:
> 你好,如果是这样的需求:“按一天统计某一个key上有多少条数据,统计结果每五分钟输出更新一次”的话,
> 我认为可以这样:
>
> 在一个一天的windows中做Tupel2数据的reduce,然后在下游接一个五分钟的ProcessTimeWindow,在这个五分钟的windwos中做evictor(CountEvictor.of(1)),然后输出。
> 比如这样:
> streamOperator
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventItem>()
> {
> @Override
> public long extractAscendingTimestamp(EventItem eventItem) {
> return eventItem.getWindowEnd();
> }
> })
> .map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
> .keyBy(1)
> // 东八区零点到23:59:59:999的滑动事件时间窗口
> .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
> // 在window中key上的消息条数
> .reduce((x1,x2)->new Tuple2<>(x2._1(),x1._1()+x2._2()))
>
> // 在5分钟的ProcessTime滑动窗口里,只取最后一条输出
> .keyBy(1)
> .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
> .evictor(CountEvictor.of(1))
> .reduce((ReduceFunction) (value1, value2) -> value2)
>
> .addSink(textLongSink);
>
>
>
> 这是我在使用过程中实时刷新每天统计数据的方法。