Hi,
Do you mean that the checkpoint failure stops your job from running normally?
What is your sink type? If it relies on completed checkpoints to commit,
this is expected.

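As a side check on the quoted JobManager log, the `@` value on the "Triggering checkpoint 2505" line is an epoch-millisecond timestamp, and decoding it shows when that checkpoint was requested compared with when the line was logged. The following is a minimal Python sketch that uses only values copied from the quoted log. It assumes the log timestamps are in UTC, which is consistent with the checkpoint 2504 line (its `@` value decodes to 09:48:11.851). The helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_MS = timedelta(milliseconds=1)


def from_epoch_ms(ms: int) -> datetime:
    """Convert the epoch-millisecond value after '@' into a UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


def parse_log_time(text: str) -> datetime:
    """Parse a log timestamp such as '2023-10-27 10:42:13,305'.

    Assumption: the log clock is UTC.
    """
    parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S,%f")
    return parsed.replace(tzinfo=timezone.utc)


# Values copied from the quoted log for checkpoint 2505.
requested_2505 = from_epoch_ms(1698400691875)
logged_2505 = parse_log_time("2023-10-27 10:42:13,305")
completed_2505 = parse_log_time("2023-10-27 10:42:39,287")

# Gap between the trigger timestamp and the "Triggering" log line.
trigger_delay_ms = (logged_2505 - requested_2505) // ONE_MS
# Gap between the trigger timestamp and the "Completed" log line.
end_to_end_ms = (completed_2505 - requested_2505) // ONE_MS

print(requested_2505.strftime("%H:%M:%S.%f")[:-3])  # 09:58:11.875
print(trigger_delay_ms)                             # 2641430
print(end_to_end_ms, 2666106 + 1306)                # 2667412 2667412
```

Under that assumption, the trigger timestamp of checkpoint 2505 falls at 09:58:11.875, about 44 minutes before the "Triggering" line appears, and the reported checkpointDuration plus finalizationTime matches the time from that timestamp to completion. One possible reading is that the trigger was requested on schedule and then blocked until S3 came back, rather than never being attempted.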
On Tue, Oct 31, 2023 at 12:03 AM Evgeniy Lyutikov <eblyuti...@avito.ru>
wrote:

> Hi team!
> I came across strange behavior in Flink 1.17.1. If the S3 storage becomes
> unavailable while a checkpoint is being built, the current checkpoint
> expires by timeout and new ones are not triggered.
> Triggering of new checkpoints resumes only after S3 is restored,
> which can be a long time later.
>
> I can reproduce it: wait for a checkpoint to start, then disconnect the S3
> storage.
>
> 2023-10-27 09:48:11,866 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Triggering checkpoint 2504 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1698400091851 for job
> 00000000000000000000000000000000.
> 2023-10-27 09:58:12,873 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Checkpoint 2504 of job 00000000000000000000000000000000 expired before
> completing.
> 2023-10-27 09:58:12,874 WARN
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to
> trigger or complete checkpoint 2504 for job
> 00000000000000000000000000000000. (0 consecutive failed attempts so far)
>
> After the current checkpoint expires (our timeout is 10 min), there are no
> new triggering attempts in the logs until the S3 storage is restored:
>
> 2023-10-27 10:42:09,530 WARN
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle [] - Could
> not properly discard misc file states.
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Read
> timed out
> 2023-10-27 10:42:13,305 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Triggering checkpoint 2505 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1698400691875 for job
> 00000000000000000000000000000000.
> 2023-10-27 10:42:39,287 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed
> checkpoint 2505 for job 00000000000000000000000000000000 (10023840497
> bytes, checkpointDuration=2666106 ms, finalizationTime=1306 ms).
> 2023-10-27 10:44:39,288 INFO
> org.apache.flink.runtime.checkpoint.CheckpointRequestDecider [] -
> checkpoint request time in queue: 1887436
> 2023-10-27 10:44:39,300 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Triggering checkpoint 2506 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1698403479288 for job
> 00000000000000000000000000000000.
> 2023-10-27 10:44:50,924 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed
> checkpoint 2506 for job 00000000000000000000000000000000 (10085877149
> bytes, checkpointDuration=11011 ms, finalizationTime=625 ms).
> 2023-10-27 10:46:50,924 INFO
> org.apache.flink.runtime.checkpoint.CheckpointRequestDecider [] -
> checkpoint request time in queue: 1119073
>
> TaskManager logs when the S3 storage is restored:
>
> 2023-10-27 10:42:13,302 DEBUG
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] -
> Cleanup AsyncCheckpointRunnable for checkpoint 2504 of Process ...
> 2023-10-27 10:42:13,302 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Notify
> checkpoint 2503 complete on task ...
> 2023-10-27 10:42:13,302 DEBUG
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl
> [] - Notification of checkpoint ABORT 2504 for task ...
>
> It looks like everything hangs on requests for the state of objects in S3
> storage (repeated HEAD requests with the full object path in S3 storage).
> Sometimes we observed that the job stops working completely (no consuming
> and no producing) until the S3 storage is restored.
> Is this expected behavior?
>
> P.S. If the storage failure occurs before checkpoint assembly starts,
> everything works as expected: new checkpoints are triggered at every
> configured interval and expire after 10 min.


-- 
Best,
Hangxiang.
