Hi,
I’m experiencing the same issue on Flink 1.18.1.
I have a slightly different job graph. I have a single Kafka Source
(parallelism 6) that is consuming from 2 topics, one topic with 4 partitions
and one topic with 6 partitions.
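To make the setup concrete, here is a rough sketch of how the 10 partitions (4 + 6) could end up spread over the 6 source subtasks. It assumes a plain round-robin over all partitions, which is not necessarily what Flink's Kafka source does internally; the topic names and the class PartitionSpreadSketch are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch: spread the partitions of several topics over a fixed number of source subtasks. */
class PartitionSpreadSketch {

    /** Assumption: plain round-robin over all partitions; Flink's real assignment may differ. */
    static Map<Integer, List<String>> assign(Map<String, Integer> partitionsPerTopic, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            bySubtask.put(subtask, new ArrayList<>());
        }
        int next = 0;
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            for (int partition = 0; partition < topic.getValue(); partition++) {
                bySubtask.get(next % parallelism).add(topic.getKey() + "-" + partition);
                next++;
            }
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new TreeMap<>();
        topics.put("topic-a", 4); // hypothetical topic names
        topics.put("topic-b", 6);
        assign(topics, 6).forEach((subtask, partitions) ->
                System.out.println("subtask " + subtask + " -> " + partitions));
    }
}
```

Under this assumed assignment, four of the six subtasks read one partition from each topic and the other two read a single partition, so a subtask's watermark can depend on partitions of both topics.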
Setting autoWatermarkInterval to 0 didn’t fix my issue.
Did you ever find a solution to this problem?
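For reference, the rule that the quoted assumptions below rely on can be sketched in plain Java without any Flink dependency: each partition tracks a bounded-out-of-orderness watermark (highest timestamp seen, minus the allowed delay, minus 1 ms), and a consumer of several inputs can only advance to the minimum of them. The class and method names are hypothetical and the timestamps are invented. The sketch only illustrates why events get flagged as late when the watermark follows the fastest input instead of the slowest; it does not claim that this is what actually happens in either job.

```java
import java.util.Arrays;

/** Plain-Java sketch (no Flink dependencies) of min-over-inputs watermark combining. */
class WatermarkSketch {

    /** Bounded-out-of-orderness rule: watermark = max seen timestamp - delay - 1. */
    static final class PartitionWatermark {
        private final long maxOutOfOrdernessMs;
        private long maxTimestampMs;

        PartitionWatermark(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
            // Start low enough that the subtraction in current() cannot underflow.
            this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
        }

        void onEvent(long eventTimestampMs) {
            maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
        }

        long current() {
            return maxTimestampMs - maxOutOfOrdernessMs - 1;
        }
    }

    /** A consumer of several inputs can only advance to the minimum of their watermarks. */
    static long combine(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /** Same check as in the quoted process function: late if not ahead of the watermark. */
    static boolean isLate(long eventTimestampMs, long watermarkMs) {
        return eventTimestampMs <= watermarkMs;
    }

    public static void main(String[] args) {
        PartitionWatermark fast = new PartitionWatermark(5_000);
        PartitionWatermark slow = new PartitionWatermark(5_000);

        fast.onEvent(100_000); // this input has already been read up to t = 100 s
        slow.onEvent(20_000);  // this one is still at t = 20 s

        long combined = combine(fast.current(), slow.current());
        System.out.println("combined watermark = " + combined);
        System.out.println("event at 30 s late (min of both inputs)? " + isLate(30_000, combined));
        System.out.println("event at 30 s late (fast input only)?    " + isLate(30_000, fast.current()));
    }
}
```

With the minimum of both inputs the event at 30 s is still on time; measured against the fast input alone it would be dropped as late.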
On 2024/01/17 16:38:40 "adrianbartnik.mailbox.org via user" wrote:
> Hi everyone,
>
> we are struggling to understand how Flink handles watermarks from Kafka when
> reprocessing events using their event time.
>
> The goal of the Flink job is to consume events from 3 Kafka topics (3
> partitions each), order them using their event time (using a process
> function) and write them to a single output topic. We are using Flink 1.15.2
> and have a BoundedOutOfOrderness of 5 seconds.
>
> Job Graph:
>
> Kafka Topic 1 (~80k events) -> Kafka Source -> Map Operator --+
>                                                               |
> Kafka Topic 2 (~5k events)  -> Kafka Source -> Map Operator --+-> Union Operator
>                                                               |
> Kafka Topic 3 (~1k events)  -> Kafka Source -> Map Operator --+
>
> Union Operator -> KeyBy Operator -> Process Function -> Kafka Output
>
> Our Assumptions
>
> * The Union-Operator doesn’t make a difference for the watermarks, i.e.
> it simply propagates the watermarks of the Kafka Sources to the
> ProcessFunction
> * Each Flink Kafka Source emits one watermark that is the minimum among
> the minimum extracted timestamps across all its partitions (per-partition
> watermarks)
>
> Desired Behaviour
>
> * The Flink Kafka Sources advance their watermarks according to their
> progress in processing the events. The ProcessFunction buffers each event in
> its local state and only releases them (ordered by observed event time)
> once it has received a watermark from all its Kafka Sources.
>
> Actual Behaviour
>
> * When starting the Flink Job, it reprocesses the existing data in the
> Kafka Topics. We see that the watermark advances (in the Flink Web UI) way too
> quickly (especially for the large stream) and the ProcessFunction quickly
> sees a very recent watermark while Flink is still processing lots of old events
> from the large topic. This very recent watermark causes the ProcessFunction
> to mark all other incoming events as late and drop them.
>
> Other Observations
>
> * If we set the AutoWatermarkingInterval to 0, the process function
> orders the events correctly based on event time (we are generating watermarks
> based on a TimestampAssigner and a custom Watermarking Strategy). The timestamp
> assignment is by event.
> * If we leave the AutoWatermarkingInterval at 200 (default value), the
> process function considers ~95% of all incoming events as late, regardless
> of whether we assign timestamps by event or emit them periodically.
> * We changed the Watermarking Generator to Periodic and we see the same
> results.
> * We have tried alignment groups; however, in this case the job doesn’t
> make any progress at all and seems stuck.
>
> Our Questions
>
> * Why do we see the behavior that we are seeing? Where is our knowledge gap
> on how the Flink Kafka Source generates its watermarks?
> * Are the watermarks for the Flink Kafka Source generated and emitted
> after each event or periodically? How does this relate to each partition?
> * Why does it work if we set the AutoWatermarkingInterval to 0? What
> does this change for Flink in the watermark generation and propagation?
> * Why don’t alignment groups work in this context, and why does the job seem
> stuck?
>
> We are thankful for every input!
> Cheers
>
> Source Code of Process Function
> public class ProcessFunction extends KeyedProcessFunction<…, Event, Event> {
>
>     private transient MapState<Long, List<Event>> queueState = null;
>
>     @Override
>     public void open(Configuration config) {
>         TypeInformation<Long> key = TypeInformation.of(new TypeHint<Long>() {});
>         TypeInformation<List<Event>> value = TypeInformation.of(new TypeHint<List<Event>>() {});
>         queueState = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("events-by-timestamp", key, value));
>     }
>
>     @Override
>     public void processElement(Event event, KeyedProcessFunction<…, Event, Event>.Context ctx, Collector<Event> out) throws Exception {
>
>         TimerService timerService = ctx.timerService();
>         if (ctx.timestamp() > timerService.currentWatermark()) {
>
>             List<Event> listEvents = queueState.get(ctx.timestamp());
>             if (isEmpty(listEvents)) {
>                 listEvents = new ArrayList<>();
>             }
>
>