Other points:
- I'm using the Kafka record timestamp as the event time.
- The same issue happens even when I configure watermark idleness.
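
The idleness point above can be illustrated with a small plain-Java model. This is a hypothetical sketch, not Flink code: `IdlePartitionModel` and its methods are made-up names. It assumes three behaviours. First, each partition's watermark is its highest timestamp minus the 5 s bound minus 1 ms, as in Flink's bounded-out-of-orderness generator. Second, the source's watermark is the minimum over partitions not marked idle. Third, an emitted watermark never moves backwards. Under those assumptions, a partition that is marked idle and later resumes can deliver records that are already behind the watermark.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one source subtask reading several Kafka partitions.
// Plain Java, not Flink's implementation.
public class IdlePartitionModel {
    static final long BOUND_MS = 5_000; // mirrors forBoundedOutOfOrderness(5 s)

    private final Map<Integer, Long> maxTs = new HashMap<>();   // highest timestamp per partition
    private final Map<Integer, Boolean> idle = new HashMap<>(); // idleness flag per partition
    private long emitted = Long.MIN_VALUE;                      // last emitted watermark

    void onRecord(int partition, long ts) {
        maxTs.merge(partition, ts, Math::max);
        idle.put(partition, false); // a new record makes the partition active again
    }

    void markIdle(int partition) {
        idle.put(partition, true);
    }

    // Minimum of (max timestamp - bound - 1) over active partitions.
    private long combinedWatermark() {
        long wm = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : maxTs.entrySet()) {
            if (!idle.getOrDefault(e.getKey(), false)) {
                wm = Math.min(wm, e.getValue() - BOUND_MS - 1);
                anyActive = true;
            }
        }
        return anyActive ? wm : Long.MIN_VALUE;
    }

    // Emitted watermarks never move backwards, so keep the running maximum.
    long emitWatermark() {
        emitted = Math.max(emitted, combinedWatermark());
        return emitted;
    }

    public static void main(String[] args) {
        IdlePartitionModel m = new IdlePartitionModel();
        m.onRecord(0, 500_000);
        m.onRecord(1, 1_000);
        System.out.println(m.emitWatermark()); // -4001: held back by partition 1
        m.markIdle(1);
        System.out.println(m.emitWatermark()); // 494999: partition 1 is ignored
        m.onRecord(1, 2_000);                  // partition 1 resumes
        System.out.println(m.emitWatermark()); // 494999: ts=2000 is now behind the watermark
    }
}
```

Since the issue also shows up without any idleness settings, this is at most one path to the symptom, not an explanation of it.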

On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:

> There are 12 Kafka partitions (to keep the structure similar to other
> low-traffic environments).
>
> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Hi.
>>
>> I'm running a backfill from a Kafka topic with very few records spread
>> across a few days. I'm seeing a case where records coming from the Kafka
>> source have a watermark that is hours ahead of their event time.
>> I haven't seen this in earlier runs of this job. It contradicts what I'd
>> expect the Kafka source to do.
>>
>> Example problem:
>> 1. I have Kafka records at ts=1000, 2000, ... 500000. The actual
>> timestamps are spaced further apart.
>> 2. My first operator after the FlinkKafkaConsumer sees:
>>    context.timestamp() = 1000
>>    context.timerService().currentWatermark() = 500000
>>
>> Details about how I'm running this:
>> - I'm on Flink 1.12.3, running on EKS, with MSK as the source.
>> - I'm using FlinkKafkaConsumer.
>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
>> with no idleness settings.
>> - I'm running similar code in all environments. The main difference is
>> that this one has low traffic. I have not been able to reproduce this
>> outside of this environment.
>>
>>
>> I put the following process function right after my Kafka source.
>>
>> --------
>>
>> AfterSource
>> ts=1647274892728
>> watermark=1647575140007
>> record=...
>>
>>
>> // Logs each record's event timestamp and the operator's current watermark.
>> public static class TextLog extends ProcessFunction<Record, Record> {
>>     private final String label;
>>
>>     public TextLog(String label) {
>>         this.label = label;
>>     }
>>
>>     @Override
>>     public void processElement(Record record, Context context,
>>             Collector<Record> collector) throws Exception {
>>         // LOGGER is assumed to be an SLF4J Logger declared in the enclosing class.
>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>                 label, context.timestamp(),
>>                 context.timerService().currentWatermark(), record);
>>         // Pass the record through unchanged.
>>         collector.collect(record);
>>     }
>> }
>>
>
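
The example problem above (a record with ts=1000 observed alongside a watermark of 500000) has roughly the shape you would get if the bounded-out-of-orderness generator saw several partitions interleaved as one stream during a backfill, instead of tracking each partition separately. The plain-Java model below is a hypothetical sketch of that one possibility, not a diagnosis of this job. `WatermarkPlacementModel` and its methods are made-up names. The per-partition formula (max timestamp minus the 5 s bound minus 1 ms) follows Flink's bounded-out-of-orderness generator. For simplicity, a watermark is emitted after every record, whereas Flink emits periodically by default.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical comparison of where a bounded-out-of-orderness watermark is generated.
// Plain Java, not Flink code; a watermark is emitted after every record for simplicity.
public class WatermarkPlacementModel {
    static final long BOUND_MS = 5_000;                            // mirrors forBoundedOutOfOrderness(5 s)
    static final long INITIAL_MAX = Long.MIN_VALUE + BOUND_MS + 1; // initial watermark is Long.MIN_VALUE

    // A record read from a Kafka partition, carrying its Kafka timestamp.
    static final class Rec {
        final int partition;
        final long ts;
        Rec(int partition, long ts) { this.partition = partition; this.ts = ts; }
    }

    // One generator over the merged stream: a single max timestamp for all partitions.
    static long[] mergedWatermarks(List<Rec> recs) {
        long[] out = new long[recs.size()];
        long max = INITIAL_MAX;
        for (int i = 0; i < recs.size(); i++) {
            max = Math.max(max, recs.get(i).ts);
            out[i] = max - BOUND_MS - 1;
        }
        return out;
    }

    // One generator per partition; the combined watermark is the minimum over
    // all assigned partitions and never moves backwards.
    static long[] perPartitionWatermarks(List<Rec> recs, int partitions) {
        Map<Integer, Long> max = new HashMap<>();
        for (int p = 0; p < partitions; p++) max.put(p, INITIAL_MAX);
        long[] out = new long[recs.size()];
        long emitted = Long.MIN_VALUE;
        for (int i = 0; i < recs.size(); i++) {
            Rec r = recs.get(i);
            max.merge(r.partition, r.ts, Math::max);
            long combined = Long.MAX_VALUE;
            for (long m : max.values()) combined = Math.min(combined, m - BOUND_MS - 1);
            emitted = Math.max(emitted, combined);
            out[i] = emitted;
        }
        return out;
    }

    public static void main(String[] args) {
        // Backfill-style read order: partition 0 is read far ahead of partition 1.
        List<Rec> recs = List.of(new Rec(0, 400_000), new Rec(0, 500_000), new Rec(1, 1_000));
        // prints [394999, 494999, 494999]: ts=1000 arrives behind the watermark
        System.out.println(Arrays.toString(mergedWatermarks(recs)));
        // prints [-9223372036854775808, -9223372036854775808, -4001]
        System.out.println(Arrays.toString(perPartitionWatermarks(recs, 2)));
    }
}
```

In Flink's documentation, the per-partition variant corresponds to passing the WatermarkStrategy to `FlinkKafkaConsumer.assignTimestampsAndWatermarks(...)`. Calling `assignTimestampsAndWatermarks` on the DataStream after the source gives one generator per source subtask, which is closer to the merged variant. The thread does not say which of the two this job uses.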
