Hi!

I see you're using sliding event-time windows. What are the exact values of
windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes is
large and windowSlideTimeMinutes is small, each record is assigned to a
large number of windows (roughly windowLengthMinutes /
windowSlideTimeMinutes). As the pipeline proceeds, this can gradually slow
down checkpointing and eventually cause a timeout.
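
To make the fan-out concrete, here is a minimal plain-Java sketch (no Flink dependency) that lists the sliding windows a single event timestamp falls into. It follows the general rule for sliding windows and is not a copy of Flink's assigner code. The class name, the method name, and the 60-minute / 1-minute values are hypothetical, since the actual settings haven't been posted yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: shows how many sliding windows
// a single record belongs to for a given window length and slide.
final class SlidingWindowFanOut {

    /** Returns the start timestamps (ms) of every sliding window containing timestampMs. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        if (sizeMs <= 0 || slideMs <= 0) {
            throw new IllegalArgumentException("size and slide must be positive");
        }
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= the timestamp (windows aligned to the epoch).
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long eventTime = 1_634_664_540_000L; // arbitrary example timestamp

        // Assumed values: 60-minute windows sliding every minute.
        System.out.println("sliding 60m/1m:   "
                + windowStarts(eventTime, 60 * minute, minute).size() + " windows per record");

        // Slide equal to length behaves like a tumbling window.
        System.out.println("sliding 10m/10m:  "
                + windowStarts(eventTime, 10 * minute, 10 * minute).size() + " window per record");
    }
}
```

With these assumed values, every record lands in 60 windows in the first case and in exactly 1 in the second, so the same input rate produces 60 times as many window entries to write and track.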

Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote on Tue,
Oct 19, 2021 at 7:29 PM:

> Hello everyone,
>
>
>
> I am doing performance tests for one of our streaming applications and,
> after increasing the throughput a bit (~500 events per minute), it has
> started failing because checkpoints cannot be completed within 10 minutes.
> The Flink cluster is not exactly under my control and is running on
> Kubernetes with version 1.11.3 and RocksDB backend.
>
>
>
> I can access the UI and logs and have confirmed:
>
>
>
>    - Logs do indicate expired checkpoints.
>    - There is no backpressure in any operator.
>    - When checkpoints do complete (seemingly at random):
>       - Size is 10-20MB.
>       - Sync and Async durations are at most 1-2 seconds.
>       - In one of the tasks, alignment takes 1-3 minutes, but start
>       delays grow to up to 5 minutes.
>    - The aforementioned task (the one with 5-minute start delay) has 8
>    sub-tasks and I see no indication of data skew. When the checkpoint times
>    out, none of the sub-tasks have acknowledged the checkpoint.
>
>
>
> The problematic task that is failing very often (and holding back
> downstream tasks) consists of the following operations:
>
>
>
> timestampedEventStream = events
>                 .keyBy(keySelector)
>                 .assignTimestampsAndWatermarks(watermarkStrategy);
>
> windowedStream =
>         DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream, keySelector)
>                 .window(SlidingEventTimeWindows.of(
>                         Time.minutes(windowLengthMinutes),
>                         Time.minutes(windowSlideTimeMinutes)))
>                 .allowedLateness(Time.minutes(allowedLatenessMinutes));
>
> windowedStream
>                 .process(new ProcessWindowFunction1(config))
>                 // add sink
>
> windowedStream
>                 .process(new ProcessWindowFunction2(config))
>                 // add sink
>
>
>
> Both window functions are using managed state, but nothing out of the
> ordinary (as mentioned above, state size is actually very small). Do note
> that the same windowedStream is used twice.
>
>
>
> I don’t see any obvious runtime issues and I don’t think the load is
> particularly high, but maybe there’s something wrong in my pipeline
> definition? What else could cause these timeouts?
>
>
>
> Regards,
>
> Alexis.
>
>
>
