I'm glad that I could help :)

Piotrek

pon., 25 paź 2021 o 16:04 Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> napisał(a):

> Oh, I got it. I should’ve made the connection earlier after you said “Once
> an operator decides to send/broadcast a checkpoint barrier downstream, it
> just broadcasts it to all output channels”.
>
>
>
> I’ll see what I can do about upgrading the Flink version and do some more
> tests with unaligned checkpoints. Thanks again for all the info.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Piotr Nowojski <pnowoj...@apache.org>
> *Sent:* Montag, 25. Oktober 2021 15:51
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* Parag Somani <somanipa...@gmail.com>; Caizhi Weng <
> tsreape...@gmail.com>; Flink ML <user@flink.apache.org>
> *Subject:* Re: Troubleshooting checkpoint timeout
>
>
>
> Hi Alexis,
>
>
>
> >  Should I understand these metrics as a property of an operator and not
> of each subtask (at least for aligned checkpoints)? Then “first” and “last”
> would make sense to me: first/last across all subtasks/channels for a given
> operator.
>
>
>
> Those are properties of a subtask. Subtasks are a collection of chained
> parallel instances of operators. If you have a simple job like
> `source.keyBy(...).window(...).process(...)`, with parallelism of 10, you
> will have two tasks. Each task will have 10 subtasks. Each subtask will
> have only a single element operator chain, with a single operator (either
> source operator for the source task/subtasks, or window/process function
> for the second task). If you add a sink to your job
> `source.keyBy(...).window(...).process(...).addSink(...)`, this sink will
> be chained with the window/process operator. You will still end up with two
> tasks:
>
>
>
> 1. Source
> 2. Window -> Sink
>
>
>
> again, each will have 10 subtasks, with parallel instances of the
> respective operators.
>
>
>
> So if you look at the "alignment duration" of a subtask from "2. Window ->
> Sink" task, that will be the difference between receiving a first
> checkpoint barrier from any of the "1. Source" subtasks and the last
> checkpoint barrier from those "1. Source" subtasks.
>
>
>
> > Naturally, for unaligned checkpoints, alignment duration isn’t
> applicable, but what about Start Delay? I imagine that might indeed be a
> property of the subtask and not the operator.
>
> As per the docs that I've already linked [1]
>
>
> Alignment Duration: The time between processing the first and the last
> checkpoint barrier. For aligned checkpoints, during the alignment, the
> channels that have already received checkpoint barriers are blocked from
> processing more data.
>
>
>
> This number is also defined the same way for the unaligned checkpoints.
> Even with unaligned checkpoints a subtask needs to wait for receiving all
> of the checkpoint barriers before completing the checkpoint. However, as
> subtask can broadcast the checkpoint barrier downstream immediately upon
> receiving the first checkpoint barrier AND those checkpoint barriers are
> able to overtake in-flight data, the propagation happens very very quickly
> for the most part. Hence alignment duration and start delay in this case
> should be very small, unless you have deeper problems like long GC pauses.
>
> > If I’m understanding the aligned checkpoint mechanism correctly, after
> the first failure the job restarts and tries to read, let’s say, the last 5
> minutes of data. Then it fails again because the checkpoint times out and,
> after restarting, would it try to read, for example, 15 minutes of data? If
> there was no backpressure in the source, it could be that the new
> checkpoint barriers created after the first restart are behind more data
> than before it restarted, no?
>
>
>
> I'm not sure if I understand. But yes. It's a valid scenario that:
>
> 1. timestamp t1, checkpoint 42 completes
> 2. failure happens at timestamp t1 + 10 minutes.
> 3. timestamp t2, job is recovered to checkpoint 42.
>
> 4. timestamp t2 + 5 minutes, checkpoint 43 is triggered.
>
>
>
> Between 1. and 2., your job could have processed more records than between
> 3. and 4.
>
>
>
> Piotrek
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
>
>
>
> pon., 25 paź 2021 o 15:02 Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> napisał(a):
>
> Hi again,
>
>
>
> Thanks a lot for taking the time to clarify this. I think that the main
> thing that is confusing me is that the UI shows Alignment Duration and
> other checkpoint metrics for each subtask, and the resources you’ve sent
> always discuss a single barrier per subtask channel. Should I understand
> these metrics as a property of an operator and not of each subtask (at
> least for aligned checkpoints)? Then “first” and “last” would make sense to
> me: first/last across all subtasks/channels for a given operator.
>
>
>
> Naturally, for unaligned checkpoints, alignment duration isn’t applicable,
> but what about Start Delay? I imagine that might indeed be a property of
> the subtask and not the operator.
>
>
>
> With respect to my problem, I can also add that my job reads data from
> Pulsar, so some of it is buffered in the message bus. If I’m understanding
> the aligned checkpoint mechanism correctly, after the first failure the job
> restarts and tries to read, let’s say, the last 5 minutes of data. Then it
> fails again because the checkpoint times out and, after restarting, would
> it try to read, for example, 15 minutes of data? If there was no
> backpressure in the source, it could be that the new checkpoint barriers
> created after the first restart are behind more data than before it
> restarted, no?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Piotr Nowojski <pnowoj...@apache.org>
> *Sent:* Montag, 25. Oktober 2021 13:35
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* Parag Somani <somanipa...@gmail.com>; Caizhi Weng <
> tsreape...@gmail.com>; Flink ML <user@flink.apache.org>
> *Subject:* Re: Troubleshooting checkpoint timeout
>
>
>
> Hi again Alexis,
>
>
>
> First answering your questions:
>
> > 1. The documentation states “Operators that receive more than one input
> stream need to align the input streams on the snapshot barriers”. If an
> operator has parallelism > 1, does that count as more than one stream? Or
> is there a single output barrier for all subtask outputs that gets “copied”
> to all downstream subtask inputs?
>
>
>
> Yes, in this context "more than one input" means more than one
> input/communication channels, for example from multiple upstream parallel
> operators. You can read about that in some blog posts [1] or presentations
> [2]
>
>
>
> > 2. Similarly, alignment duration is said to be “The time between
> processing the first and the last checkpoint barrier”. What exactly is the
> interpretation of “first” and “last” here? Do they relate to a checkpoint
> “n” where “first” would be the barrier for n-1 and “last” the one for n?
>
>
>
> No. It means the difference between observing the first and last
> checkpoint barrier from the same checkpoint "n". If you are still not sure,
> I believe if you read/listen the resources I pointed out in the previous
> response (or google for "flink checkpointing") it should clear things out :)
>
>
>
> > 3. Start delay also refers to the “first checkpoint barrier to reach
> this subtask”. As before, what is “first” in this context?
>
>
>
> Again, the same answer as above :) In short checkpoint barriers (for one
> given checkpoint "n"), are injected in the sources - in all parallel
> instances of the source operators. From there, they are traveling through
> the job graph to downstream operators. Once a downstream operator receives
> the first checkpoint barrier (from checkpoint "n"), the alignment phase
> begins. Note that if a downstream operator has parallelism of 100, it will
> most likely have 100 network inputs, and it will be waiting for 100
> different checkpoint barriers (one from each upstream operator) during the
> alignment phase. After receiving the 100th checkpoint barrier (still
> checkpoint "n"), the alignment phase completes. The difference between
> "aligned" and "unaligned" checkpoints comes down to when such operator
> broadcasts to it's outputs checkpoint barrier for the downstream operators.
> In aligned checkpoints such checkpoint barriers are broadcasted at the end
> of alignment phase. In unaligned checkpoints it happens after seeing the
> first checkpoint barrier.
>
>
>
> > 4. Maybe this will be answered by the previous questions, but what
> happens to barriers if a downstream operator has lower parallelism?
>
>
>
> If there is a different number of input and output channels it doesn't
> matter. Once an operator decides to send/broadcast a checkpoint barrier
> downstream, it just broadcasts it to all output channels.
>
>
>
> Coming back to your problem:
>
>
>
> > There’s a good chance the problem begins when the job starts running out
> of (heap) memory and the GC introduces delays. That’s of course independent
> of Flink and I’ll have to look at the cause
>
>
>
> This could easily explain an excessive back pressure. Note that if you
> have long GC pauses, it can affect accuracy of the metrics. Keep in mind
> that you can always attach a JVM profiler to the Flink's process to analyse
> easier what's happening there.
>
>
>
> > I guess I was reading older versions of the documentation that didn’t
> have that.
>
>
>
> The documentation is evolving and we are trying to improve it over time
> and Flink's version that you are using is no longer supported/developed.
> Combining those two issues might lead to a situation where the
> documentation for the more recent Flink versions is better, while still
> being accurate for the most part. That's why I've linked you to those more
> fresh resources.
>
>
>
> Best,
>
> Piotrek
>
>
>
>
> [1]
> https://flink.apache.org/2020/10/15/from-aligned-to-unaligned-checkpoints-part-1.html
>
> [2] https://youtu.be/-ABPHMhrecQ
>
>
>
>
>
> pon., 25 paź 2021 o 11:38 Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> napisał(a):
>
> Hi Piotrek,
>
>
>
> Thanks for all the information, I guess I was reading older versions of
> the documentation that didn’t have that.
>
>
>
> I was just using the job graph UI to check backpressure, but after looking
> at other factors, I think there is indeed some backpressure, but I don’t
> know how it builds up (there’s none at the beginning of the job). I can’t
> easily upgrade the Flink version just yet, so I don’t have access to all
> the new facilities but based on what I do have I have some additional
> remarks/questions.
>
>
>
> There’s a good chance the problem begins when the job starts running out
> of (heap) memory and the GC introduces delays. That’s of course independent
> of Flink and I’ll have to look at the cause, but even if I increase
> available memory, I still see delays (at least for some time); I know this
> because one of my operators uses timers and logs their timestamps, and I
> can see the timer timestamps lagging clock time by up to 1 hour. Since the
> logs don’t indicate the operator’s logic takes a significant amount of time
> and CPU is far below the available limit (the single TM barely uses more
> than 1 CPU out of 4), I’d guess the lag could be related to checkpoint
> alignment, which takes me to my questions:
>
>
>
>    1. The documentation states “Operators that receive more than one
>    input stream need to align the input streams on the snapshot barriers”. If
>    an operator has parallelism > 1, does that count as more than one stream?
>    Or is there a single output barrier for all subtask outputs that gets
>    “copied” to all downstream subtask inputs?
>    2. Similarly, alignment duration is said to be “The time between
>    processing the first and the last checkpoint barrier”. What exactly is the
>    interpretation of “first” and “last” here? Do they relate to a checkpoint
>    “n” where “first” would be the barrier for n-1 and “last” the one for n?
>    3. Start delay also refers to the “first checkpoint barrier to reach
>    this subtask”. As before, what is “first” in this context?
>    4. Maybe this will be answered by the previous questions, but what
>    happens to barriers if a downstream operator has lower parallelism?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Piotr Nowojski <pnowoj...@apache.org>
> *Sent:* Montag, 25. Oktober 2021 09:59
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* Parag Somani <somanipa...@gmail.com>; Caizhi Weng <
> tsreape...@gmail.com>; Flink ML <user@flink.apache.org>
> *Subject:* Re: Troubleshooting checkpoint timeout
>
>
>
> Hi Alexis,
>
>
>
> You can read about those metrics in the documentation [1]. Long alignment
> duration and start delay almost always come together. High values indicate
> long checkpoint barrier propagation times through the job graph, that's
> always (at least so far I haven't seen a different reason) caused by the
> same thing: backpressure. Which brings me to
>
>
>
> > There is no backpressure in any operator.
>
>
>
> Why do you think so?
>
>
>
> For analysing backpressure I would highly recommend upgrading to Flink 1.13.x
> as it has greatly improved tooling for that [2]. Since Flink 1.10 I
> believe you can use the `isBackPressured` metric. In previous versions you
> would have to rely on buffer usage metrics as described here [3].
>
>
>
> If this is indeed a problem with a backpressure, there are three things
> you could do to improve checkpointing time:
>
> a) Reduce the backpressure, either by optimising your job/code or scaling
> up.
>
> b) Reduce the amount of in-flight data. Since Flink 1.14.x, Flink can do
> it automatically when buffer debloating is enabled, but the same
> principle could be used to manually and statically configure cluster to
> have less in-flight data. You can read about this here [4].
>
> c) Enabled unaligned checkpoints [5].
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
>
> [2] https://flink.apache.org/2021/07/07/backpressure.html
>
> [3] https://flink.apache.org/2019/07/23/flink-network-stack-2.html
> #network-metrics
>
> [4]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/network_mem_tuning/#the-buffer-debloating-mechanism
>
> [5]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints
>
>
>
> Best,
>
> Piotrek
>
>
>
> czw., 21 paź 2021 o 19:00 Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> napisał(a):
>
> I would really appreciate more fine-grained information regarding the
> factors that can affect a checkpoint’s:
>
>
>
>    - Sync duration
>    - Async duration
>    - Alignment duration
>    - Start delay
>
>
>
> Otherwise those metrics don’t really help me know in which areas to look
> for issues.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Sent:* Mittwoch, 20. Oktober 2021 09:43
> *To:* Parag Somani <somanipa...@gmail.com>; Caizhi Weng <
> tsreape...@gmail.com>
> *Cc:* Flink ML <user@flink.apache.org>
> *Subject:* RE: Troubleshooting checkpoint timeout
>
>
>
> Currently the windows are 10 minutes in size with a 1-minute slide time.
> The approximate 500 event/minute throughput is already rather high for my
> use case, so I don’t expect it to be higher, but I would imagine that’s
> still pretty low.
>
>
>
> I did have some issues with storage space, and I wouldn’t be surprised if
> there is an IO bottleneck in my dev environment, but then my main question
> would be: if IO is being throttled, could that result in the high “start
> delay” times I observe? That seems to be the main slowdown, so I just want
> to be sure I’m looking in the right direction.
>
>
>
> I’d like to mention another thing about my pipeline’s structure in case
> it’s relevant, although it may be completely unrelated. I said that I
> specify the windowing properties once (windowedStream in my 1st e-mail)
> and use it twice, but it’s actually used 3 times. In addition to the 2
> ProcessWindowFunctions that end in sinks, the stream is also joined with a
> side output:
>
>
>
> openedEventsTimestamped = openedEvents
>
>         .getSideOutput(…)
>
>         .keyBy(keySelector)
>
>         .assignTimestampsAndWatermarks(watermarkStrategy)
>
>
>
> windowedStream
>
>         .process(ProcessWindowFunction3())
>
>         .keyBy(keySelector)
>
>
> .connect(DataStreamUtils.reinterpretAsKeyedStream(openedEventsTimestamped,
> keySelector))
>
>         .process(...)
>
>
>
> Could this lead to delays or alignment issues?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Parag Somani <somanipa...@gmail.com>
> *Sent:* Mittwoch, 20. Oktober 2021 09:22
> *To:* Caizhi Weng <tsreape...@gmail.com>
> *Cc:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>; Flink
> ML <user@flink.apache.org>
> *Subject:* Re: Troubleshooting checkpoint timeout
>
>
>
> I had similar problem, where i have concurrent two checkpoints were
> configured. Also, i used to save it in S3(using minio) on k8s 1.18 env.
>
>
>
> Flink service were getting restarted and timeout was happening. It got
> resolved:
>
> 1. As minio ran out of disk space, caused failure of checkpoints(this was
> the main cause).
>
> 2. Added duration/interval of checkpoint parameter to address it
>
> execution.checkpointing.max-concurrent-checkpoints and
> execution.checkpointing.min-pause
>
> Details of same at:
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#checkpointing
>
>
>
>
>
> On Wed, Oct 20, 2021 at 7:50 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>
> Hi!
>
>
>
> I see you're using sliding event time windows. What's the exact value of
> windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes is
> large and windowSlideTimeMinutes is small then each record may be assigned
> to a large number of windows as the pipeline proceeds, thus gradually slows
> down checkpointing and finally causes a timeout.
>
>
>
> Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> 于2021年10月19
> 日周二 下午7:29写道:
>
> Hello everyone,
>
>
>
> I am doing performance tests for one of our streaming applications and,
> after increasing the throughput a bit (~500 events per minute), it has
> started failing because checkpoints cannot be completed within 10 minutes.
> The Flink cluster is not exactly under my control and is running on
> Kubernetes with version 1.11.3 and RocksDB backend.
>
>
>
> I can access the UI and logs and have confirmed:
>
>
>
>    - Logs do indicate expired checkpoints.
>    - There is no backpressure in any operator.
>    - When checkpoints do complete (seemingly at random):
>
>
>    - Size is 10-20MB.
>       - Sync and Async durations are at most 1-2 seconds.
>       - In one of the tasks, alignment takes 1-3 minutes, but start
>       delays grow to up to 5 minutes.
>
>
>    - The aforementioned task (the one with 5-minute start delay) has 8
>    sub-tasks and I see no indication of data skew. When the checkpoint times
>    out, none of the sub-tasks have acknowledged the checkpoint.
>
>
>
> The problematic task that is failing very often (and holding back
> downstream tasks) consists of the following operations:
>
>
>
> timestampedEventStream = events
>
>                 .keyBy(keySelector)
>
>                 .assignTimestampsAndWatermarks(watermarkStrategy);
>
>
>
> windowedStream =
> DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream,
> keySelector)
>
>                 .window(SlidingEventTimeWindows.of(
>
>                         Time.minutes(windowLengthMinutes),
>
>                         Time.minutes(windowSlideTimeMinutes)))
>
>                 .allowedLateness(Time.minutes(allowedLatenessMinutes));
>
>
>
> windowedStream
>
>                     .process(new ProcessWindowFunction1(config))
>
>                     // add sink
>
>
>
> windowedStream
>
>                     .process(new ProcessWindowFunction2(config))
>
>                     // add sink
>
>
>
> Both window functions are using managed state, but nothing out of the
> ordinary (as mentioned above, state size is actually very small). Do note
> that the same windowedStream is used twice.
>
>
>
> I don’t see any obvious runtime issues and I don’t think the load is
> particularly high, but maybe there’s something wrong in my pipeline
> definition? What else could cause these timeouts?
>
>
>
> Regards,
>
> Alexis.
>
>
>
>
>
> --
>
> Regards,
> Parag Surajmal Somani.
>
>

Reply via email to