Thanks Dongwon!

Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
tasks < # kafka partitions.  This should be called out in the docs or the
bug should be fixed.
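To see why a single consumer multiplexing several partitions can push the watermark past old records, here is a minimal self-contained sketch (plain Java, no Flink dependency; class and method names are illustrative, not Flink API) of the forBoundedOutOfOrderness behaviour: the watermark tracks the highest timestamp seen minus the bound, regardless of which partition a record came from.

```java
import java.util.List;

public class WatermarkSketch {
    // Matches the 5s bound used in the job below.
    static final long OUT_OF_ORDERNESS_MS = 5_000;

    // forBoundedOutOfOrderness-style watermark: highest timestamp seen so
    // far, minus the allowed out-of-orderness.
    static long watermarkAfter(List<Long> timestampsSeen) {
        long max = Long.MIN_VALUE;
        for (long ts : timestampsSeen) {
            max = Math.max(max, ts);
        }
        return max - OUT_OF_ORDERNESS_MS;
    }

    public static void main(String[] args) {
        // One source task multiplexing two partitions, where it happens to
        // drain the partition holding recent data (ts around 500_000) before
        // the partition holding old data (ts=1_000, 2_000).
        List<Long> interleaved = List.of(400_000L, 500_000L, 1_000L, 2_000L);
        long wm = watermarkAfter(interleaved);
        // wm is now 495_000, far ahead of the old records, so downstream
        // operators see ts=1_000 records with a watermark near 500_000.
        System.out.println("watermark=" + wm);
    }
}
```

With true per-partition watermarking, each partition would track its own maximum and the combined watermark would be the minimum across partitions, so the old partition would hold it back.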

On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Dan,
>
> Do you use the per-partition watermarking explained in [1]?
> I've also experienced a similar problem when running backfill jobs
> specifically when # source tasks < # kafka partitions.
> - When # source tasks = # kafka partitions, the backfill job works as
> expected.
> - When # source tasks < # kafka partitions, a Kafka consumer consumes
> multiple partitions, which can destroy the per-partition patterns as
> explained in [2].
>
> Hope this helps.
>
> p.s. If you plan to use the per-partition watermarking, be aware that
> idleness detection [3] can cause another problem when you run a backfill
> job. Kafka source tasks in a backfill job seem to read a batch of records
> from Kafka and then wait for downstream tasks to catch up, which can be
> counted as idleness.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>
> Best,
>
> Dongwon
>
> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> I'm following the example from this section:
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>
>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Other points
>>> - I'm using the kafka timestamp as event time.
>>> - The same issue happens even if I use an idle watermark.
>>>
>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> There are 12 Kafka partitions (to keep the structure similar to other
>>>> low traffic environments).
>>>>
>>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi.
>>>>>
>>>>> I'm running a backfill from a kafka topic with very few records spread
>>>>> across a few days.  I'm seeing a case where records coming from the
>>>>> kafka source have a watermark that's more recent (by hours) than the
>>>>> event time.  I haven't seen this before when running this job.  It
>>>>> violates what I'd assume the kafka source would do.
>>>>>
>>>>> Example problem:
>>>>> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual
>>>>> times are separated by a longer time period.
>>>>> 2.  My first operator after the FlinkKafkaConsumer sees:
>>>>>    context.timestamp() = 1000
>>>>>    context.timerService().currentWatermark() = 500000
>>>>>
>>>>> Details about how I'm running this:
>>>>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the
>>>>> source.
>>>>> - I'm using FlinkKafkaConsumer
>>>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No
>>>>> idleness settings.
>>>>> - I'm running similar code in all the environments.  The main
>>>>> difference is low traffic.  I have not been able to reproduce this out of
>>>>> the environment.
>>>>>
>>>>>
>>>>> I put the following process function right after my kafka source.
>>>>>
>>>>> --------
>>>>>
>>>>> AfterSource
>>>>> ts=1647274892728
>>>>> watermark=1647575140007
>>>>> record=...
>>>>>
>>>>>
>>>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>>>     private static final Logger LOGGER = LoggerFactory.getLogger(TextLog.class);
>>>>>     private final String label;
>>>>>     public TextLog(String label) {
>>>>>         this.label = label;
>>>>>     }
>>>>>     @Override
>>>>>     public void processElement(Record record, Context context,
>>>>>             Collector<Record> collector) throws Exception {
>>>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>>>                 label, context.timestamp(),
>>>>>                 context.timerService().currentWatermark(), record);
>>>>>         collector.collect(record);
>>>>>     }
>>>>> }
>>>>>
>>>>
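Dongwon's p.s. about idleness detection can be sketched like this (illustrative only, not the Flink API): the combined watermark is the minimum over the non-idle partitions, so a partition that is merely stalled on backpressure during a backfill, if misdetected as idle, stops holding the watermark back.

```java
public class IdlenessSketch {
    // Combined watermark = min over non-idle partition watermarks
    // (simplified model of per-partition watermark combination).
    static long combinedWatermark(long[] partitionWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle partitions no longer hold back the watermark
            }
            min = Math.min(min, partitionWatermarks[i]);
            anyActive = true;
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] wms = {1_000L, 500_000L};

        // Both partitions active: the slow partition holds the watermark back.
        System.out.println(combinedWatermark(wms, new boolean[]{false, false}));

        // The slow partition is (mis)detected as idle, e.g. because a backfill
        // source is waiting on downstream backpressure: the watermark jumps.
        System.out.println(combinedWatermark(wms, new boolean[]{true, false}));
    }
}
```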
