Good morning Daniel,

Another reason could be backpressure in combination with aligned checkpoints:
* Flink processes checkpoints by sending checkpoint barriers (markers) through the job graph, starting at the source operators and travelling towards the sink operators.
* These checkpoint markers are a kind of meta event that is sent along with your custom events (much like watermarks and latency markers).
* These checkpoint markers cannot overtake (i.e. go faster than) your custom events.
* In your situation, because it happens right after you start the job:
  * a source might be forwarding many events (e.g. for backfilling) while a later operator cannot process these events at the same speed,
  * therefore the events queue up in front of that operator, and so do the checkpoint markers, which consequently take longer than the checkpoint timeout to align.
* How to fix this situation:
  * Diagnostics: the Flink dashboard has a checkpoints tab that shows how long checkpoint progress and alignment take for each task/subtask.
  * Which version of Flink are you using?
  * Depending on the version of Flink you can enable unaligned checkpoints (which have some other implications); a short sketch appears at the very end of this message.
  * You could also increase the scale-out factor for the backfill phase and then lower it again …
  * FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold: this depends on what recovery strategy you have configured … (the relevant settings are also sketched near the end of this message).

I might be mistaken, however this is what I look into when I run into similar situations.
Feel free to get back to the mailing list for further clarifications …

Thias

From: Caizhi Weng <tsreape...@gmail.com>
Sent: Thursday, 2 September 2021 04:24
To: Daniel Vol <vold...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Flink restarts on Checkpoint failure

Hi!

There are a ton of possible reasons for a checkpoint failure. The most likely reasons might be:
* The JVM is busy with garbage collection when performing the checkpoints. This can be checked by looking into the GC logs of a task manager.
* The state suddenly becomes quite large due to some specific data pattern. This can be checked by looking at the state size for the completed portion of that checkpoint.

You might also want to profile the CPU usage while the checkpoint is happening.

Daniel Vol <vold...@gmail.com> wrote on Wednesday, 1 September 2021 at 19:08:

Hello,

I see the following error in my jobmanager log (Flink on EMR). Checking the cluster logs I see:

2021-08-21 17:17:30,489 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 (type=CHECKPOINT) @ 1629566250303 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:17:33,572 [jobmanager-future-thread-5] INFO com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close closed:false s3://***/_metadata
2021-08-21 17:17:33,800 [jobmanager-future-thread-5] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job c513e9ebbea4ab72d80b1338896ca5c2 (737859873 bytes in 3496 ms).
2021-08-21 17:27:30,474 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1629566850302 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:27:46,012 [jobmanager-future-thread-3] INFO com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close closed:false s3://***/_metadata
2021-08-21 17:27:46,158 [jobmanager-future-thread-3] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job c513e9ebbea4ab72d80b1338896ca5c2 (1210889410 bytes in 15856 ms).
2021-08-21 17:37:30,468 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 (type=CHECKPOINT) @ 1629567450302 for job c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:47:30,469 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 3 of job c513e9ebbea4ab72d80b1338896ca5c2 expired before completing.
2021-08-21 17:47:30,476 [flink-akka.actor.default-dispatcher-34] INFO org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2021-08-21 17:47:30,478 [flink-akka.actor.default-dispatcher-34] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job session-aggregation (c513e9ebbea4ab72d80b1338896ca5c2) switched from state RUNNING to RESTARTING.

The configuration is:

-yD "execution.checkpointing.timeout=10 min" \
-yD "restart-strategy=failure-rate" \
-yD "restart-strategy.failure-rate.max-failures-per-interval=70" \
-yD "restart-strategy.failure-rate.delay=1 min" \
-yD "restart-strategy.failure-rate.failure-rate-interval=60 min" \

I am not sure whether https://issues.apache.org/jira/browse/FLINK-21215 is related - but it looks like it has been resolved.

I know I can increase the checkpoint timeout, but the checkpoint size is relatively small and most of the time a checkpoint takes several seconds to complete, so 10 minutes should be more than enough.

So the main question is: why was "Exceeded checkpoint tolerable failure threshold" triggered?

Thanks!
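For context on that last question: unless the tolerable number of failed checkpoints is raised, Flink treats even a single failed or expired checkpoint as a job failure, which is what the CheckpointFailureManager reports in the stack trace above. Below is a minimal, illustrative sketch (not from the original thread; the class name is hypothetical), assuming the DataStream API on Flink 1.10 or later, of how the checkpoint timeout and the tolerable failure threshold can be set programmatically instead of via -yD flags:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTuningSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Trigger a checkpoint every 10 minutes (matches the trigger times in the log above).
            env.enableCheckpointing(10 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);

            // Same effect as -yD "execution.checkpointing.timeout=10 min".
            env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);

            // Unless configured otherwise, no failed checkpoints are tolerated, so a single
            // expired checkpoint already fails (and restarts) the job. Allowing a few failures
            // keeps the job running while the root cause (e.g. backpressure during backfill)
            // is investigated.
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

            // ... build the actual pipeline here and call env.execute(...) ...
        }
    }

The corresponding configuration key in recent Flink versions is execution.checkpointing.tolerable-failed-checkpoints. Note that raising it does not remove the underlying backpressure; it only prevents the immediate restart.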
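Following up on the unaligned-checkpoints suggestion earlier in this thread, here is a minimal sketch (class name hypothetical), assuming Flink 1.11 or later where unaligned checkpoints are available, of how they can be enabled from the job code:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UnalignedCheckpointsSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Unaligned checkpoints require checkpointing in exactly-once mode
            // (the default mode when enabling checkpointing).
            env.enableCheckpointing(10 * 60 * 1000L);

            // Lets checkpoint barriers overtake buffered records under backpressure;
            // the in-flight records are persisted as part of the checkpoint instead,
            // which can make checkpoints larger but prevents long alignment phases
            // from hitting the checkpoint timeout.
            env.getCheckpointConfig().enableUnalignedCheckpoints();

            // ... rest of the pipeline ...
        }
    }

The same behaviour can typically be requested per job with -yD "execution.checkpointing.unaligned=true"; whether this option is available depends on the Flink version running on EMR.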