Hi Dan,

I'm quite confused as you already use per-partition watermarking.

What I meant in my reply is:
- If you don't use per-partition watermarking, # tasks < # partitions can
cause problems for backfill jobs.
- If you don't use per-partition watermarking, # tasks = # partitions should
be okay even for backfill jobs.
- If you use per-partition watermarking, # tasks < # partitions shouldn't
cause any problems unless you turn on idleness detection (see the sketch
below).
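
To make the per-partition setup concrete, here is a minimal sketch (the topic
name, "kafkaProps", "recordSchema", and the Record type are placeholders, not
your actual code). The key point is that the WatermarkStrategy is attached to
the FlinkKafkaConsumer itself, not to the DataStream after the source:

FlinkKafkaConsumer<Record> consumer =
        new FlinkKafkaConsumer<>("my-topic", recordSchema, kafkaProps);  // placeholders
// Attaching the strategy to the consumer enables per-partition watermark generation,
// so one source task reading several partitions still tracks each partition separately.
consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Record>forBoundedOutOfOrderness(Duration.ofSeconds(5)));
DataStream<Record> stream = env.addSource(consumer);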

Regarding idleness detection, which is based on processing time: what value
are you using? If you set it to 10 seconds, for example, you'll face the same
problem unless the watermark of your backfill job catches up to real time
within 10 seconds. Likewise, if you increase the value to 1 minute, your
backfill job still has to catch up to real time within 1 minute to avoid it.
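
For reference, a minimal sketch of where that value goes (again with
placeholder types; the 1-minute figure is just an example). withIdleness sets
the processing-time gap after which a partition is marked idle and stops
holding back the overall watermark:

consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Record>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofMinutes(1)));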

Best,

Dongwon


On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:

> Thanks Dongwon!
>
> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
> tasks < # kafka partitions.  This should be called out in the docs or the
> bug should be fixed.
>
> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com>
> wrote:
>
>> Hi Dan,
>>
>> Do you use the per-partition watermarking explained in [1]?
>> I've also experienced a similar problem when running backfill jobs,
>> specifically when # source tasks < # kafka partitions.
>> - When # source tasks = # kafka partitions, the backfill job works as
>> expected.
>> - When # source tasks < # kafka partitions, a single Kafka consumer consumes
>> multiple partitions, which can destroy the per-partition watermarking pattern
>> as explained in [2] (sketched below).
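>>
>> For illustration, a hedged sketch of the variant that runs into this (the
>> consumer variable and Record type are placeholders): attaching the strategy
>> to the stream after the source, instead of to the consumer, means the
>> watermark is computed per source subtask over records interleaved from
>> several partitions, so it can run far ahead of the slower partitions'
>> timestamps.
>>
>> // Problematic when one source task reads several partitions:
>> DataStream<Record> stream = env
>>         .addSource(consumer)
>>         .assignTimestampsAndWatermarks(
>>                 WatermarkStrategy.<Record>forBoundedOutOfOrderness(Duration.ofSeconds(5)));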
>>
>> Hope this helps.
>>
>> p.s. If you plan to use per-partition watermarking, be aware that idleness
>> detection [3] can cause another problem when you run a backfill job. Kafka
>> source tasks in a backfill job seem to read a batch of records from Kafka and
>> then wait for the downstream tasks to catch up, and that waiting can be
>> counted as idleness.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategies
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>
>> Best,
>>
>> Dongwon
>>
>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> I'm following the example from this section:
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>
>>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> Other points
>>>> - I'm using the kafka timestamp as event time.
>>>> - The same issue happens even if I use an idle watermark.
>>>>
>>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com>
>>>> wrote:
>>>>
>>>>> There are 12 Kafka partitions (to keep the structure similar to other
>>>>> low traffic environments).
>>>>>
>>>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi.
>>>>>>
>>>>>> I'm running a backfill from a kafka topic with very few records spread
>>>>>> across a few days.  I'm seeing a case where the records coming from the
>>>>>> kafka source have a watermark that's more recent (by hours) than their
>>>>>> event time.  I haven't seen this before when running this job.  This
>>>>>> violates what I'd assume the kafka source would do.
>>>>>>
>>>>>> Example problem:
>>>>>> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual
>>>>>> times are separated by a longer time period.
>>>>>> 2.  My first operator after the FlinkKafkaConsumer sees:
>>>>>>    context.timestamp() = 1000
>>>>>>    context.timerService().currentWatermark() = 500000
>>>>>>
>>>>>> Details about how I'm running this:
>>>>>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the
>>>>>> source.
>>>>>> - I'm using FlinkKafkaConsumer
>>>>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No
>>>>>> idleness settings.
>>>>>> - I'm running similar code in all the environments.  The main difference
>>>>>> is this environment's low traffic.  I have not been able to reproduce this
>>>>>> outside of this environment.
>>>>>>
>>>>>>
>>>>>> I put the following process function right after my kafka source.
>>>>>>
>>>>>> --------
>>>>>>
>>>>>> AfterSource
>>>>>> ts=1647274892728
>>>>>> watermark=1647575140007
>>>>>> record=...
>>>>>>
>>>>>>
>>>>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>>>>     private static final Logger LOGGER = LoggerFactory.getLogger(TextLog.class);  // assuming slf4j
>>>>>>     private final String label;
>>>>>>     public TextLog(String label) {
>>>>>>         this.label = label;
>>>>>>     }
>>>>>>     @Override
>>>>>>     public void processElement(Record record, Context context,
>>>>>>             Collector<Record> collector) throws Exception {
>>>>>>         // Log the element's timestamp and the watermark seen by this operator.
>>>>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>>>>                 label, context.timestamp(),
>>>>>>                 context.timerService().currentWatermark(), record);
>>>>>>         collector.collect(record);
>>>>>>     }
>>>>>> }
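>>>>>>
>>>>>> For example, this is how such a process function could be attached right
>>>>>> after the source (a sketch; "stream" is a placeholder for the source stream):
>>>>>>
>>>>>> DataStream<Record> logged = stream.process(new TextLog("AfterSource"));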
>>>>>>
>>>>>
