Hi Roman,

We're using a custom watermarker that uses a histogram to calculate a "best
fit" event time, because the data we receive can be very out of order.

As you can see below, we're using the timestamp from the first event in
each batch, so we're essentially sampling the timestamps rather than using
them all.

FlinkKinesisConsumer<Batch<EventType>> consumer =
    new FlinkKinesisConsumer<>(...);

consumer.setPeriodicWatermarkAssigner(
    new HistogramWatermarker<Batch<EventType>>(Time.minutes(30), 100) {
        @Override
        public long extractTimestamp(final Batch<EventType> element) {
            return element.getBatch().get(0).getDate().getTime();
        }
    }
);
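
HistogramWatermarker itself isn't shown in this message. Purely as a rough
illustration of the general idea, and not the class used above, a histogram
based "best fit" could be sketched as below. Everything in it is an
assumption made for the sketch: the class name, reading the two constructor
arguments as a time span and a bucket count, and taking the start of the
most populated bucket as the "best fit" event time. It has no Flink
dependency, so it would still need wrapping in a periodic watermark
assigner.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch only; NOT the HistogramWatermarker from the message. */
class HistogramWatermarkSketch {
    private final long bucketWidthMillis; // width of one histogram bucket
    private final int maxBuckets;         // assumed: number of buckets covering the span
    // bucket start time (ms) -> number of timestamps seen in that bucket
    private final TreeMap<Long, Integer> counts = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    HistogramWatermarkSketch(long spanMillis, int maxBuckets) {
        this.bucketWidthMillis = Math.max(1L, spanMillis / maxBuckets);
        this.maxBuckets = maxBuckets;
    }

    /** Record one sampled event timestamp. */
    void observe(long timestampMillis) {
        long bucketStart =
            Math.floorDiv(timestampMillis, bucketWidthMillis) * bucketWidthMillis;
        counts.merge(bucketStart, 1, Integer::sum);
        // Drop buckets that fall outside the span ending at the newest bucket.
        long oldestAllowed = counts.lastKey() - (long) (maxBuckets - 1) * bucketWidthMillis;
        counts.headMap(oldestAllowed, false).clear();
    }

    /** Start of the most populated bucket; never moves backwards. */
    long currentWatermark() {
        long bestStart = Long.MIN_VALUE;
        int bestCount = 0;
        for (Map.Entry<Long, Integer> e : counts.entrySet()) {
            if (e.getValue() > bestCount) { // ties go to the earlier bucket
                bestCount = e.getValue();
                bestStart = e.getKey();
            }
        }
        if (bestCount > 0) {
            watermark = Math.max(watermark, bestStart);
        }
        return watermark;
    }
}
```

In this sketch a few stray early or late timestamps don't move the
watermark; it only advances once a later bucket holds more samples than any
other bucket still inside the span.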

Cheers,
Randal.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
