Hi neha,
1. You can add the path of jemalloc to the LD_LIBRARY_PATH of the YARN
containers[1], and here is a blog post about RocksDB memory usage[2].
2. The default value of cleanupInRocksdbCompactFilter is 1000[3]; a
different value may be more appropriate depending on the TPS of the job.
The value of `state.backend.rocksdb.metrics.num-running-compactions`[4]
may be affected by the sampling frequency of the metrics. Is the value
of `state.backend.rocksdb.metrics.compaction-read-bytes` also zero?
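For reference, a minimal sketch of enabling the compaction-filter cleanup
in the TTL config (the 24-hour TTL and the 1000-entry threshold here are
just example values, not a recommendation for your job):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24)) // example TTL; use your lookback window
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    // ask the RocksDB compaction filter to re-read the current timestamp
    // after each 1000 processed state entries
    .cleanupInRocksdbCompactFilter(1000)
    .build();
```

A larger threshold lowers the cleanup overhead during compaction but lets
expired entries live longer; a smaller one does the opposite.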
[1]
https://data-flair.training/forums/topic/how-to-include-native-libraries-in-yarn-jobs/
[2] https://shopify.engineering/optimizing-apache-flink-applications-tips
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#cleanup-during-rocksdb-compaction
[4]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics
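In case it helps with the YARN setup in [1]: you can also ship the library
with the job and set the environment through Flink's containerized.*.env
options. This is only a sketch, assuming jemalloc is installed at the path
shown; note that the Docker-based docs use LD_PRELOAD (not just
LD_LIBRARY_PATH) to actually switch the allocator:

```yaml
# flink-conf.yaml sketch (paths are examples, adjust for your EMR image)
yarn.ship-files: /usr/lib64/libjemalloc.so.2
# shipped files end up in the container's working directory
containerized.master.env.LD_PRELOAD: ./libjemalloc.so.2
containerized.taskmanager.env.LD_PRELOAD: ./libjemalloc.so.2
```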
On Tue, Jul 4, 2023 at 5:39 PM neha goyal wrote:
>
> Hello Yanfei and Shammon,
>
> I have two additional questions. The links mentioned in the reply talk about
> using jemalloc in a Docker image, but I am using YARN on AWS EMR. How can I
> use jemalloc with YARN? Any references you can provide would be greatly
> appreciated.
>
> StateTtlConfig cityTtlConfig = StateTtlConfig
>     .newBuilder(org.apache.flink.api.common.time.Time.hours(longerLookbackHours))
>     .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>     .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>     .build();
> MapStateDescriptor<String, Integer> cityListDescriptor =
>     new MapStateDescriptor<>("cityList", String.class, Integer.class);
> cityListDescriptor.enableTimeToLive(cityTtlConfig);
>
>
> Secondly, I have applied TTL to my state and rely on RocksDB's automatic
> compaction process to clear expired entries. However, according to the
> RocksDB metrics provided by Flink, no compaction is occurring (metrics
> screenshot attached), and the savepoint size is constantly increasing. Do
> you suggest adding cleanupInRocksdbCompactFilter(1000) as well? What would
> be the impact of this configuration?
>
> On Tue, Jul 4, 2023 at 8:13 AM Yanfei Lei wrote:
>>
>> Hi neha,
>>
>> Due to a limitation of RocksDB, we cannot create a strict-capacity-limit
>> LRUCache that is shared among RocksDB instance(s); FLINK-15532[1] was
>> created to track this.
>> BTW, have you set a TTL for this job[2]? TTL can help control the state
>> size.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15532
>> [2] https://issues.apache.org/jira/browse/FLINK-31089
>>
>> On Tue, Jul 4, 2023 at 9:08 AM Shammon FY wrote:
>> >
>> > Hi neha,
>> >
>> > Which Flink version are you using? We have also encountered continuous
>> > growth of off-heap memory in the TM of a session cluster before; the
>> > reason was that memory fragments could not be reused, as in issue [1].
>> > You can check the memory allocator and try using jemalloc instead,
>> > referring to docs [2] and [3].
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-19125
>> > [2]
>> > https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.12/#deployment
>> > [3]
>> > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#switching-the-memory-allocator
>> >
>> > Best,
>> > Shammon FY
>> >
>> > On Sat, Jul 1, 2023 at 2:58 PM neha goyal wrote:
>> >>
>> >> Hello,
>> >>
>> >> I am trying to debug unbounded memory consumption by the Flink
>> >> process. The heap size of the process remains the same, but the RSS
>> >> of the process keeps increasing. I suspect it might be because of
>> >> RocksDB.
>> >>
>> >> We have the default value (true) for state.backend.rocksdb.memory.managed.
>> >> Can anyone confirm whether, with this config, RocksDB can still consume
>> >> unbounded native memory?
>> >>
>> >> If yes, what metrics can I check to confirm the issue? Any help would
>> >> be appreciated.
>>
>>
>>
>> --
>> Best,
>> Yanfei
--
Best,
Yanfei