你好,如果是这样的需求:“按一天统计某一个key上有多少条数据,统计结果每五分钟输出更新一次”的话,
我认为可以这样:
 
在一个一天的windows中做Tupel2数据的reduce,然后在下游接一个五分钟的ProcessTimeWindow,在这个五分钟的windwos中做evictor(CountEvictor.of(1)),然后输出。
 比如这样:
streamOperator
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventItem>() {
  @Override
  public long extractAscendingTimestamp(EventItem eventItem) {
   return eventItem.getWindowEnd();
  }
  })
  .map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
  .keyBy(1)
   // 东八区零点到23:59:59:999的滑动事件时间窗口
  .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
  // 在window中key上的消息条数
  .reduce((x1,x2)->new Tuple2<>(x2._1(),x1._1()+x2._2()))

  // 在5分钟的ProcessTime滑动窗口里,只取最后一条输出
  .keyBy(1)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .evictor(CountEvictor.of(1))
  .reduce((ReduceFunction) (value1, value2) -> value2)

  .addSink(textLongSink);



这是我在使用过程中实时刷新每天统计数据的方法。

回复