Thanks Dongwon! Wow. Yes, I'm using per-partition watermarking [1]. Yes, my # source tasks < # kafka partitions. This should be called out in the docs or the bug should be fixed.
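For reference, here is roughly how the source and watermark strategy are wired up. This is a simplified sketch, not the real job: the class name, topic, bootstrap servers, the String/SimpleStringSchema types, and the parallelism of 4 are placeholders.

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class BackfillJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");  // placeholder

        // Placeholder topic and deserializer; the real job uses its own Record type.
        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // Assigning the strategy on the consumer (not on the stream) is what enables
        // per-partition watermarking per [1]/[2]. No withIdleness() here, and the
        // Kafka record timestamp is used as the event time.
        consumer.assignTimestampsAndWatermarks(
            WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)));

        // Source parallelism is lower than the 12 Kafka partitions, so a single
        // source task reads several partitions.
        DataStream<String> records = env.addSource(consumer).setParallelism(4);

        records.print();
        env.execute("backfill-sketch");
    }
}

The point is that the strategy is assigned on the consumer itself, so watermarks should be generated per Kafka partition rather than per source subtask.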
On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Dan,
>
> Do you use the per-partition watermarking explained in [1]?
> I've also experienced a similar problem when running backfill jobs,
> specifically when # source tasks < # kafka partitions.
> - When # source tasks = # kafka partitions, the backfill job works as
> expected.
> - When # source tasks < # kafka partitions, a Kafka consumer consumes
> multiple partitions. This case can destroy the per-partition patterns as
> explained in [2].
>
> Hope this helps.
>
> p.s. If you plan to use the per-partition watermarking, be aware that
> idleness detection [3] can cause another problem when you run a backfill
> job. Kafka source tasks in a backfill job seem to read a batch of records
> from Kafka and then wait for downstream tasks to catch up on the progress,
> which can be counted as idleness.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>
> Best,
>
> Dongwon
>
> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> I'm following the example from this section:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>
>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Other points:
>>> - I'm using the kafka timestamp as event time.
>>> - The same issue happens even if I use an idle watermark.
>>>
>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> There are 12 Kafka partitions (to keep the structure similar to other
>>>> low-traffic environments).
>>>>
>>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>>
>>>>> Hi.
>>>>>
>>>>> I'm running a backfill from a kafka topic with very few records spread
>>>>> across a few days. I'm seeing a case where records coming from the kafka
>>>>> source have a watermark that's more recent (by hours) than the event time.
>>>>> I haven't seen this before with this job, and it violates what I'd expect
>>>>> the kafka source to do.
>>>>>
>>>>> Example problem:
>>>>> 1. I have kafka records at ts=1000, 2000, ..., 500000. The actual
>>>>> times are separated by a longer time period.
>>>>> 2. My first operator after the FlinkKafkaConsumer sees:
>>>>>     context.timestamp() = 1000
>>>>>     context.timerService().currentWatermark() = 500000
>>>>>
>>>>> Details about how I'm running this:
>>>>> - I'm on Flink 1.12.3, running on EKS and using MSK as the source.
>>>>> - I'm using FlinkKafkaConsumer.
>>>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s). No
>>>>> idleness settings.
>>>>> - I'm running similar code in all the environments. The main
>>>>> difference is low traffic. I have not been able to reproduce this outside
>>>>> of this environment.
>>>>>
>>>>> I put the following process function right after my kafka source.
>>>>>
>>>>> --------
>>>>>
>>>>> AfterSource
>>>>> ts=1647274892728
>>>>> watermark=1647575140007
>>>>> record=...
>>>>>
>>>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>>>     private final String label;
>>>>>
>>>>>     public TextLog(String label) {
>>>>>         this.label = label;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void processElement(Record record, Context context,
>>>>>             Collector<Record> collector) throws Exception {
>>>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>>>             label, context.timestamp(),
>>>>>             context.timerService().currentWatermark(), record);
>>>>>         collector.collect(record);
>>>>>     }
>>>>> }
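For completeness, a minimal sketch of how a logging function like the one above gets attached right after the source. It assumes the Record-typed consumer from the job; the operator name is illustrative.

// "AfterSource" matches the label in the log output shown earlier in the thread.
DataStream<Record> logged = env
    .addSource(consumer)
    .process(new TextLog("AfterSource"))
    .name("after-source-log");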