Hello,

Actually, I agree that I do not need such an aggressive checkpoint interval
for my jobs, so I increased it from 1 s to 10 s, and JobManager memory
consumption has now been stable for 3 days in my Flink 1.10.0 cluster.
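
For reference, the only change is the interval passed to enableCheckpointing
(a minimal sketch, assuming the same job setup as in the snippet quoted further
down in this thread; the value is in milliseconds):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// checkpoint interval raised from 1 s (1000 ms) to 10 s (10000 ms)
StreamExecutionEnvironment executionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.enableCheckpointing(10000);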

Thanks a lot for your help.

Best regards,
Marc

On Fri, Apr 10, 2020 at 11:54 AM Till Rohrmann <trohrm...@apache.org> wrote:

> What you could also try out is whether the same problem occurs with Flink
> 1.7.3. We made the executor change in that bugfix release. This could help
> us validate my suspicion.
>
> Cheers,
> Till
>
> On Thu, Apr 9, 2020 at 4:24 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> For further reference, I've created this issue [1] to track the problem.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17073
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 9, 2020 at 1:20 PM Yun Tang <myas...@live.com> wrote:
>>
>>> Hi Marc
>>>
>>> The leftover 'chk-X' folders, which should be deleted at the final stage of
>>> removing a checkpoint, also suggest that it is the metadata of the
>>> not-yet-discarded completed checkpoints that occupies the memory.
>>>
>>> If we take your average checkpoint metadata size as 30 KB, 20,000
>>> not-yet-discarded completed checkpoints would occupy about 585 MB of memory
>>> (20,000 x 30 KB), which is close to what you observed.
>>>
>>> From my point of view, a checkpoint interval of one second is really too
>>> frequent and does not make much sense in a production environment.
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Till Rohrmann <trohrm...@apache.org>
>>> *Sent:* Thursday, April 9, 2020 17:41
>>> *To:* Marc LEGER <maleger...@gmail.com>
>>> *Cc:* Yun Tang <myas...@live.com>; user@flink.apache.org <
>>> user@flink.apache.org>
>>> *Subject:* Re: Possible memory leak in JobManager (Flink 1.10.0)?
>>>
>>> Thanks for reporting this issue, Marc. From what you've reported, I think
>>> Yun is right and that the large memory footprint is caused by
>>> CompletedCheckpoints which cannot be removed fast enough. One way to verify
>>> this is to enable TRACE logging, because Flink will then log a line for
>>> every CompletedCheckpoint when it gets discarded. The line should look like
>>> "Executing discard procedure for Checkpoint". The high number of chk-X
>>> folders on S3 could be the result of the slow discard operations.
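>>>
>>> As a sketch, assuming the default log4j.properties shipped with the Flink
>>> distribution (the logger name below is my assumption of a scope wide enough
>>> to cover the checkpoint classes emitting that message), this should surface
>>> those lines on the JobManager:
>>>
>>> # conf/log4j.properties
>>> log4j.logger.org.apache.flink.runtime.checkpoint=TRACE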
>>>
>>> If you want, we can also take a look at the logs and ideally at the heap
>>> dump, if you can share them with us.
>>>
>>> I think one difference between Flink 1.10.0 and 1.7.2 is that in 1.10.0 we
>>> are using a fixed thread pool for running the I/O operations, where the
>>> number of threads equals the number of cores. In contrast, in Flink 1.7.2
>>> we used a fork-join pool with a max parallelism of 64. This difference
>>> could explain the lower throughput of discard operations, because fewer can
>>> happen in parallel.
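>>>
>>> Roughly, the difference boils down to the following (an illustration only,
>>> not the actual Flink code):
>>>
>>> import java.util.concurrent.ExecutorService;
>>> import java.util.concurrent.Executors;
>>> import java.util.concurrent.ForkJoinPool;
>>>
>>> // Flink 1.10.0: io executor bounded by the number of available cores,
>>> // so only that many discard operations can run at the same time.
>>> ExecutorService ioExecutor =
>>>     Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
>>>
>>> // Flink 1.7.2: fork-join pool with a parallelism of 64, allowing many more
>>> // discard operations to proceed in parallel.
>>> ForkJoinPool previousIoPool = new ForkJoinPool(64);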
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 9, 2020 at 10:09 AM Marc LEGER <maleger...@gmail.com> wrote:
>>>
>>> Hello Yun,
>>>
>>> Thank you for your feedback, please find below my answers to your
>>> questions:
>>>
>>> 1. I am using incremental state checkpointing with RocksDB backend and
>>> AWS S3 as a distributed file system, everything is configured in
>>> flink-conf.yaml as follows:
>>>
>>> state.backend: rocksdb
>>> state.backend.incremental: true
>>> # placeholders are replaced at deploy time
>>> state.checkpoints.dir: s3://#S3_BUCKET#/#SERVICE_ID#/flink/checkpoints
>>> state.backend.rocksdb.localdir: /home/data/flink/rocksdb
>>>
>>> Size of the _metadata file in a checkpoint folder for the 3 running jobs:
>>> - job1: 64 KB
>>> - job2: 1 KB
>>> - job3: 10 KB
>>>
>>> By the way, I have between 10000 and 20000 "chk-X" folders per job in S3.
>>>
>>> 2. Checkpointing is configured to be triggered every second for all the
>>> jobs. Only the interval is set; everything else is kept at its default:
>>>
>>> executionEnvironment.enableCheckpointing(1000);
>>>
>>> Best Regards,
>>> Marc
>>>
>>> On Wed, Apr 8, 2020 at 8:48 PM Yun Tang <myas...@live.com> wrote:
>>>
>>> Hi Marc
>>>
>>> I think the occupied memory is due to the to-be-removed completed
>>> checkpoints which are stored in the work queue of the io-executor [1] used
>>> by ZooKeeperCompletedCheckpointStore [2]. One clue supporting this is that
>>> Executors#newFixedThreadPool creates a ThreadPoolExecutor with an unbounded
>>> LinkedBlockingQueue to store the runnables.
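>>>
>>> For illustration, this is roughly what Executors.newFixedThreadPool(nThreads)
>>> expands to in the JDK (shown here only to point out where the pending
>>> runnables, and the checkpoint metadata they reference, queue up):
>>>
>>> import java.util.concurrent.*;
>>>
>>> // The work queue is an unbounded LinkedBlockingQueue, so discard runnables
>>> // submitted faster than they complete pile up there without limit.
>>> int nThreads = Runtime.getRuntime().availableProcessors();
>>> ExecutorService ioExecutor = new ThreadPoolExecutor(nThreads, nThreads,
>>>         0L, TimeUnit.MILLISECONDS,
>>>         new LinkedBlockingQueue<Runnable>());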
>>>
>>> To figure out the root cause, would you please check the information
>>> below:
>>>
>>>    1. How large is your checkpoint meta? You could check the size of
>>>    {checkpoint-dir}/chk-X/_metadata; telling us which state backend you use
>>>    would also help to interpret this.
>>>    2. What is the interval of your checkpoints? A smaller checkpoint
>>>    interval might accumulate many completed checkpoints to subsume once a
>>>    newer checkpoint completes.
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L260
>>> [2]
>>> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L234
>>>
>>> Best
>>> Yun Tang
>>>
>>> ------------------------------
>>> *From:* Marc LEGER <maleger...@gmail.com>
>>> *Sent:* Wednesday, April 8, 2020 16:50
>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>> *Subject:* Possible memory leak in JobManager (Flink 1.10.0)?
>>>
>>> Hello,
>>>
>>> I am currently testing Flink 1.10.0 but I am facing memory issues with
>>> JobManagers deployed in a standalone cluster configured in HA mode with 3
>>> TaskManagers (and 3 running jobs).
>>> I do not reproduce the same issues using Flink 1.7.2.
>>>
>>> Basically, whatever the value of the "jobmanager.heap.size" property (I
>>> tried with 2 GB, then 4 GB and finally 8 GB), the leader JobManager process
>>> eventually consumes all available memory and hangs after a few hours or
>>> days (depending on the size of the heap) before being disassociated from
>>> the cluster.
>>>
>>> I am using OpenJ9 JVM with Java 11 on CentOS 7.6 machines:
>>> openjdk version "11.0.6" 2020-01-14
>>> OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.6+10)
>>> Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.18.1, JRE 11 Linux
>>> amd64-64-Bit Compressed
>>>
>>> I performed a heap dump for analysis on the JobManager Java process and
>>> generated a "Leak Suspects" report using Eclipse MAT.
>>> The tool is detecting one main suspect (cf. attached screenshots):
>>>
>>> One instance of "java.util.concurrent.ThreadPoolExecutor" loaded by
>>> "<system class loader>" occupies 580,468,280 (92.82%) bytes. The instance
>>> is referenced by
>>> org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices @
>>> 0x8041fb48 , loaded by "<system class loader>".
>>>
>>> Has anyone already faced such an issue?
>>>
>>> Best Regards,
>>> Marc
>>>
>>>
