Hi Dan,

Do you use the per-partition watermarking explained in [1]?

I've also experienced a similar problem when running backfill jobs, specifically when # source tasks < # kafka partitions.
- When # source tasks = # kafka partitions, the backfill job works as expected.
- When # source tasks < # kafka partitions, a Kafka consumer consumes multiple partitions, which can destroy the per-partition pattern as explained in [2].
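In case it's useful, here's roughly what I mean by attaching the strategy to the consumer itself. Just a sketch: the brokers, group id, and topic name are placeholders, and I'm using SimpleStringSchema instead of your actual deserialization schema to keep it self-contained.

import java.time.Duration;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "<broker-list>"); // placeholder
props.setProperty("group.id", "backfill-job");           // placeholder

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

// Attaching the WatermarkStrategy to the consumer (not to the stream returned
// by addSource) makes the source generate watermarks per Kafka partition.
// With no timestamp assigner, the Kafka record timestamp is used as event time.
consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)));

DataStream<String> stream = env.addSource(consumer);

The important part is that the strategy goes on the FlinkKafkaConsumer before addSource; calling assignTimestampsAndWatermarks on the DataStream afterwards gives you per-subtask watermarks instead, which is where the pattern in [2] breaks down once one subtask reads several partitions.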
Hope this helps.

p.s. If you plan to use the per-partition watermarking, be aware that idleness detection [3] can cause another problem when you run a backfill job. Kafka source tasks in a backfill job seem to read a batch of records from Kafka and then wait for downstream tasks to catch up, and that waiting can be counted as idleness (I've put a small sketch of such a strategy at the very bottom of this mail).

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
[3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources

Best,

Dongwon

On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:

> I'm following the example from this section:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>
> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Other points
>> - I'm using the kafka timestamp as event time.
>> - The same issue happens even if I use an idle watermark.
>>
>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> There are 12 Kafka partitions (to keep the structure similar to other
>>> low-traffic environments).
>>>
>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> Hi.
>>>>
>>>> I'm running a backfill from a kafka topic with very few records spread
>>>> across a few days. I'm seeing a case where the records coming from a kafka
>>>> source have a watermark that's more recent (by hours) than the event time.
>>>> I haven't seen this before when running this job. This violates what I'd
>>>> expect the kafka source to do.
>>>>
>>>> Example problem:
>>>> 1. I have kafka records at ts=1000, 2000, ... 500000. The actual times
>>>> are separated by a longer time period.
>>>> 2. My first operator after the FlinkKafkaConsumer sees:
>>>> context.timestamp() = 1000
>>>> context.timerService().currentWatermark() = 500000
>>>>
>>>> Details about how I'm running this:
>>>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
>>>> - I'm using FlinkKafkaConsumer.
>>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s). No
>>>> idleness settings.
>>>> - I'm running similar code in all the environments. The main
>>>> difference is low traffic. I have not been able to reproduce this outside
>>>> of this environment.
>>>>
>>>> I put the following process function right after my kafka source.
>>>>
>>>> --------
>>>>
>>>> AfterSource
>>>> ts=1647274892728
>>>> watermark=1647575140007
>>>> record=...
>>>>
>>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>>   private final String label;
>>>>
>>>>   public TextLog(String label) {
>>>>     this.label = label;
>>>>   }
>>>>
>>>>   @Override
>>>>   public void processElement(Record record, Context context,
>>>>       Collector<Record> collector) throws Exception {
>>>>     LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>>         label, context.timestamp(),
>>>>         context.timerService().currentWatermark(), record);
>>>>     collector.collect(record);
>>>>   }
>>>> }
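p.p.s. For reference, the idleness detection I mentioned above would look like this, reusing the consumer from the sketch earlier in this mail (the 30-second timeout is an arbitrary value for illustration):

// Same bounded-out-of-orderness strategy, plus idleness detection.
// During a backfill, a source task that pauses while downstream tasks catch up
// can be marked idle after this timeout, so its partitions temporarily stop
// holding back the downstream watermark.
consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withIdleness(Duration.ofSeconds(30)));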