Hi Robin

Glad to hear that my reply helped.

From my side, I do not think concurrent checkpoints will help, because they
may cause even more disk pressure.

Currently, there is an issue [1] that aims to support unaligned checkpoints,
which are intended to fix the problem of checkpointing under backpressure.

[1] https://issues.apache.org/jira/browse/FLINK-14551
Best,
Congxian


Robin Cassan <robin.cas...@contentsquare.com> wrote on Thu, Apr 9, 2020 at 8:30 PM:

> Hello again Congxian,
>
> Thank you so much for your advice, it is really helpful! We have managed
> to pinpoint that most of our problems occur because of disk pressure, most
> likely due to the usage of EBS. We will try again with local SSDs.
> Digging deeper into the "snowball effect on incremental checkpoint
> timeouts", I am wondering two things:
>
> - Would it help to use Concurrent Checkpointing? In our current tests,
> Flink waits for the previous checkpoint to finish before starting the next
> one. So if the previous one has expired, the next one will be twice as big.
> But if we enable concurrent checkpoints, is it correct to assume that the
> checkpoint sizes should be more consistent?
> More precisely, if a second checkpoint triggers during the first
> checkpoint, this will fix the size of the first checkpoint because new
> barriers are injected, and if the first checkpoint expires it would be
> retried with the same amount of data?
>
> - I am also wondering if there is a way for long checkpoints to create
> backpressure on the rest of the stream? This would be a nice feature to
> have, since it would avoid the state growing too much when checkpointing
> takes time because of temporary network issues for example.
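
The snowball effect raised in the first question above can be illustrated with a toy model. This is only a sketch under simplifying assumptions, not Flink code: it assumes a constant amount of changed state per checkpoint interval, that every attempt must upload all changes since the last completed checkpoint, and that an attempt expires when its delta exceeds what can be uploaded before the timeout. The class name, method name, and numbers are hypothetical.

```java
import java.util.Arrays;

/** Toy model of incremental checkpoint growth; not Flink code. */
final class SnowballSketch {

    /**
     * @param changePerIntervalMb  state changed between two checkpoint triggers (assumed constant)
     * @param capacityPerAttemptMb how much attempt i can upload before it times out
     * @return the amount each attempt has to upload
     */
    static long[] attemptSizes(long changePerIntervalMb, long[] capacityPerAttemptMb) {
        long[] sizes = new long[capacityPerAttemptMb.length];
        long pendingMb = 0; // changes not yet covered by a completed checkpoint
        for (int i = 0; i < capacityPerAttemptMb.length; i++) {
            pendingMb += changePerIntervalMb;
            sizes[i] = pendingMb;
            if (pendingMb <= capacityPerAttemptMb[i]) {
                pendingMb = 0; // checkpoint completed, next delta starts fresh
            }
            // otherwise the checkpoint expired and its delta carries over
        }
        return sizes;
    }

    public static void main(String[] args) {
        // One slow attempt (capacity 50) is enough to push every later delta past 150.
        long[] capacity = {150, 150, 50, 150, 150, 150};
        System.out.println(Arrays.toString(attemptSizes(100, capacity)));
        // prints [100, 100, 100, 200, 300, 400]
    }
}
```

In this model a single temporary slowdown makes every later attempt larger than the normal upload capacity, so checkpoints never recover unless the job restarts or capacity increases. Real incremental checkpoints are also affected by compaction and other factors that the model ignores.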
>
> Thanks for your help!
> Robin
>
> On Wed, Apr 8, 2020 at 05:30, Congxian Qiu <qcx978132...@gmail.com> wrote:
>
>> Hi Robin
>> Thanks for the detailed reply, and sorry for my late reply.
>> I think that your request to fail the whole job when consecutive checkpoints
>> expire is valid. I've created an issue to track this [1].
>>
>> For now, maybe the following steps can help you find the reason for the
>> timeouts:
>>
>> 1. Find the subtask that did not acknowledge the checkpoint in the
>> checkpoint UI (call it A).
>> 2. Check whether A is currently under backpressure.
>> 2.1. If A is under backpressure, please fix that first.
>> 2.2. If A is not under backpressure, look through A's TM log for anything
>> abnormal (you may need to enable debug logging for this step).
>>
>> On the TM side, a snapshot consists of 1) barrier alignment (exactly-once
>> mode only; at-least-once mode does not need to align barriers); 2) the
>> synchronous phase; 3) the asynchronous phase.
>>
>> Backpressure affects step 1; too many timers, high CPU consumption, or
>> high disk utilization may affect step 2; disk or network performance may
>> affect step 3.
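
The mapping above from snapshot phase to likely cause can be written down as a small lookup. This is a hypothetical helper, not part of Flink: the enum, its method names, and the idea of passing in three per-subtask durations (for example, read manually from the checkpoint UI) are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** The three TM-side snapshot phases from the message, with the suspects named there. */
enum SnapshotPhase {
    BARRIER_ALIGNMENT(Arrays.asList("backpressure")),
    SYNC(Arrays.asList("too many timers", "high CPU consumption", "high disk utilization")),
    ASYNC(Arrays.asList("disk performance", "network performance"));

    private final List<String> suspects;

    SnapshotPhase(List<String> suspects) {
        this.suspects = suspects;
    }

    List<String> suspects() {
        return suspects;
    }

    /** Returns the phase with the largest duration; ties go to the earlier phase. */
    static SnapshotPhase slowest(long alignMs, long syncMs, long asyncMs) {
        SnapshotPhase worst = BARRIER_ALIGNMENT;
        long max = alignMs;
        if (syncMs > max) {
            worst = SYNC;
            max = syncMs;
        }
        if (asyncMs > max) {
            worst = ASYNC;
        }
        return worst;
    }
}

final class SnapshotPhaseDemo {
    public static void main(String[] args) {
        // Hypothetical durations in milliseconds for the subtask that did not acknowledge.
        SnapshotPhase phase = SnapshotPhase.slowest(50, 200, 9000);
        System.out.println(phase + " -> check: " + phase.suspects());
    }
}
```

The slowest phase only narrows down where to look first; it does not replace checking the TM log as described in step 2.2.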
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17043
>> Best,
>> Congxian
>>
>>
>> Robin Cassan <robin.cas...@contentsquare.com> wrote on Fri, Apr 3, 2020 at 8:35 PM:
>>
>>> Hi Congxian,
>>>
>>> Thanks for confirming! The reason I want this behavior is because we are
>>> currently investigating issues with checkpoints that keep getting timeouts
>>> after the job has been running for a few hours. We observed that, after a
>>> few timeouts, if the job was being restarted because of a lost TM for
>>> example, the next checkpoints would be working for a few more hours.
>>> However, if the job continues running and consuming more data, the next
>>> checkpoints will be even bigger and the chances of them completing in time
>>> are getting even thinner.
>>> Crashing the job is not a viable solution I agree, but it would allow us
>>> to generate data during the time we investigate the root cause of the
>>> timeouts.
>>>
>>> I believe that having the option to make the job restart after a few
>>> checkpoint timeouts would still help to avoid the snowball effect of
>>> incremental checkpoints being bigger and bigger if the checkpoints keep
>>> getting expired.
>>>
>>> I'd love to get your opinion on this!
>>>
>>> Thanks,
>>> Robin
>>>
>>> On Fri, Apr 3, 2020 at 11:17, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>>
>>>> Currently, only declined checkpoints are counted in
>>>> `continuousFailureCounter`.
>>>> Could you please share why you want the job to fail when a checkpoint
>>>> expires?
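
The counting behavior described above can be sketched in isolation. This is a simplified stand-in, not the actual `CheckpointFailureManager`: the class, its methods, and the assumption that the job fails once the streak exceeds the tolerable number are all hypothetical. Only the two reason names and the counter name come from this thread.

```java
import java.util.EnumSet;
import java.util.Set;

/** The two failure reasons mentioned in this thread. */
enum FailureReason { CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED }

/** Simplified consecutive-failure counter; not Flink's implementation. */
final class FailureCounterSketch {
    private final int tolerableFailures;
    private final Set<FailureReason> countedReasons;
    private int continuousFailureCounter = 0;

    FailureCounterSketch(int tolerableFailures, Set<FailureReason> countedReasons) {
        this.tolerableFailures = tolerableFailures;
        this.countedReasons = countedReasons;
    }

    /** Records a failed checkpoint; returns true if the job should now fail. */
    boolean onFailure(FailureReason reason) {
        if (countedReasons.contains(reason)) {
            continuousFailureCounter++;
        }
        // Assumption: the job fails once the streak exceeds the tolerable number.
        return continuousFailureCounter > tolerableFailures;
    }

    /** A completed checkpoint resets the streak. */
    void onSuccess() {
        continuousFailureCounter = 0;
    }

    /** Feeds a sequence of failures; returns true if any of them trips the threshold. */
    static boolean jobFailsAfter(Set<FailureReason> counted, int tolerable, FailureReason... failures) {
        FailureCounterSketch counter = new FailureCounterSketch(tolerable, counted);
        for (FailureReason reason : failures) {
            if (counter.onFailure(reason)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        FailureReason expired = FailureReason.CHECKPOINT_EXPIRED;
        // Only declined checkpoints counted: expirations never fail the job.
        System.out.println(jobFailsAfter(EnumSet.of(FailureReason.CHECKPOINT_DECLINED), 1,
                expired, expired, expired));
        // If expirations were counted too, the second one in a row would fail the job.
        System.out.println(jobFailsAfter(EnumSet.allOf(FailureReason.class), 1,
                expired, expired));
    }
}
```

With only declined checkpoints counted, any number of consecutive expirations leaves the counter at zero, which matches the behavior Robin observed with a tolerable failure number of 1.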
>>>>
>>>> Best,
>>>> Congxian
>>>>
>>>>
>>>> Timo Walther <twal...@apache.org> wrote on Thu, Apr 2, 2020 at 11:23 PM:
>>>>
>>>>> Hi Robin,
>>>>>
>>>>> this is a very good observation and maybe even unintended behavior.
>>>>> Maybe Arvid in CC is more familiar with the checkpointing?
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>> On 02.04.20 15:37, Robin Cassan wrote:
>>>>> > Hi all,
>>>>> >
>>>>> > I am wondering if there is a way to make a flink job fail (not cancel
>>>>> > it) when one or several checkpoints have failed due to being expired
>>>>> > (taking longer than the timeout) ?
>>>>> > I am using Flink 1.9.2 and have set
>>>>> > `setTolerableCheckpointFailureNumber(1)` which doesn't do the trick.
>>>>> > Looking into the CheckpointFailureManager.java class, it looks like this
>>>>> > only works when the checkpoint failure reason is
>>>>> > `CHECKPOINT_DECLINED`, but the number of failures isn't incremented on
>>>>> > `CHECKPOINT_EXPIRED`.
>>>>> > Am I missing something?
>>>>> >
>>>>> > Thanks!
>>>>>
>>>>>
