Hi Roman,

We're using a custom watermarker that uses a histogram to calculate a "best fit" event time, because the data we receive can arrive very out of order.
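
To illustrate the general idea of a histogram-based "best fit" event time, here is a minimal sketch. It is not the HistogramWatermarker used in this thread, whose internals are not shown here. The class name, the two constructor parameters (bucket width and number of recent samples kept), and the choice of the most populated bucket as the estimate are all assumptions made for illustration, and the sketch deliberately has no Flink dependencies.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch: estimates a watermark as the start of the most
 * populated time bucket among the most recent sampled timestamps.
 * Not the HistogramWatermarker from this thread.
 */
final class HistogramWatermarkSketch {
    private final long bucketWidthMillis;                 // assumed histogram bin width
    private final int maxSamples;                         // assumed sliding sample size
    private final Deque<Long> samples = new ArrayDeque<>();          // bucket start per sample, oldest first
    private final TreeMap<Long, Integer> counts = new TreeMap<>();   // bucket start -> sample count
    private long watermark = Long.MIN_VALUE;              // never moves backwards

    HistogramWatermarkSketch(long bucketWidthMillis, int maxSamples) {
        if (bucketWidthMillis <= 0 || maxSamples <= 0) {
            throw new IllegalArgumentException("bucket width and sample size must be positive");
        }
        this.bucketWidthMillis = bucketWidthMillis;
        this.maxSamples = maxSamples;
    }

    /** Records one sampled event timestamp, evicting the oldest sample if full. */
    void observe(long timestampMillis) {
        long bucket = Math.floorDiv(timestampMillis, bucketWidthMillis) * bucketWidthMillis;
        samples.addLast(bucket);
        counts.merge(bucket, 1, Integer::sum);
        if (samples.size() > maxSamples) {
            long evicted = samples.removeFirst();
            // Decrement the evicted bucket, removing it when the count reaches zero.
            counts.computeIfPresent(evicted, (k, v) -> v > 1 ? Integer.valueOf(v - 1) : null);
        }
    }

    /** Returns the current estimate; Long.MIN_VALUE until something is observed. */
    long currentWatermark() {
        long bestBucket = Long.MIN_VALUE;
        int bestCount = 0;
        // Iterate in ascending bucket order; the strict '>' keeps the earlier
        // bucket on ties, which is the more conservative choice.
        for (Map.Entry<Long, Integer> e : counts.entrySet()) {
            if (e.getValue() > bestCount) {
                bestCount = e.getValue();
                bestBucket = e.getKey();
            }
        }
        if (bestCount > 0 && bestBucket > watermark) {
            watermark = bestBucket;
        }
        return watermark;
    }
}
```

In this sketch a handful of stragglers or far-ahead events does not move the estimate, since only the densest bucket counts, and a late burst of old data cannot pull the watermark backwards.
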
As you can see, we're using the timestamp from the first event in the batch, so we're essentially sampling the timestamps rather than using them all.

    FlinkKinesisConsumer<Batch<EventType>> consumer = new FlinkKinesisConsumer<>(...);
    consumer.setPeriodicWatermarkAssigner(
        new HistogramWatermarker<>(Time.minutes(30), 100) {
            @Override
            public long extractTimestamp(final Batch<EventType> element) {
                return element.getBatch().get(0).getDate().getTime();
            }
        }
    );

Cheers,
Randal.