Hi,

I am using a broadcast pattern for publishing rules and aggregating the
data (https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html);
my use case and code are very similar to that demo.
One thing I want to do is detect late events, if any, and send them to a
sink. But when there are events already on the Kafka topic that weren't
consumed, and I start the app a couple of hours later, the output
timestamps look messed up:

timestamp: 2021-12-02T04:48:20.324+0000,
watermark: 292269055-12-02T16:47:04.192+0000,
timeService.watermark: 292269055-12-02T16:47:04.192+0000
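Interestingly, that strange watermark year looks like Long.MIN_VALUE (the value Flink uses internally when no watermark has been emitted yet) rendered by a date formatter that drops the BC era. A minimal check with plain JDK classes (the format pattern is my guess at how the log renders dates):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class MinWatermarkCheck {
    public static void main(String[] args) {
        // Long.MIN_VALUE millis is Flink's "no watermark yet" sentinel.
        Date noWatermarkYet = new Date(Long.MIN_VALUE);

        // Assumed log format; "yyyy" prints the year digits without the BC era.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));

        // Prints 292269055-12-02T16:47:04.192+0000 -- the exact value in my log,
        // suggesting the "messed up" watermark is really just "no watermark yet".
        System.out.println(fmt.format(noWatermarkYet));
    }
}
```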

I have the watermark strategy set on the Kafka source as:

WatermarkStrategy<CDRRecord> wmStrategy = WatermarkStrategy
        .<CDRRecord>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(OUT_OF_ORDERNESS)))
        .withTimestampAssigner((cdrRecord, timestamp) -> cdrRecord.getEventTime());

return env.addSource(recordSource.assignTimestampsAndWatermarks(wmStrategy))
        .name("records Source")
        .setParallelism(config.get(SOURCE_PARALLELISM));

Please let me know if you need any more information.

Thanks,
Sweta
