I'm glad that I could help :) Piotrek
On Mon, 25 Oct 2021 at 16:04, Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:

Oh, I got it. I should’ve made the connection earlier after you said “Once an operator decides to send/broadcast a checkpoint barrier downstream, it just broadcasts it to all output channels”.

I’ll see what I can do about upgrading the Flink version and do some more tests with unaligned checkpoints. Thanks again for all the info.

Regards,
Alexis.

*From:* Piotr Nowojski <pnowoj...@apache.org>
*Sent:* Monday, 25 October 2021 15:51
*To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
*Cc:* Parag Somani <somanipa...@gmail.com>; Caizhi Weng <tsreape...@gmail.com>; Flink ML <user@flink.apache.org>
*Subject:* Re: Troubleshooting checkpoint timeout

Hi Alexis,

> Should I understand these metrics as a property of an operator and not of each subtask (at least for aligned checkpoints)? Then “first” and “last” would make sense to me: first/last across all subtasks/channels for a given operator.

Those are properties of a subtask. Each subtask is one parallel instance of a chain of operators. If you have a simple job like `source.keyBy(...).window(...).process(...)`, with parallelism of 10, you will have two tasks. Each task will have 10 subtasks. Each subtask will have only a single-element operator chain, with a single operator (either the source operator for the source task/subtasks, or the window/process function for the second task). If you add a sink to your job `source.keyBy(...).window(...).process(...).addSink(...)`, this sink will be chained with the window/process operator. You will still end up with two tasks:

1. Source
2. Window -> Sink

Again, each will have 10 subtasks, with parallel instances of the respective operators.

So if you look at the "alignment duration" of a subtask from the "2. Window -> Sink" task, that will be the difference between receiving the first checkpoint barrier from any of the "1. Source" subtasks and the last checkpoint barrier from those "1. Source" subtasks.

> Naturally, for unaligned checkpoints, alignment duration isn’t applicable, but what about Start Delay? I imagine that might indeed be a property of the subtask and not the operator.

As per the docs that I've already linked [1]:

> Alignment Duration: The time between processing the first and the last checkpoint barrier. For aligned checkpoints, during the alignment, the channels that have already received checkpoint barriers are blocked from processing more data.

This number is defined the same way for unaligned checkpoints. Even with unaligned checkpoints a subtask needs to wait until it has received all of the checkpoint barriers before completing the checkpoint. However, as a subtask can broadcast the checkpoint barrier downstream immediately upon receiving the first checkpoint barrier AND those checkpoint barriers are able to overtake in-flight data, the propagation happens very quickly for the most part. Hence alignment duration and start delay in this case should be very small, unless you have deeper problems like long GC pauses.

> If I’m understanding the aligned checkpoint mechanism correctly, after the first failure the job restarts and tries to read, let’s say, the last 5 minutes of data. Then it fails again because the checkpoint times out and, after restarting, would it try to read, for example, 15 minutes of data? If there was no backpressure in the source, it could be that the new checkpoint barriers created after the first restart are behind more data than before it restarted, no?

I'm not sure if I understand. But yes. It's a valid scenario that:

1. timestamp t1, checkpoint 42 completes.
2. failure happens at timestamp t1 + 10 minutes.
3. timestamp t2, job is recovered to checkpoint 42.
4. timestamp t2 + 5 minutes, checkpoint 43 is triggered.

Between 1. and 2., your job could have processed more records than between 3. and 4.

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/

On Mon, 25 Oct 2021 at 15:02, Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:

Hi again,

Thanks a lot for taking the time to clarify this. I think that the main thing that is confusing me is that the UI shows Alignment Duration and other checkpoint metrics for each subtask, and the resources you’ve sent always discuss a single barrier per subtask channel. Should I understand these metrics as a property of an operator and not of each subtask (at least for aligned checkpoints)? Then “first” and “last” would make sense to me: first/last across all subtasks/channels for a given operator.

Naturally, for unaligned checkpoints, alignment duration isn’t applicable, but what about Start Delay? I imagine that might indeed be a property of the subtask and not the operator.

With respect to my problem, I can also add that my job reads data from Pulsar, so some of it is buffered in the message bus. If I’m understanding the aligned checkpoint mechanism correctly, after the first failure the job restarts and tries to read, let’s say, the last 5 minutes of data. Then it fails again because the checkpoint times out and, after restarting, would it try to read, for example, 15 minutes of data? If there was no backpressure in the source, it could be that the new checkpoint barriers created after the first restart are behind more data than before it restarted, no?

Regards,
Alexis.

*From:* Piotr Nowojski <pnowoj...@apache.org>
*Sent:* Monday, 25 October 2021 13:35
*To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
*Cc:* Parag Somani <somanipa...@gmail.com>; Caizhi Weng <tsreape...@gmail.com>; Flink ML <user@flink.apache.org>
*Subject:* Re: Troubleshooting checkpoint timeout

Hi again Alexis,

First answering your questions:

> 1. The documentation states “Operators that receive more than one input stream need to align the input streams on the snapshot barriers”. If an operator has parallelism > 1, does that count as more than one stream? Or is there a single output barrier for all subtask outputs that gets “copied” to all downstream subtask inputs?

Yes, in this context "more than one input" means more than one input/communication channel, for example from multiple upstream parallel operators. You can read about that in some blog posts [1] or presentations [2].

> 2. Similarly, alignment duration is said to be “The time between processing the first and the last checkpoint barrier”. What exactly is the interpretation of “first” and “last” here? Do they relate to a checkpoint “n” where “first” would be the barrier for n-1 and “last” the one for n?

No. It means the difference between observing the first and last checkpoint barrier from the same checkpoint "n". If you are still not sure, I believe if you read/listen to the resources I pointed out in the previous response (or google for "flink checkpointing") it should clear things up :)

> 3. Start delay also refers to the “first checkpoint barrier to reach this subtask”. As before, what is “first” in this context?

Again, the same answer as above :) In short, checkpoint barriers (for one given checkpoint "n") are injected in the sources - in all parallel instances of the source operators. From there, they travel through the job graph to downstream operators.
Once a downstream operator receives the first checkpoint barrier (from checkpoint "n"), the alignment phase begins. Note that if a downstream operator has a parallelism of 100, each of its subtasks will most likely have 100 network inputs, and it will be waiting for 100 different checkpoint barriers (one from each upstream subtask) during the alignment phase. After receiving the 100th checkpoint barrier (still checkpoint "n"), the alignment phase completes. The difference between "aligned" and "unaligned" checkpoints comes down to when such an operator broadcasts the checkpoint barrier to its outputs for the downstream operators. In aligned checkpoints such checkpoint barriers are broadcast at the end of the alignment phase. In unaligned checkpoints it happens after seeing the first checkpoint barrier.

> 4. Maybe this will be answered by the previous questions, but what happens to barriers if a downstream operator has lower parallelism?

If there is a different number of input and output channels it doesn't matter. Once an operator decides to send/broadcast a checkpoint barrier downstream, it just broadcasts it to all output channels.

Coming back to your problem:

> There’s a good chance the problem begins when the job starts running out of (heap) memory and the GC introduces delays. That’s of course independent of Flink and I’ll have to look at the cause

This could easily explain excessive backpressure. Note that if you have long GC pauses, they can affect the accuracy of the metrics. Keep in mind that you can always attach a JVM profiler to the Flink process to analyse more easily what's happening there.

> I guess I was reading older versions of the documentation that didn’t have that.

The documentation is evolving and we are trying to improve it over time, and the Flink version that you are using is no longer supported/developed. Combining those two issues might lead to a situation where the documentation for the more recent Flink versions is better, while still being accurate for the most part. That's why I've linked you to those fresher resources.

Best,
Piotrek

[1] https://flink.apache.org/2020/10/15/from-aligned-to-unaligned-checkpoints-part-1.html
[2] https://youtu.be/-ABPHMhrecQ

On Mon, 25 Oct 2021 at 11:38, Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:

Hi Piotrek,

Thanks for all the information, I guess I was reading older versions of the documentation that didn’t have that.

I was just using the job graph UI to check backpressure, but after looking at other factors, I think there is indeed some backpressure, but I don’t know how it builds up (there’s none at the beginning of the job). I can’t easily upgrade the Flink version just yet, so I don’t have access to all the new facilities, but based on what I do have I have some additional remarks/questions.

There’s a good chance the problem begins when the job starts running out of (heap) memory and the GC introduces delays. That’s of course independent of Flink and I’ll have to look at the cause, but even if I increase available memory, I still see delays (at least for some time); I know this because one of my operators uses timers and logs their timestamps, and I can see the timer timestamps lagging clock time by up to 1 hour. Since the logs don’t indicate the operator’s logic takes a significant amount of time and CPU is far below the available limit (the single TM barely uses more than 1 CPU out of 4), I’d guess the lag could be related to checkpoint alignment, which takes me to my questions:

1. The documentation states “Operators that receive more than one input stream need to align the input streams on the snapshot barriers”. If an operator has parallelism > 1, does that count as more than one stream? Or is there a single output barrier for all subtask outputs that gets “copied” to all downstream subtask inputs?
2. Similarly, alignment duration is said to be “The time between processing the first and the last checkpoint barrier”. What exactly is the interpretation of “first” and “last” here? Do they relate to a checkpoint “n” where “first” would be the barrier for n-1 and “last” the one for n?
3. Start delay also refers to the “first checkpoint barrier to reach this subtask”. As before, what is “first” in this context?
4. Maybe this will be answered by the previous questions, but what happens to barriers if a downstream operator has lower parallelism?

Regards,
Alexis.

*From:* Piotr Nowojski <pnowoj...@apache.org>
*Sent:* Monday, 25 October 2021 09:59
*To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
*Cc:* Parag Somani <somanipa...@gmail.com>; Caizhi Weng <tsreape...@gmail.com>; Flink ML <user@flink.apache.org>
*Subject:* Re: Troubleshooting checkpoint timeout

Hi Alexis,

You can read about those metrics in the documentation [1]. Long alignment duration and start delay almost always come together. High values indicate long checkpoint barrier propagation times through the job graph, which is always (at least so far I haven't seen a different reason) caused by the same thing: backpressure. Which brings me to

> There is no backpressure in any operator.

Why do you think so?

For analysing backpressure I would highly recommend upgrading to Flink 1.13.x as it has greatly improved tooling for that [2]. Since Flink 1.10 I believe you can use the `isBackPressured` metric. In previous versions you would have to rely on buffer usage metrics as described here [3].

If this is indeed a problem with backpressure, there are three things you could do to improve checkpointing time:

a) Reduce the backpressure, either by optimising your job/code or scaling up.
b) Reduce the amount of in-flight data. Since Flink 1.14.x, Flink can do it automatically when buffer debloating is enabled, but the same principle could be used to manually and statically configure the cluster to have less in-flight data. You can read about this here [4].
c) Enable unaligned checkpoints [5].

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
[2] https://flink.apache.org/2021/07/07/backpressure.html
[3] https://flink.apache.org/2019/07/23/flink-network-stack-2.html#network-metrics
[4] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/network_mem_tuning/#the-buffer-debloating-mechanism
[5] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints

Best,
Piotrek

On Thu, 21 Oct 2021 at 19:00, Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:

I would really appreciate more fine-grained information regarding the factors that can affect a checkpoint’s:

- Sync duration
- Async duration
- Alignment duration
- Start delay

Otherwise those metrics don’t really help me know in which areas to look for issues.

Regards,
Alexis.

*From:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
*Sent:* Wednesday, 20 October 2021 09:43
*To:* Parag Somani <somanipa...@gmail.com>; Caizhi Weng <tsreape...@gmail.com>
*Cc:* Flink ML <user@flink.apache.org>
*Subject:* RE: Troubleshooting checkpoint timeout

Currently the windows are 10 minutes in size with a 1-minute slide time. The approximate 500 event/minute throughput is already rather high for my use case, so I don’t expect it to be higher, but I would imagine that’s still pretty low.

I did have some issues with storage space, and I wouldn’t be surprised if there is an IO bottleneck in my dev environment, but then my main question would be: if IO is being throttled, could that result in the high “start delay” times I observe? That seems to be the main slowdown, so I just want to be sure I’m looking in the right direction.

I’d like to mention another thing about my pipeline’s structure in case it’s relevant, although it may be completely unrelated. I said that I specify the windowing properties once (windowedStream in my 1st e-mail) and use it twice, but it’s actually used 3 times. In addition to the 2 ProcessWindowFunctions that end in sinks, the stream is also joined with a side output:

    openedEventsTimestamped = openedEvents
        .getSideOutput(…)
        .keyBy(keySelector)
        .assignTimestampsAndWatermarks(watermarkStrategy)

    windowedStream
        .process(ProcessWindowFunction3())
        .keyBy(keySelector)
        .connect(DataStreamUtils.reinterpretAsKeyedStream(openedEventsTimestamped, keySelector))
        .process(...)

Could this lead to delays or alignment issues?

Regards,
Alexis.

*From:* Parag Somani <somanipa...@gmail.com>
*Sent:* Wednesday, 20 October 2021 09:22
*To:* Caizhi Weng <tsreape...@gmail.com>
*Cc:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>; Flink ML <user@flink.apache.org>
*Subject:* Re: Troubleshooting checkpoint timeout

I had a similar problem, where I had two concurrent checkpoints configured. Also, I was saving them in S3 (using MinIO) in a k8s 1.18 environment.

The Flink service kept getting restarted and timeouts were happening. It got resolved as follows:

1. MinIO ran out of disk space, which caused the checkpoint failures (this was the main cause).
2. Added the checkpoint duration/interval parameters to address it: execution.checkpointing.max-concurrent-checkpoints and execution.checkpointing.min-pause

Details of same at:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#checkpointing

On Wed, Oct 20, 2021 at 7:50 AM Caizhi Weng <tsreape...@gmail.com> wrote:

Hi!

I see you're using sliding event time windows. What are the exact values of windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes is large and windowSlideTimeMinutes is small then each record may be assigned to a large number of windows as the pipeline proceeds, which gradually slows down checkpointing and finally causes a timeout.

On Tue, 19 Oct 2021 at 7:29 PM, Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:

Hello everyone,

I am doing performance tests for one of our streaming applications and, after increasing the throughput a bit (~500 events per minute), it has started failing because checkpoints cannot be completed within 10 minutes. The Flink cluster is not exactly under my control and is running on Kubernetes with version 1.11.3 and RocksDB backend.

I can access the UI and logs and have confirmed:

- Logs do indicate expired checkpoints.
- There is no backpressure in any operator.
- When checkpoints do complete (seemingly at random):
  - Size is 10-20MB.
  - Sync and Async durations are at most 1-2 seconds.
  - In one of the tasks, alignment takes 1-3 minutes, but start delays grow to up to 5 minutes.
- The aforementioned task (the one with 5-minute start delay) has 8 sub-tasks and I see no indication of data skew. When the checkpoint times out, none of the sub-tasks have acknowledged the checkpoint.
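
The alignment and start-delay figures in the list above can be read as simple differences between timestamps. The following is a minimal plain-Java sketch, not Flink code: the class and method names are hypothetical, the arrival times are made-up numbers chosen to resemble the ones reported here, and it assumes the definitions from the checkpoint monitoring documentation linked in the replies (start delay runs from barrier creation to the first barrier reaching a subtask; alignment duration runs from the first to the last barrier of the same checkpoint on that subtask).

```java
/** Plain-Java illustration only; every name in this class is hypothetical. */
final class BarrierTimingSketch {

    /** Earliest arrival among the barriers of one checkpoint on one subtask. */
    private static long firstArrival(long... barrierArrivalsMs) {
        if (barrierArrivalsMs.length == 0) {
            throw new IllegalArgumentException("need at least one barrier arrival");
        }
        long first = Long.MAX_VALUE;
        for (long t : barrierArrivalsMs) {
            first = Math.min(first, t);
        }
        return first;
    }

    /** Alignment duration: last barrier arrival minus first barrier arrival. */
    static long alignmentDurationMs(long... barrierArrivalsMs) {
        long first = firstArrival(barrierArrivalsMs);
        long last = Long.MIN_VALUE;
        for (long t : barrierArrivalsMs) {
            last = Math.max(last, t);
        }
        return last - first;
    }

    /** Start delay: first barrier arrival minus the time the barrier was created. */
    static long startDelayMs(long barrierCreatedAtMs, long... barrierArrivalsMs) {
        return firstArrival(barrierArrivalsMs) - barrierCreatedAtMs;
    }

    public static void main(String[] args) {
        // Made-up example: barriers from three input channels arrive 5, 5.5 and 7
        // minutes after the checkpoint barrier was created at time 0.
        System.out.println(startDelayMs(0L, 300_000L, 330_000L, 420_000L));    // 300000
        System.out.println(alignmentDurationMs(300_000L, 330_000L, 420_000L)); // 120000
    }
}
```

Read this way, a large start delay means that even the fastest barrier took a long time to reach the subtask, which is independent of how far apart the individual barriers arrive.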

The problematic task that is failing very often (and holding back downstream tasks) consists of the following operations:

    timestampedEventStream = events
        .keyBy(keySelector)
        .assignTimestampsAndWatermarks(watermarkStrategy);

    windowedStream = DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream, keySelector)
        .window(SlidingEventTimeWindows.of(
            Time.minutes(windowLengthMinutes),
            Time.minutes(windowSlideTimeMinutes)))
        .allowedLateness(Time.minutes(allowedLatenessMinutes));

    windowedStream
        .process(new ProcessWindowFunction1(config))
        // add sink

    windowedStream
        .process(new ProcessWindowFunction2(config))
        // add sink

Both window functions are using managed state, but nothing out of the ordinary (as mentioned above, state size is actually very small). Do note that the same windowedStream is used twice.

I don’t see any obvious runtime issues and I don’t think the load is particularly high, but maybe there’s something wrong in my pipeline definition? What else could cause these timeouts?

Regards,
Alexis.

--
Regards,
Parag Surajmal Somani.
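
To put a number on Caizhi's remark about sliding windows: with the 10-minute window length and 1-minute slide reported in this thread, every record belongs to 10 overlapping windows. The sketch below is plain Java with hypothetical names, not Flink's API; it assumes windows are aligned to the epoch with no offset and simply enumerates the window start times that cover a given event timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration only; every name in this class is hypothetical. */
final class SlidingWindowSketch {

    /** Start times of all windows [start, start + sizeMs) that contain timestampMs. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest slide-aligned start at or before the timestamp (assumes offset 0).
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // An event at 25 minutes 30 seconds, 10-minute windows sliding every minute.
        List<Long> starts = windowStarts(25 * minute + 30_000L, 10 * minute, minute);
        System.out.println(starts.size()); // 10
    }
}
```

In general the count is the window length divided by the slide, so a longer window or a shorter slide multiplies the per-record work by the same factor.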