Hi Dan,

Do you use the per-partition watermarking explained in [1]?
I've also experienced a similar problem when running backfill jobs,
specifically when # source tasks < # Kafka partitions.
- When # source tasks = # Kafka partitions, the backfill job works as
expected.
- When # source tasks < # Kafka partitions, a single Kafka consumer
consumes multiple partitions, which can destroy the per-partition
watermarking pattern explained in [2] (see the sketch below).
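
For reference, a minimal sketch of per-partition watermarking with
FlinkKafkaConsumer (the topic name, Record type, schema, kafkaProps, and
env are placeholders, not from your job). The key point from [2] is that
the strategy is attached to the consumer itself, not to the stream after
the source:

// Assumes: import java.time.Duration;
//          import org.apache.flink.api.common.eventtime.WatermarkStrategy;
//          import org.apache.flink.streaming.api.datastream.DataStream;
//          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
FlinkKafkaConsumer<Record> consumer =
        new FlinkKafkaConsumer<>("topic", schema, kafkaProps);
// Attaching the strategy to the consumer makes the source generate
// watermarks per Kafka partition, as described in [2].
consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Record>forBoundedOutOfOrderness(Duration.ofSeconds(5)));
// env is the surrounding StreamExecutionEnvironment.
DataStream<Record> stream = env.addSource(consumer);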

Hope this helps.

P.S. If you plan to use per-partition watermarking, be aware that
idleness detection [3] can cause another problem when you run a backfill
job: Kafka source tasks in a backfill job seem to read a batch of records
from Kafka and then wait for the downstream tasks to catch up, and that
waiting can be counted as idleness.
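
For reference, enabling idleness detection on top of the consumer from
the sketch above would look roughly like this (a sketch only; the
one-minute timeout is an arbitrary placeholder):

consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Record>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // Marks a partition idle after 1 minute without records, so
                // it no longer holds back the watermark. During a backfill,
                // the read-then-wait pattern above can trip this timeout even
                // though the partition still has data.
                .withIdleness(Duration.ofMinutes(1)));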

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategies
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources

Best,

Dongwon

On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:

> I'm following the example from this section:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>
> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Other points
>> - I'm using the Kafka timestamp as the event time.
>> - The same issue happens even if I use an idle watermark.
>>
>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> There are 12 Kafka partitions (to keep the structure similar to other
>>> low traffic environments).
>>>
>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> Hi.
>>>>
>>>> I'm running a backfill from a Kafka topic with very few records spread
>>>> across a few days.  I'm seeing a case where the records coming from the
>>>> Kafka source have a watermark that's more recent (by hours) than the event
>>>> time.  I haven't seen this happen before with this job, and it violates
>>>> what I'd assume the Kafka source would do.
>>>>
>>>> Example problem:
>>>> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual times
>>>> are separated by a longer time period.
>>>> 2.  My first operator after the FlinkKafkaConsumer sees:
>>>>    context.timestamp() = 1000
>>>>    context.timerService().currentWatermark() = 500000
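>>>>
>>>> (As I understand it, with forBoundedOutOfOrderness(5s) the watermark
>>>> should be the max timestamp seen so far minus 5001 ms, so after just
>>>> ts=1000 it should be nowhere near 500000.)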
>>>>
>>>> Details about how I'm running this:
>>>> - I'm on Flink 1.12.3, running on EKS, with MSK as the source.
>>>> - I'm using FlinkKafkaConsumer
>>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s) (see the
>>>> sketch below).  No idleness settings.
>>>> - I'm running similar code in all the environments.  The main difference
>>>> here is the low traffic.  I have not been able to reproduce this outside
>>>> of this environment.
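>>>>
>>>> For reference, the watermark setup looks roughly like this (a sketch;
>>>> the type and variable names are placeholders, and the strategy is
>>>> attached to the consumer itself for per-partition watermarking):
>>>>
>>>> FlinkKafkaConsumer<Record> consumer =
>>>>         new FlinkKafkaConsumer<>("topic", schema, kafkaProps);
>>>> consumer.assignTimestampsAndWatermarks(
>>>>         WatermarkStrategy.<Record>forBoundedOutOfOrderness(Duration.ofSeconds(5)));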
>>>>
>>>>
>>>> I put the following process function right after my Kafka source.
>>>>
>>>> --------
>>>>
>>>> AfterSource
>>>> ts=1647274892728
>>>> watermark=1647575140007
>>>> record=...
>>>>
>>>>
>>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>>     private static final Logger LOGGER =
>>>>             LoggerFactory.getLogger(TextLog.class);
>>>>     private final String label;
>>>>     public TextLog(String label) {
>>>>         this.label = label;
>>>>     }
>>>>     @Override
>>>>     public void processElement(Record record, Context context,
>>>>             Collector<Record> collector) throws Exception {
>>>>         // Log the element's event timestamp and the current watermark.
>>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>>                 label, context.timestamp(),
>>>>                 context.timerService().currentWatermark(), record);
>>>>         collector.collect(record);
>>>>     }
>>>> }
>>>>
>>>
