Hello,

Actually, I agree that I do not need such an aggressive checkpoint period for my
jobs, so I increased the checkpoint interval from 1 s to 10 s, and JobManager
memory consumption has now been stable for 3 days in my Flink 1.10.0 cluster.
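For reference, a minimal sketch of what the change looks like, assuming the
interval is set programmatically via enableCheckpointing as in the snippet
quoted further down in the thread (the class name and surrounding scaffolding
are illustrative only):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment executionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Before: one checkpoint per second, as in the original setup below.
        // executionEnvironment.enableCheckpointing(1000);

        // After: one checkpoint every 10 seconds.
        executionEnvironment.enableCheckpointing(10_000);

        // Sources, operators, sinks and executionEnvironment.execute(...)
        // stay exactly as before; only the interval changes.
    }
}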
Thanks a lot for your help.

Best regards,
Marc

On Fri, Apr 10, 2020 at 11:54 AM Till Rohrmann <trohrm...@apache.org> wrote:

> What you could also try out is whether the same problem occurs with Flink
> 1.7.3. We did the executor change in this bug fix release. This could help
> us validate my suspicion.
>
> Cheers,
> Till
>
> On Thu, Apr 9, 2020 at 4:24 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> For further reference, I've created this issue [1] to track the problem.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17073
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 9, 2020 at 1:20 PM Yun Tang <myas...@live.com> wrote:
>>
>>> Hi Marc,
>>>
>>> The leftover 'chk-X' folders, which should be discarded in the final
>>> stage of removing a checkpoint, also suggest that the completed
>>> checkpoint metadata which was not discarded is what occupies the memory.
>>>
>>> If we take your average checkpoint meta size as 30 KB, 20000
>>> not-yet-discarded completed checkpoints would occupy about 585 MB of
>>> memory, which is close to your observed scenario.
>>>
>>> From my point of view, a checkpoint interval of one second is really
>>> too frequent and does not make much sense in a production environment.
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Till Rohrmann <trohrm...@apache.org>
>>> *Sent:* Thursday, April 9, 2020 17:41
>>> *To:* Marc LEGER <maleger...@gmail.com>
>>> *Cc:* Yun Tang <myas...@live.com>; user@flink.apache.org <user@flink.apache.org>
>>> *Subject:* Re: Possible memory leak in JobManager (Flink 1.10.0)?
>>>
>>> Thanks for reporting this issue, Marc. From what you've reported, I
>>> think Yun is right and that the large memory footprint is caused by
>>> CompletedCheckpoints which cannot be removed fast enough. One way to
>>> verify this is to enable TRACE logging, because then Flink will log for
>>> every CompletedCheckpoint when it gets discarded. The line should look
>>> like this: "Executing discard procedure for Checkpoint". The high number
>>> of chk-X folders on S3 could be the result of the slow discard
>>> operations.
>>>
>>> If you want, we can also take a look at the logs and ideally also the
>>> heap dump if you can share them with us.
>>>
>>> I think one difference between Flink 1.10.0 and 1.7.2 is that we are
>>> using a fixed thread pool for running the I/O operations in 1.10.0. The
>>> number of threads equals the number of cores. In contrast, in Flink
>>> 1.7.2 we used a fork-join pool with a max parallelism of 64. This
>>> difference could explain the lower throughput of discard operations
>>> because fewer can happen in parallel.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 9, 2020 at 10:09 AM Marc LEGER <maleger...@gmail.com> wrote:
>>>
>>> Hello Yun,
>>>
>>> Thank you for your feedback, please find below my answers to your
>>> questions:
>>>
>>> 1. I am using incremental state checkpointing with the RocksDB backend
>>> and AWS S3 as the distributed file system; everything is configured in
>>> flink-conf.yaml as follows:
>>>
>>> state.backend: rocksdb
>>> state.backend.incremental: true
>>> # placeholders are replaced at deploy time
>>> state.checkpoints.dir: s3://#S3_BUCKET#/#SERVICE_ID#/flink/checkpoints
>>> state.backend.rocksdb.localdir: /home/data/flink/rocksdb
>>>
>>> Size of the _metadata file in a checkpoint folder for the 3 running jobs:
>>> - job1: 64 KB
>>> - job2: 1 KB
>>> - job3: 10 KB
>>>
>>> By the way, I have between 10000 and 20000 "chk-X" folders per job in S3.
>>>
>>> 2. Checkpointing is configured to be triggered every second for all the
>>> jobs.
>>> Only the interval is set, otherwise everything is kept as default:
>>>
>>> executionEnvironment.enableCheckpointing(1000);
>>>
>>> Best Regards,
>>> Marc
>>>
>>> On Wed, Apr 8, 2020 at 8:48 PM Yun Tang <myas...@live.com> wrote:
>>>
>>> Hi Marc,
>>>
>>> I think the occupied memory is due to the to-be-removed completed
>>> checkpoints which are stored in the workQueue of the io-executor [1] in
>>> ZooKeeperCompletedCheckpointStore [2]. One clue supporting this is that
>>> Executors#newFixedThreadPool creates a ThreadPoolExecutor with a
>>> LinkedBlockingQueue to store runnables.
>>>
>>> To figure out the root cause, would you please check the information
>>> below:
>>>
>>> 1. How large is your checkpoint meta? You can view
>>> {checkpoint-dir}/chk-X/_metadata to know the size; please also tell us
>>> which state backend you use to help understand this.
>>> 2. What is the interval of your checkpoints? A small checkpoint
>>> interval might accumulate many completed checkpoints to subsume once a
>>> newer checkpoint completes.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L260
>>> [2]
>>> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L234
>>>
>>> Best
>>> Yun Tang
>>>
>>> ------------------------------
>>> *From:* Marc LEGER <maleger...@gmail.com>
>>> *Sent:* Wednesday, April 8, 2020 16:50
>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>> *Subject:* Possible memory leak in JobManager (Flink 1.10.0)?
>>>
>>> Hello,
>>>
>>> I am currently testing Flink 1.10.0 but I am facing memory issues with
>>> JobManagers deployed in a standalone cluster configured in HA mode with
>>> 3 TaskManagers (and 3 running jobs).
>>> I do not reproduce the same issues using Flink 1.7.2.
>>>
>>> Basically, whatever the value of the "jobmanager.heap.size" property (I
>>> tried with 2 GB, then 4 GB and finally 8 GB), the leader JobManager
>>> process eventually consumes all available memory and hangs after a few
>>> hours or days (depending on the size of the heap) before being
>>> disassociated from the cluster.
>>>
>>> I am using the OpenJ9 JVM with Java 11 on CentOS 7.6 machines:
>>> openjdk version "11.0.6" 2020-01-14
>>> OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.6+10)
>>> Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.18.1, JRE 11 Linux
>>> amd64-64-Bit Compressed
>>>
>>> I performed a heap dump of the JobManager Java process for analysis and
>>> generated a "Leak Suspects" report using Eclipse MAT.
>>> The tool detects one main suspect (cf. attached screenshots):
>>>
>>> One instance of "java.util.concurrent.ThreadPoolExecutor" loaded by
>>> "<system class loader>" occupies 580,468,280 (92.82%) bytes. The
>>> instance is referenced by
>>> org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices
>>> @ 0x8041fb48, loaded by "<system class loader>".
>>>
>>> Has anyone already faced such an issue?
>>>
>>> Best Regards,
>>> Marc
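As a side note on the analysis in the quoted thread above: the mechanism Yun
and Till describe can be reproduced in isolation. Below is a toy, self-contained
Java sketch (not Flink code; the task count, payload size and sleep duration are
illustrative assumptions) showing how a pool from Executors#newFixedThreadPool,
backed by an unbounded LinkedBlockingQueue, keeps every queued task's payload
reachable until the task runs, which is the shape of the CompletedCheckpoint
backlog pointed at by the Leak Suspects report.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class DiscardBacklogSketch {
    public static void main(String[] args) throws InterruptedException {
        // Same construction pattern as the io-executor described above:
        // fixed pool sized to the core count, unbounded LinkedBlockingQueue.
        ExecutorService ioExecutor = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        AtomicLong retainedBytes = new AtomicLong();

        // Submit "discard" tasks faster than they complete. Each queued task
        // captures a ~30 KB payload, standing in for checkpoint metadata; at
        // 20000 pending checkpoints that would be roughly the 585 MB estimated
        // in the thread.
        for (int i = 0; i < 2_000; i++) {
            byte[] checkpointMeta = new byte[30 * 1024];
            retainedBytes.addAndGet(checkpointMeta.length);
            ioExecutor.execute(() -> {
                try {
                    Thread.sleep(5); // stands in for a slow S3 delete / ZooKeeper update
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                retainedBytes.addAndGet(-checkpointMeta.length);
            });
        }

        System.out.printf("payload still referenced by the work queue: ~%d KB%n",
                retainedBytes.get() / 1024);

        ioExecutor.shutdown();
        ioExecutor.awaitTermination(1, TimeUnit.MINUTES);
    }
}

At the 1 s checkpoint interval discussed above, new discard tasks can arrive
faster than the fixed pool drains them, so the queue, and with it the JobManager
heap, keeps growing; the 10 s interval gives the pool time to catch up.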