Hi James,

From reading the thread … I assume your file:/tmp/Flink/State folder is not shared across all machines, right?
In this case it cannot work:

- Checkpoints and savepoints need to go to a path that can be commonly accessed by the jobmanager and all taskmanagers in order to work.
- As your jobmanager cannot access the checkpoint files of the taskmanagers, it also cannot clean up those files.

Hope that helps.

Regards,
Thias

From: James Sandys-Lumsdaine <jas...@hotmail.com>
Sent: Tuesday, May 17, 2022 3:55 PM
To: Hangxiang Yu <master...@gmail.com>; user@flink.apache.org
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Thanks for your reply. To be clear on my setup with the problem:

* 4 taskmanagers running across different containers and machines. Each container has its own filesystem, including / and /tmp.
* 1 jobmanager, also running in its own container and machine, which likewise has its own filesystem.
* I have configured the FS checkpoint address to be "file:/tmp/Flink/State" - therefore each process (JM and TMs) is reading and writing to its own /tmp, i.e. there is no shared access as there would be with NFS or HDFS.

So when checkpointing happens, the directories are created and populated, but only the JM's old checkpoint directories are cleaned up. Each of the TMs' old "chk-x" directories under /tmp/Flink/State remains and is never cleared up. From your email I can't tell whether you think I am writing to a "shared" path or not.

I started looking at the in-memory checkpoint storage, but its maximum size is an int, so it can't hold my 5 GB of state. I need the checkpointing to trigger my sinks to persist (GenericWriteAheadSink), so it seems I have to create a proper shared file path that all my containers can access.

James.

________________________________
From: Hangxiang Yu <master...@gmail.com>
Sent: 17 May 2022 14:38
To: James Sandys-Lumsdaine <jas...@hotmail.com>; user@flink.apache.org
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Hi, James.
I may not have understood what the problem is. All checkpoints are stored at the address you set. IIUC, the TMs write some checkpoint data to their local dir, then upload it to that address and delete the local copy. The JM writes the checkpoint metadata to the address and is also responsible for deleting entire checkpoints.

Best,
Hangxiang.

On Tue, May 17, 2022 at 9:09 PM James Sandys-Lumsdaine <jas...@hotmail.com> wrote:

Some further Googling turned up a StackOverflow post saying it is the jobmanager, not the taskmanagers, that does the deletion. Currently my taskmanagers are writing their checkpoints to their own private disks (/tmp) rather than to a share - so my suspicion is that the jobmanager can't access the folders on the other machines. I had thought the taskmanagers could clear up their own state when instructed to by the jobmanager.

I cannot yet use an NFS mount in my deployment, so I may have to switch to heap checkpoint state instead of using the file storage checkpoint system. Now that I understand what's going on a bit better, it seems pointless for me to have file checkpoints that can't be read by the jobmanager for failover. If anyone can clarify or correct me, I would appreciate it.

James.

________________________________
From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org
Subject: Checkpoint directories not cleared as TaskManagers run

Hello,

I'm seeing my Flink deployment's checkpoint storage directories build up and never clear down. When I run from my own IDE, I see only the latest "chk-x" directory under the job id folder: the first checkpoint is "chk-1", which is then replaced by "chk-2", and so on. However, when I run as a proper application mode deployment, each of the 4 taskmanagers running in its own container retains every one of the "chk-x" directories, meaning they eat a lot of disk space as time progresses.
Interestingly, the jobmanager itself is fine. Does anyone have any suggestions on how to debug this? Anything obvious that would cause such behaviour? I'm currently using Flink 1.14.0. My setup is essentially as below (trimmed for simplicity):

    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.enableCheckpointing(5 * 1000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);
    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");

Thanks in advance,

James.
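As a sketch of the fix suggested at the top of the thread - assuming a shared mount at /mnt/flink (a hypothetical path; any NFS, HDFS, or S3 location that the jobmanager and every taskmanager can read and write would do) - the checkpoint location could be pointed at the shared path in flink-conf.yaml:

```
# Hypothetical shared mount (e.g. NFS) visible to the JM and all TMs.
state.backend: hashmap
state.checkpoints.dir: file:///mnt/flink/checkpoints
state.savepoints.dir: file:///mnt/flink/savepoints
```

In code the equivalent would be env.getCheckpointConfig().setCheckpointStorage("file:///mnt/flink/checkpoints"). Once every container mounts the same share, the jobmanager can both read the files the taskmanagers upload and delete the old "chk-x" directories, which is the cleanup that was failing here.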