Hi again,

Thanks a lot for taking the time to clarify this. I think the main thing 
confusing me is that the UI shows Alignment Duration and other checkpoint 
metrics for each subtask, whereas the resources you’ve sent always discuss a 
single barrier per subtask channel. Should I understand these metrics as a 
property of an operator rather than of each subtask (at least for aligned 
checkpoints)? Then “first” and “last” would make sense to me: first/last across 
all subtasks/channels for a given operator.

Naturally, for unaligned checkpoints, alignment duration isn’t applicable, but 
what about Start Delay? I imagine that might indeed be a property of the 
subtask and not the operator.

With respect to my problem, I can also add that my job reads data from Pulsar, 
so some of it is buffered in the message bus. If I’m understanding the aligned 
checkpoint mechanism correctly, after the first failure the job restarts and 
tries to re-read, let’s say, the last 5 minutes of data. If it then fails again 
because the checkpoint times out, would it try to read, for example, 15 minutes 
of data after the next restart? If there was no backpressure in the source 
before, couldn’t the checkpoint barriers created after the first restart end up 
behind more data than before the restart?

Regards,
Alexis.

From: Piotr Nowojski <pnowoj...@apache.org>
Sent: Monday, 25 October 2021 13:35
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: Parag Somani <somanipa...@gmail.com>; Caizhi Weng <tsreape...@gmail.com>; 
Flink ML <user@flink.apache.org>
Subject: Re: Troubleshooting checkpoint timeout

Hi again Alexis,

First answering your questions:
> 1. The documentation states “Operators that receive more than one input 
> stream need to align the input streams on the snapshot barriers”. If an 
> operator has parallelism > 1, does that count as more than one stream? Or is 
> there a single output barrier for all subtask outputs that gets “copied” to 
> all downstream subtask inputs?

Yes, in this context "more than one input" means more than one 
input/communication channel, for example from multiple upstream parallel 
operators. You can read about that in some blog posts [1] or presentations [2].

> 2. Similarly, alignment duration is said to be “The time between processing 
> the first and the last checkpoint barrier”. What exactly is the 
> interpretation of “first” and “last” here? Do they relate to a checkpoint “n” 
> where “first” would be the barrier for n-1 and “last” the one for n?

No. It means the difference between observing the first and the last checkpoint 
barrier from the same checkpoint "n". If you are still not sure, I believe that 
if you read/listen to the resources I pointed out in the previous response (or 
google for "flink checkpointing"), it should clear things up :)

> 3. Start delay also refers to the “first checkpoint barrier to reach this 
> subtask”. As before, what is “first” in this context?

Again, the same answer as above :) In short, checkpoint barriers (for one given 
checkpoint "n") are injected in the sources - in all parallel instances of the 
source operators. From there, they travel through the job graph to downstream 
operators. Once a downstream operator receives the first checkpoint barrier 
(from checkpoint "n"), the alignment phase begins. Note that if a downstream 
operator has a parallelism of 100, it will most likely have 100 network inputs, 
and it will be waiting for 100 different checkpoint barriers (one from each 
upstream operator) during the alignment phase. After receiving the 100th 
checkpoint barrier (still for checkpoint "n"), the alignment phase completes. 
The difference between "aligned" and "unaligned" checkpoints comes down to when 
such an operator broadcasts the checkpoint barrier to its outputs for the 
downstream operators. In aligned checkpoints, the barriers are broadcast at the 
end of the alignment phase. In unaligned checkpoints, it happens after seeing 
the first checkpoint barrier.
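
To make that concrete, here is a rough, purely illustrative sketch of the 
bookkeeping (a simplification for this discussion, not Flink's actual 
internals; the class and method names are made up):

import java.util.HashSet;
import java.util.Set;

// Illustrative only: what an operator with N input channels tracks for one checkpoint "n".
class BarrierAlignmentSketch {
    private final int numInputChannels;
    private final Set<Integer> channelsWithBarrier = new HashSet<>();

    BarrierAlignmentSketch(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    // Called when the barrier of checkpoint "n" arrives on one input channel.
    void onBarrier(int channelIndex) {
        boolean firstBarrier = channelsWithBarrier.isEmpty();
        channelsWithBarrier.add(channelIndex);
        if (firstBarrier) {
            // Alignment phase begins here; this is where "alignment duration" starts.
            // With unaligned checkpoints the barrier would already be forwarded downstream now.
        }
        if (channelsWithBarrier.size() == numInputChannels) {
            // Last barrier received: the alignment phase ends. With aligned checkpoints the
            // operator snapshots its state and only now broadcasts the barrier to all of
            // its output channels.
            channelsWithBarrier.clear();
        }
    }
}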

> 4. Maybe this will be answered by the previous questions, but what happens to 
> barriers if a downstream operator has lower parallelism?

A different number of input and output channels doesn't matter. Once an 
operator decides to send/broadcast a checkpoint barrier downstream, it just 
broadcasts it to all of its output channels.

Coming back to your problem:

> There’s a good chance the problem begins when the job starts running out of 
> (heap) memory and the GC introduces delays. That’s of course independent of 
> Flink and I’ll have to look at the cause

This could easily explain excessive backpressure. Note that if you have long GC 
pauses, they can affect the accuracy of the metrics. Keep in mind that you can 
always attach a JVM profiler to Flink's process to more easily analyse what's 
happening there.

> I guess I was reading older versions of the documentation that didn’t have 
> that.

The documentation is evolving and we are trying to improve it over time, and 
the Flink version that you are using is no longer supported/developed. 
Combining those two things might lead to a situation where the documentation 
for the more recent Flink versions is better, while still being accurate for 
the most part. That's why I've linked you to those fresher resources.

Best,
Piotrek


[1] 
https://flink.apache.org/2020/10/15/from-aligned-to-unaligned-checkpoints-part-1.html
[2] https://youtu.be/-ABPHMhrecQ


On Mon, 25 Oct 2021 at 11:38, Alexis Sarda-Espinosa 
<alexis.sarda-espin...@microfocus.com> wrote:
Hi Piotrek,

Thanks for all the information; I guess I was reading older versions of the 
documentation that didn’t have that.

I was just using the job graph UI to check backpressure, but after looking at 
other factors, I think there is indeed some backpressure; I just don’t know how 
it builds up (there’s none at the beginning of the job). I can’t easily upgrade 
the Flink version just yet, so I don’t have access to all the new facilities, 
but based on what I do have, I have some additional remarks/questions.

There’s a good chance the problem begins when the job starts running out of 
(heap) memory and the GC introduces delays. That’s of course independent of 
Flink and I’ll have to look at the cause, but even if I increase available 
memory, I still see delays (at least for some time). I know this because one of 
my operators uses timers and logs their timestamps, and I can see the timer 
timestamps lagging clock time by up to 1 hour. Since the logs don’t indicate 
that the operator’s logic takes a significant amount of time, and CPU is far 
below the available limit (the single TM barely uses more than 1 CPU out of 4), 
I’d guess the lag could be related to checkpoint alignment, which takes me to 
my questions:


  1.  The documentation states “Operators that receive more than one input 
stream need to align the input streams on the snapshot barriers”. If an 
operator has parallelism > 1, does that count as more than one stream? Or is 
there a single output barrier for all subtask outputs that gets “copied” to all 
downstream subtask inputs?
  2.  Similarly, alignment duration is said to be “The time between processing 
the first and the last checkpoint barrier”. What exactly is the interpretation 
of “first” and “last” here? Do they relate to a checkpoint “n” where “first” 
would be the barrier for n-1 and “last” the one for n?
  3.  Start delay also refers to the “first checkpoint barrier to reach this 
subtask”. As before, what is “first” in this context?
  4.  Maybe this will be answered by the previous questions, but what happens 
to barriers if a downstream operator has lower parallelism?

Regards,
Alexis.

From: Piotr Nowojski <pnowoj...@apache.org>
Sent: Monday, 25 October 2021 09:59
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: Parag Somani <somanipa...@gmail.com>; Caizhi Weng <tsreape...@gmail.com>; 
Flink ML <user@flink.apache.org>
Subject: Re: Troubleshooting checkpoint timeout

Hi Alexis,

You can read about those metrics in the documentation [1]. Long alignment 
duration and long start delay almost always come together. High values indicate 
long checkpoint barrier propagation times through the job graph, and that's 
always (at least so far I haven't seen a different reason) caused by the same 
thing: backpressure. Which brings me to:

> There is no backpressure in any operator.

Why do you think so?

For analysing backpressure I would highly recommend upgrading to Flink 1.13.x 
as it has greatly improved tooling for that [2]. Since Flink 1.10 I believe you 
can use the `isBackPressured` metric. In previous versions you would have to 
rely on buffer usage metrics as described here [3].

If this is indeed a problem with backpressure, there are three things you could 
do to improve checkpointing time:
a) Reduce the backpressure, either by optimising your job/code or by scaling up.
b) Reduce the amount of in-flight data. Since Flink 1.14.x, Flink can do it 
automatically when buffer debloating is enabled, but the same principle could 
be used to manually and statically configure the cluster to have less in-flight 
data. You can read about this here [4].
c) Enable unaligned checkpoints [5], e.g. as sketched below.
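
If you do end up trying (c), the Java API call is roughly the following (a 
sketch only; unaligned checkpoints require at least Flink 1.11 and, at least in 
earlier versions, exactly-once checkpointing, and the interval value here is 
just an example):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Example interval only; keep whatever checkpoint interval your job already uses.
        env.enableCheckpointing(60_000);
        // Option (c): barriers can overtake buffered in-flight data, so alignment no longer
        // waits for backpressured channels; the in-flight data becomes part of the checkpoint.
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        // ... define sources, transformations and sinks, then call env.execute(...) ...
    }
}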

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
[2] https://flink.apache.org/2021/07/07/backpressure.html
[3] 
https://flink.apache.org/2019/07/23/flink-network-stack-2.html#network-metrics
[4] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/network_mem_tuning/#the-buffer-debloating-mechanism
[5] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints

Best,
Piotrek

On Thu, 21 Oct 2021 at 19:00, Alexis Sarda-Espinosa 
<alexis.sarda-espin...@microfocus.com> wrote:
I would really appreciate more fine-grained information regarding the factors 
that can affect a checkpoint’s:


  *   Sync duration
  *   Async duration
  *   Alignment duration
  *   Start delay

Otherwise those metrics don’t really help me know in which areas to look for 
issues.

Regards,
Alexis.

From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Wednesday, 20 October 2021 09:43
To: Parag Somani <somanipa...@gmail.com>; Caizhi Weng <tsreape...@gmail.com>
Cc: Flink ML <user@flink.apache.org>
Subject: RE: Troubleshooting checkpoint timeout

Currently the windows are 10 minutes in size with a 1-minute slide. The 
approximate 500 events/minute throughput is already rather high for my use 
case, so I don’t expect it to go higher, but I would imagine that’s still 
pretty low in absolute terms.

I did have some issues with storage space, and I wouldn’t be surprised if there 
is an IO bottleneck in my dev environment, but then my main question would be: 
if IO is being throttled, could that result in the high “start delay” times I 
observe? That seems to be the main slowdown, so I just want to be sure I’m 
looking in the right direction.

I’d like to mention another thing about my pipeline’s structure in case it’s 
relevant, although it may be completely unrelated. I said that I specify the 
windowing properties once (windowedStream in my 1st e-mail) and use it twice, 
but it’s actually used 3 times. In addition to the 2 ProcessWindowFunctions 
that end in sinks, the stream is also joined with a side output:

openedEventsTimestamped = openedEvents
        .getSideOutput(…)
        .keyBy(keySelector)
        .assignTimestampsAndWatermarks(watermarkStrategy);

windowedStream
        .process(new ProcessWindowFunction3())
        .keyBy(keySelector)
        .connect(DataStreamUtils.reinterpretAsKeyedStream(openedEventsTimestamped, keySelector))
        .process(...);

Could this lead to delays or alignment issues?

Regards,
Alexis.

From: Parag Somani <somanipa...@gmail.com>
Sent: Wednesday, 20 October 2021 09:22
To: Caizhi Weng <tsreape...@gmail.com>
Cc: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>; Flink ML 
<user@flink.apache.org>
Subject: Re: Troubleshooting checkpoint timeout

I had a similar problem, where two concurrent checkpoints were configured. 
Also, I was saving checkpoints to S3 (using MinIO) on a k8s 1.18 environment.

The Flink service kept getting restarted and checkpoints were timing out. It 
got resolved by:
1. Fixing disk space: MinIO had run out of disk space, which caused the 
checkpoint failures (this was the main cause).
2. Tuning the checkpoint duration/interval parameters, namely 
execution.checkpointing.max-concurrent-checkpoints and 
execution.checkpointing.min-pause (see the sketch below).
Details at:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#checkpointing
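
For reference, the equivalent calls via the Java API look roughly like this (a 
sketch only; the values are examples, and the config keys above can of course 
be set in flink-conf.yaml instead):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointPauseSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // example: trigger a checkpoint every 60 s
        // execution.checkpointing.max-concurrent-checkpoints
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // execution.checkpointing.min-pause
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // example: 30 s pause
        // ... define the job and call env.execute(...) ...
    }
}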


On Wed, Oct 20, 2021 at 7:50 AM Caizhi Weng <tsreape...@gmail.com> wrote:
Hi!

I see you're using sliding event time windows. What's the exact value of 
windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes is large 
and windowSlideTimeMinutes is small then each record may be assigned to a large 
number of windows as the pipeline proceeds, thus gradually slows down 
checkpointing and finally causes a timeout.
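
As a rough illustration (hypothetical values, not taken from your job): the 
number of overlapping windows each record falls into is roughly the window 
length divided by the slide.

// Illustrative arithmetic only; the real values come from your job's configuration.
long windowLengthMinutes = 10;
long windowSlideTimeMinutes = 1;
long windowsPerRecord = windowLengthMinutes / windowSlideTimeMinutes; // = 10 windows per record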

Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote on Tuesday, 
19 October 2021 at 19:29:
Hello everyone,

I am doing performance tests for one of our streaming applications and, after 
increasing the throughput a bit (~500 events per minute), it has started 
failing because checkpoints cannot be completed within 10 minutes. The Flink 
cluster is not exactly under my control and is running on Kubernetes with 
Flink 1.11.3 and the RocksDB state backend.

I can access the UI and logs and have confirmed:


  *   Logs do indicate expired checkpoints.
  *   There is no backpressure in any operator.
  *   When checkpoints do complete (seemingly at random):

     *   Size is 10-20MB.
     *   Sync and Async durations are at most 1-2 seconds.
     *   In one of the tasks, alignment takes 1-3 minutes, but start delays 
grow to up to 5 minutes.

  *   The aforementioned task (the one with 5-minute start delay) has 8 
sub-tasks and I see no indication of data skew. When the checkpoint times out, 
none of the sub-tasks have acknowledged the checkpoint.

The problematic task that is failing very often (and holding back downstream 
tasks) consists of the following operations:

timestampedEventStream = events
                .keyBy(keySelector)
                .assignTimestampsAndWatermarks(watermarkStrategy);

windowedStream = DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream, keySelector)
                .window(SlidingEventTimeWindows.of(
                        Time.minutes(windowLengthMinutes),
                        Time.minutes(windowSlideTimeMinutes)))
                .allowedLateness(Time.minutes(allowedLatenessMinutes));

windowedStream
                    .process(new ProcessWindowFunction1(config))
                    // add sink

windowedStream
                    .process(new ProcessWindowFunction2(config))
                    // add sink

Both window functions are using managed state, but nothing out of the ordinary 
(as mentioned above, state size is actually very small). Do note that the same 
windowedStream is used twice.

I don’t see any obvious runtime issues and I don’t think the load is 
particularly high, but maybe there’s something wrong in my pipeline definition? 
What else could cause these timeouts?

Regards,
Alexis.



--
Regards,
Parag Surajmal Somani.
