Hi neha,

1. You can set the path of jemalloc in the LD_LIBRARY_PATH of YARN[1], and here is a blog post about "RocksDB Memory Usage"[2].

2. The default value of cleanupInRocksdbCompactFilter is 1000[3]; a different value can be set according to the TPS of the job. The value of `state.backend.rocksdb.metrics.num-running-compactions`[4] may be affected by the sampling frequency of the metrics. Is the value of `state.backend.rocksdb.metrics.compaction-read-bytes` also zero?
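For reference, enabling the compaction-filter cleanup explicitly looks roughly like the sketch below (written against the Flink 1.17 `StateTtlConfig` API; the 24-hour TTL and the descriptor name are placeholders, not values from your job):

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// Sketch: TTL config that asks RocksDB's compaction filter to re-read
// the current timestamp after every 1000 processed state entries
// (1000 is also the default when the compaction filter is enabled).
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(24)) // placeholder TTL duration
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupInRocksdbCompactFilter(1000)
        .build();

MapStateDescriptor<String, Integer> descriptor =
        new MapStateDescriptor<>("cityList", String.class, Integer.class);
descriptor.enableTimeToLive(ttlConfig);
```

Raising the 1000 lowers the timestamp-query overhead during compaction at the cost of expired entries surviving longer; lowering it does the opposite.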
[1] https://data-flair.training/forums/topic/how-to-include-native-libraries-in-yarn-jobs/
[2] https://shopify.engineering/optimizing-apache-flink-applications-tips
[3] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#cleanup-during-rocksdb-compaction
[4] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics

neha goyal <nehagoy...@gmail.com> wrote on Tue, Jul 4, 2023 at 17:39:
>
> Hello Yanfei and Shammon,
>
> I have two additional questions. The links mentioned in the reply talk
> about using jemalloc in a Docker image, but I am using YARN on AWS EMR.
> How can I use jemalloc with YARN? Any references you can provide would be
> greatly appreciated.
>
> StateTtlConfig cityTtlConfig = StateTtlConfig
>         .newBuilder(org.apache.flink.api.common.time.Time.hours(longerLookbackHours))
>         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>         .build();
> MapStateDescriptor<String, Integer> cityListDescriptor =
>         new MapStateDescriptor<>("cityList", String.class, Integer.class);
> cityListDescriptor.enableTimeToLive(cityTtlConfig);
>
> Secondly, I have applied TTL to my state, and I rely on RocksDB's
> automated compaction process to clear expired events. However, according
> to the RocksDB metrics provided by Flink, the compaction process is not
> occurring [attached the metrics screenshot], and there is a constant
> increase in the savepoint size. Do you suggest adding
> cleanupInRocksdbCompactFilter(1000) as well? What will be the impact of
> this configuration?
>
> On Tue, Jul 4, 2023 at 8:13 AM Yanfei Lei <fredia...@gmail.com> wrote:
>>
>> Hi neha,
>>
>> Due to a limitation of RocksDB, we cannot create a strict-capacity-limit
>> LRUCache that is shared among RocksDB instance(s); FLINK-15532[1] was
>> created to track this.
>> BTW, have you set a TTL for this job[2]? TTL can help control the state size.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15532
>> [2] https://issues.apache.org/jira/browse/FLINK-31089
>>
>> Shammon FY <zjur...@gmail.com> wrote on Tue, Jul 4, 2023 at 09:08:
>> >
>> > Hi neha,
>> >
>> > Which Flink version are you using? We have also encountered the issue
>> > of continuous growth of off-heap memory in the TM of a session cluster
>> > before; the reason was that memory fragments could not be reused, as in
>> > issue [1]. You can check the memory allocator and try using jemalloc
>> > instead; refer to docs [2] and [3].
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-19125
>> > [2] https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.12/#deployment
>> > [3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#switching-the-memory-allocator
>> >
>> > Best,
>> > Shammon FY
>> >
>> > On Sat, Jul 1, 2023 at 2:58 PM neha goyal <nehagoy...@gmail.com> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I am trying to debug unbounded memory consumption by the Flink
>> >> process. The heap size of the process remains the same, but the RSS
>> >> of the process keeps increasing. I suspect it might be because of
>> >> RocksDB.
>> >>
>> >> We have the default value for state.backend.rocksdb.memory.managed,
>> >> which is true. Can anyone confirm whether, with this config, RocksDB
>> >> can still consume unbounded native memory?
>> >>
>> >> If yes, what metrics can I check to confirm the issue? Any help would
>> >> be appreciated.
>>
>> --
>> Best,
>> Yanfei

--
Best,
Yanfei
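[Editor's note] Pulling the thread's deployment advice together: on YARN, environment variables for the TaskManager containers can be set through Flink's `containerized.taskmanager.env.*` config prefix, which is one way to point the containers at jemalloc without a custom Docker image. The sketch below is a hedged flink-conf.yaml fragment; the `/usr/lib64` jemalloc paths are assumptions and will differ per EMR image, and the two metrics options are the ones Yanfei asks about above.

```yaml
# Sketch only: library paths are assumptions that depend on the EMR/YARN image.
# Make jemalloc visible to, and preloaded by, the TaskManager containers.
containerized.taskmanager.env.LD_LIBRARY_PATH: /usr/lib64
containerized.taskmanager.env.LD_PRELOAD: /usr/lib64/libjemalloc.so

# Expose the RocksDB native metrics discussed in this thread.
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.compaction-read-bytes: true
```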