Hi all,

We are writing an application that sets TimeCharacteristic.EventTime as the time
characteristic. In our ProcessWindowFunction for a tumbling window, we added
the code below to check whether each event's timestamp falls within the
window's time range. To our surprise, the process function reports events
that are not in the tumbling window's time range. Any insights into how this
can happen? We are using Flink 1.9.1.

Below is the timestamp assigner, stream dag snippet and process function
implementation:

Timestamp assigner:

FlinkKafkaConsumerBase<Event> source = consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(60)) {
      @Override
      public long extractTimestamp(Event element) {
        return element.getTimestamp();
      }
    });
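For context on how this extractor behaves: BoundedOutOfOrdernessTimestampExtractor emits watermarks that trail the highest timestamp seen so far by the configured delay, so events arriving later than that bound are considered late. Below is a simplified, self-contained model of that watermarking logic (this is our own sketch, not Flink's actual class):

```java
// Simplified model of BoundedOutOfOrdernessTimestampExtractor's watermarking.
// Class and method names here are ours, for illustration only.
public class WatermarkModel {
    private long currentMax = Long.MIN_VALUE;
    private final long maxOutOfOrderness;

    WatermarkModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Returns the watermark after observing an event timestamp (ms).
    long onEvent(long timestamp) {
        currentMax = Math.max(currentMax, timestamp);
        return currentMax - maxOutOfOrderness; // watermark lags the max seen
    }

    public static void main(String[] args) {
        WatermarkModel wm = new WatermarkModel(60_000L); // Time.seconds(60)
        System.out.println(wm.onEvent(100_000L)); // watermark = 40000
        System.out.println(wm.onEvent(90_000L));  // still 40000: an out-of-order
                                                  // event does not move the watermark
    }
}
```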


The stream dag of our application:

env.addSource(source)
    .filter(new EventFilter(events))
    .keyBy(new KeySelector<Event, EventStreamPartitionKey>() {
      @Override
      public EventStreamPartitionKey getKey(Event value) throws Exception {
        return new EventStreamPartitionKey(value.getHost());
      }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .process(new EventValidator())
    .addSink(kafkaProducer);

The implementation of the process window function EventValidator.process,
which checks whether each event's timestamp is in the tumbling window's time
range:

@Override
public void process(EventStreamPartitionKey key,
                    Context context, Iterable<Event> elements,
                    Collector<EventResult> out) {
  for (Event event : elements) {
    if (event.getTimestamp() >= context.window().getEnd() ||
        event.getTimestamp() < context.window().getStart()) {
      System.out.println("NOT in RANGE: " + context.window().getStart()
          + ", " + event.getTimestamp() + ", " + context.window().getEnd());
    }
    ...
  }
  out.collect(res);
}
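For reference, the window a given timestamp should be assigned to can be computed the same way Flink's TimeWindow.getWindowStartWithOffset does for a zero offset; the snippet below is our own standalone sketch of that arithmetic (names are ours), which is what the in-range check above should agree with:

```java
// Sketch of tumbling-window assignment (zero offset), mirroring the arithmetic
// in Flink's TimeWindow.getWindowStartWithOffset. Names are ours.
public class WindowRangeCheck {
    // Start of the tumbling window containing `timestamp` (both in ms).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    public static void main(String[] args) {
        long size = 60_000L;          // 60-second tumbling window
        long ts = 123_456L;           // example event timestamp
        long start = windowStart(ts, size);
        long end = start + size;      // window end is exclusive
        // An event is in range iff start <= ts < end.
        System.out.println(start + ", " + end + ", "
            + (ts >= start && ts < end)); // 120000, 180000, true
    }
}
```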



Thanks!


Regards,

-Yu
