Other points:
- I'm using the Kafka timestamp as event time.
- The same issue happens even if I use an idle watermark.

On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:

> There are 12 Kafka partitions (to keep the structure similar to other low
> traffic environments).
>
> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Hi.
>>
>> I'm running a backfill from a Kafka topic with very few records spread
>> across a few days. I'm seeing a case where the records coming from a Kafka
>> source have a watermark that's more recent (by hours) than the event time.
>> I haven't seen this before when running this job. This violates what I'd
>> assume the Kafka source would do.
>>
>> Example problem:
>> 1. I have Kafka records at ts=1000, 2000, ... 500000. The actual times
>> are separated by a longer time period.
>> 2. My first operator after the FlinkKafkaConsumer sees:
>>      context.timestamp() = 1000
>>      context.timerService().currentWatermark() = 500000
>>
>> Details about how I'm running this:
>> - I'm on Flink 1.12.3, running on EKS and using MSK as the source.
>> - I'm using FlinkKafkaConsumer.
>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s). No idleness
>> settings.
>> - I'm running similar code in all the environments. The main difference
>> is low traffic. I have not been able to reproduce this outside of this
>> environment.
>>
>>
>> I put the following process function right after my Kafka source.
>>
>> --------
>>
>> AfterSource
>> ts=1647274892728
>> watermark=1647575140007
>> record=...
>>
>>
>> public static class TextLog extends ProcessFunction<Record, Record> {
>>     private final String label;
>>
>>     public TextLog(String label) {
>>         this.label = label;
>>     }
>>
>>     @Override
>>     public void processElement(Record record, Context context,
>>             Collector<Record> collector) throws Exception {
>>         // Log the record's event timestamp next to the operator's current watermark.
>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>                 label, context.timestamp(),
>>                 context.timerService().currentWatermark(), record);
>>         // Forward the record unchanged.
>>         collector.collect(record);
>>     }
>> }
>>
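
To illustrate the 5 s bounded-out-of-orderness setting described above, here is a minimal plain-Java sketch (no Flink dependency) of how such a generator derives its watermark: the largest timestamp seen so far, minus the bound, minus 1 ms. The class and method names (BoundedWatermarkSketch, lateEvents) are made up for illustration, and the model checks the watermark on every record, whereas Flink emits watermarks periodically. It only shows that a generator which has already seen ts=500000 reports a watermark far ahead of a ts=1000 record that arrives afterwards; whether that is what happens in this job is an assumption, not a diagnosis.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a bounded-out-of-orderness watermark generator. Hypothetical, for illustration only. */
final class BoundedWatermarkSketch {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedWatermarkSketch(long boundMillis) {
        this.boundMillis = boundMillis;
        // Chosen so the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Track the largest event timestamp seen so far. */
    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    /** Watermark = max timestamp seen - allowed out-of-orderness - 1 ms. */
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    /** Returns the timestamps of records that arrive at or behind the current watermark. */
    static List<Long> lateEvents(long boundMillis, long[] arrivalOrder) {
        BoundedWatermarkSketch generator = new BoundedWatermarkSketch(boundMillis);
        List<Long> late = new ArrayList<>();
        for (long ts : arrivalOrder) {
            if (ts <= generator.currentWatermark()) {
                late.add(ts);
            }
            generator.onEvent(ts);
        }
        return late;
    }

    public static void main(String[] args) {
        // Records arrive in timestamp order: nothing is behind the watermark.
        System.out.println(lateEvents(5000, new long[] {1000, 2000, 3000, 500000}));
        // Assumed scenario: the ts=500000 record is seen before the older ones.
        System.out.println(lateEvents(5000, new long[] {500000, 1000, 2000}));
    }
}
```

With the in-order sequence no record is behind the watermark. In the second sequence the watermark is already 494999 when the records with ts=1000 and ts=2000 arrive, so both are reported as late.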