Hi Robin,
Glad to hear that my reply helped.
From my side, I do not think concurrent checkpoints will help, because they
may add to the disk pressure problem. There is an open issue [1] to support
unaligned checkpoints; unaligned checkpoints aim to fix the problem of
checkpointing under backpressure.

[1] https://issues.apache.org/jira/browse/FLINK-14551

Best,
Congxian
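P.S. If you still want to experiment with concurrent checkpoints despite the
disk concern, the configuration would look roughly like the sketch below.
This is only a minimal, untested example; the class name and the
interval/timeout values are placeholders you would need to adapt to your job.

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ConcurrentCheckpointSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Trigger a checkpoint every 5 minutes (placeholder value).
            env.enableCheckpointing(5 * 60 * 1000L);

            CheckpointConfig config = env.getCheckpointConfig();
            config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

            // Expire checkpoints that take longer than 10 minutes (placeholder value).
            config.setCheckpointTimeout(10 * 60 * 1000L);

            // Allow a second checkpoint to start while the first one is still
            // running, so a slow checkpoint does not delay the next trigger.
            config.setMaxConcurrentCheckpoints(2);
        }
    }

Keep in mind that two in-flight snapshots share the same disks and network,
which is exactly the extra pressure mentioned above.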
Robin Cassan <robin.cas...@contentsquare.com> wrote on Thu, Apr 9, 2020 at 8:30 PM:

> Hello again Congxian,
>
> Thank you so much for your advice, it is really helpful! We have managed
> to pinpoint that most of our problems occur because of disk pressure, most
> likely due to our use of EBS; we will try again with local SSDs.
> Digging deeper into the "snowball effect on incremental checkpoint
> timeouts", I am wondering two things:
>
> - Would it help to use concurrent checkpointing? In our current tests,
> Flink waits for the previous checkpoint to finish before starting the next
> one, so if the previous one has expired, the next one will be twice as big.
> If we enable concurrent checkpoints, is it correct to assume that the
> checkpoint sizes would be more consistent? More precisely, if a second
> checkpoint triggers while the first one is still running, this would fix
> the size of the first checkpoint because new barriers are injected, and if
> the first checkpoint expires, would it be retried with the same amount of
> data?
>
> - I am also wondering whether there is a way for long checkpoints to
> create backpressure on the rest of the stream. This would be a nice
> feature to have, since it would keep the state from growing too much when
> checkpointing takes a long time, for example because of temporary network
> issues.
>
> Thanks for your help!
> Robin
>
> On Wed, Apr 8, 2020 at 05:30, Congxian Qiu <qcx978132...@gmail.com> wrote:
>
>> Hi Robin
>> Thanks for the detailed reply, and sorry for my late answer.
>> I think your request to fail the whole job when consecutive checkpoints
>> expire is valid; I've created an issue to track this [1].
>>
>> For now, the following steps may help you find the reason for the
>> timeouts:
>>
>> 1. Find the subtask that did not acknowledge the checkpoint in the
>> checkpoint UI (let's call it A).
>> 2. Check whether A is currently under backpressure:
>> 2.1. If A is under backpressure, please fix that first.
>> 2.2. If A is not under backpressure, check the TaskManager log of A for
>> anything abnormal (you may need to enable debug logging for this step).
>>
>> On the TaskManager side, a snapshot consists of 1) barrier alignment
>> (exactly-once mode only; at-least-once mode does not need to align
>> barriers); 2) the synchronous phase; 3) the asynchronous phase.
>>
>> Backpressure affects step 1; too many timers, high CPU consumption, or
>> high disk utilization may affect step 2; disk and network performance
>> may affect step 3.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17043
>> Best,
>> Congxian
>>
>> Robin Cassan <robin.cas...@contentsquare.com> wrote on Fri, Apr 3, 2020 at 8:35 PM:
>>
>>> Hi Congxian,
>>>
>>> Thanks for confirming! The reason I want this behavior is that we are
>>> currently investigating checkpoints that keep timing out after the job
>>> has been running for a few hours. We observed that, after a few
>>> timeouts, if the job was restarted (because of a lost TM, for example),
>>> the next checkpoints would work for a few more hours. However, if the
>>> job keeps running and consuming more data, the next checkpoints become
>>> even bigger and the chances of them completing in time get even thinner.
>>> Crashing the job is not a viable solution, I agree, but it would allow
>>> us to keep generating data while we investigate the root cause of the
>>> timeouts.
>>>
>>> I believe that having the option to restart the job after a few
>>> checkpoint timeouts would still help avoid the snowball effect of
>>> incremental checkpoints growing bigger and bigger while checkpoints
>>> keep expiring.
>>>
>>> I'd love to get your opinion on this!
>>>
>>> Thanks,
>>> Robin
>>>
>>> On Fri, Apr 3, 2020 at 11:17, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>>
>>>> Currently, only declined checkpoints are counted into
>>>> `continuousFailureCounter`.
>>>> Could you please share why you want the job to fail when a checkpoint
>>>> expires?
>>>>
>>>> Best,
>>>> Congxian
>>>>
>>>> Timo Walther <twal...@apache.org> wrote on Thu, Apr 2, 2020 at 11:23 PM:
>>>>
>>>>> Hi Robin,
>>>>>
>>>>> this is a very good observation and maybe even unintended behavior.
>>>>> Maybe Arvid (in CC) is more familiar with the checkpointing?
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>> On 02.04.20 15:37, Robin Cassan wrote:
>>>>> > Hi all,
>>>>> >
>>>>> > I am wondering if there is a way to make a Flink job fail (not
>>>>> > cancel it) when one or several checkpoints have failed because
>>>>> > they expired (took longer than the timeout)?
>>>>> > I am using Flink 1.9.2 and have set
>>>>> > `setTolerableCheckpointFailureNumber(1)`, which doesn't do the
>>>>> > trick.
>>>>> > Looking into the CheckpointFailureManager.java class, it looks
>>>>> > like this only works when the checkpoint failure reason is
>>>>> > `CHECKPOINT_DECLINED`; the number of failures isn't incremented on
>>>>> > `CHECKPOINT_EXPIRED`.
>>>>> > Am I missing something?
>>>>> >
>>>>> > Thanks!
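For reference, the setting Robin mentions in the original question is
configured on the CheckpointConfig. A minimal, untested sketch (Flink 1.9.x;
the class name and the interval value are placeholders) would look like this:

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TolerableFailureSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint every 5 minutes (placeholder value).
            env.enableCheckpointing(5 * 60 * 1000L);

            CheckpointConfig config = env.getCheckpointConfig();

            // Tolerate at most one checkpoint failure before failing the job.
            // As discussed in this thread, in Flink 1.9.2 this counter is only
            // incremented for declined checkpoints (CHECKPOINT_DECLINED), so
            // expired checkpoints (CHECKPOINT_EXPIRED) do not trigger the
            // failure; FLINK-17043 tracks changing that.
            config.setTolerableCheckpointFailureNumber(1);
        }
    }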