Hi! I see you're using sliding event time windows. What are the exact values of windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes is large and windowSlideTimeMinutes is small, each record may be assigned to a large number of overlapping windows, which gradually slows down checkpointing and eventually causes a timeout.
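As a back-of-the-envelope sketch (the values below are purely hypothetical, not taken from your mail): SlidingEventTimeWindows assigns each element to roughly size / slide windows, so a long window with a short slide multiplies the per-record window state that has to be snapshotted.

    // Hypothetical configuration, only to illustrate the overlap factor.
    long windowLengthMinutes = 60;      // assumed value for illustration
    long windowSlideTimeMinutes = 1;    // assumed value for illustration

    // Each record lands in one window per slide interval its timestamp overlaps,
    // i.e. roughly length / slide windows.
    long windowsPerRecord = windowLengthMinutes / windowSlideTimeMinutes; // ~60 windows per record

    // On top of that, allowedLateness keeps each window's state around for an
    // extra allowedLatenessMinutes before it is cleaned up.
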
Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote on Tue, Oct 19, 2021 at 7:29 PM:
> Hello everyone,
>
> I am doing performance tests for one of our streaming applications and,
> after increasing the throughput a bit (~500 events per minute), it has
> started failing because checkpoints cannot be completed within 10 minutes.
> The Flink cluster is not exactly under my control and is running on
> Kubernetes with version 1.11.3 and the RocksDB backend.
>
> I can access the UI and logs and have confirmed:
>
> - Logs do indicate expired checkpoints.
> - There is no backpressure in any operator.
> - When checkpoints do complete (seemingly at random):
>   - Size is 10-20MB.
>   - Sync and Async durations are at most 1-2 seconds.
>   - In one of the tasks, alignment takes 1-3 minutes, but start
>     delays grow to up to 5 minutes.
> - The aforementioned task (the one with the 5-minute start delay) has 8
>   sub-tasks and I see no indication of data skew. When the checkpoint times
>   out, none of the sub-tasks have acknowledged the checkpoint.
>
> The problematic task that is failing very often (and holding back
> downstream tasks) consists of the following operations:
>
> timestampedEventStream = events
>     .keyBy(keySelector)
>     .assignTimestampsAndWatermarks(watermarkStrategy);
>
> windowedStream =
>     DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream, keySelector)
>         .window(SlidingEventTimeWindows.of(
>             Time.minutes(windowLengthMinutes),
>             Time.minutes(windowSlideTimeMinutes)))
>         .allowedLateness(Time.minutes(allowedLatenessMinutes));
>
> windowedStream
>     .process(new ProcessWindowFunction1(config))
>     // add sink
>
> windowedStream
>     .process(new ProcessWindowFunction2(config))
>     // add sink
>
> Both window functions are using managed state, but nothing out of the
> ordinary (as mentioned above, state size is actually very small). Do note
> that the same windowedStream is used twice.
>
> I don’t see any obvious runtime issues and I don’t think the load is
> particularly high, but maybe there’s something wrong in my pipeline
> definition? What else could cause these timeouts?
>
> Regards,
> Alexis.