Re: Query around Rocksdb

2023-07-05 Thread Yanfei Lei
Hi neha,

1. You can add the path of jemalloc to YARN's LD_LIBRARY_PATH[1], and
here is a blog post covering RocksDB memory usage[2].
2. The default value of cleanupInRocksdbCompactFilter is 1000[3]; you
may want to tune this value according to the TPS of the job. The
value of `state.backend.rocksdb.metrics.num-running-compactions`[4] may
be affected by the sampling frequency of metrics. Is the value of
`state.backend.rocksdb.metrics.compaction-read-bytes` also zero?

[1] https://data-flair.training/forums/topic/how-to-include-native-libraries-in-yarn-jobs/
[2] https://shopify.engineering/optimizing-apache-flink-applications-tips
[3] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#cleanup-during-rocksdb-compaction
[4] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics
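For a YARN deployment, one way to load jemalloc is via Flink's containerized environment options in flink-conf.yaml; this is a sketch only, and the .so path below is an assumption that must match where jemalloc is actually installed on the EMR nodes. The same file can enable the RocksDB native metrics discussed above:

```yaml
# Sketch (assumed jemalloc path): preload jemalloc inside the YARN
# containers for the TaskManagers and the JobManager.
containerized.taskmanager.env.LD_PRELOAD: /usr/lib64/libjemalloc.so
containerized.master.env.LD_PRELOAD: /usr/lib64/libjemalloc.so

# Expose RocksDB native metrics so compaction activity can be observed.
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.compaction-read-bytes: true
```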
neha goyal wrote on Tue, Jul 4, 2023 at 17:39:

>
> Hello Yanfei and Shammon,
>
> I have two additional questions. The links mentioned in the reply talk about 
> using jemalloc in a Docker image, but I am using Yarn on AWS EMR. How can I 
> use jemalloc with Yarn? Any references you can provide would be greatly 
> appreciated.
>
> StateTtlConfig cityTtlConfig = StateTtlConfig
>     .newBuilder(org.apache.flink.api.common.time.Time.hours(longerLookbackHours))
>     .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>     .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>     .build();
> MapStateDescriptor<String, Integer> cityListDescriptor =
>     new MapStateDescriptor<>("cityList", String.class, Integer.class);
> cityListDescriptor.enableTimeToLive(cityTtlConfig);
>
>
> Secondly, I have applied TTL to my state, and I rely on RocksDB's automatic 
> compaction process to clear expired entries. However, according to the RocksDB 
> metrics provided by Flink, compaction is not occurring [attached 
> the metrics screenshot], and the savepoint size keeps increasing. Do you 
> suggest adding cleanupInRocksdbCompactFilter(1000) as well? What 
> will be the impact of this configuration?
>
> On Tue, Jul 4, 2023 at 8:13 AM Yanfei Lei  wrote:
>>
>> Hi neha,
>>
>> Due to a limitation of RocksDB, we cannot create a
>> strict-capacity-limit LRUCache that is shared among RocksDB instances;
>> FLINK-15532[1] was created to track this.
>> BTW, have you set a TTL for this job[2]? TTL can help control the state size.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15532
>> [2] https://issues.apache.org/jira/browse/FLINK-31089
>>
>> Shammon FY wrote on Tue, Jul 4, 2023 at 09:08:
>> >
>> > Hi neha,
>> >
>> > Which Flink version are you using? We have also encountered the issue of 
>> > continuous growth of off-heap memory in the TM of a session cluster 
>> > before; the cause was that memory fragments could not be reused, as in 
>> > issue [1]. You can check the memory allocator and try using jemalloc 
>> > instead; refer to docs [2] and [3].
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-19125
>> > [2] https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.12/#deployment
>> > [3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#switching-the-memory-allocator
>> >
>> > Best,
>> > Shammon FY
>> >
>> > On Sat, Jul 1, 2023 at 2:58 PM neha goyal  wrote:
>> >>
>> >> Hello,
>> >>
>> >> I am trying to debug the unbounded memory consumption by the Flink 
>> >> process. The heap size of the process remains the same. The size of the 
>> >> RSS of the process keeps on increasing. I suspect it might be because of 
>> >> RocksDB.
>> >>
>> >> We have the default value for state.backend.rocksdb.memory.managed as 
>> >> true. Can anyone confirm whether, with this config, RocksDB can consume 
>> >> unbounded native memory?
>> >>
>> >> If yes, what metrics can I check to confirm the issue? Any help would be 
>> >> appreciated.
>>
>>
>>
>> --
>> Best,
>> Yanfei



--
Best,
Yanfei


Re: Query around Rocksdb

2023-07-03 Thread Yanfei Lei
Hi neha,

Due to a limitation of RocksDB, we cannot create a
strict-capacity-limit LRUCache that is shared among RocksDB instances;
FLINK-15532[1] was created to track this.
BTW, have you set a TTL for this job[2]? TTL can help control the state size.

[1] https://issues.apache.org/jira/browse/FLINK-15532
[2] https://issues.apache.org/jira/browse/FLINK-31089
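A minimal sketch of the TTL setup suggested above. The state name "cityList" and the 24-hour TTL are illustrative assumptions, not values from this thread; cleanupInRocksdbCompactFilter is the compaction-filter knob discussed elsewhere in the thread:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TtlExample {

    public static MapStateDescriptor<String, Integer> buildDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig
                // Illustrative TTL; pick a window matching your lookback needs.
                .newBuilder(Time.hours(24))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // Re-read the current timestamp after every 1000 entries
                // processed by the RocksDB compaction filter (1000 is the default).
                .cleanupInRocksdbCompactFilter(1000)
                .build();

        MapStateDescriptor<String, Integer> descriptor =
                new MapStateDescriptor<>("cityList", String.class, Integer.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```

Note that the compaction-filter cleanup only runs when RocksDB actually compacts; if the compaction metrics stay at zero, expired entries will linger in the SST files and in savepoints.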

Shammon FY wrote on Tue, Jul 4, 2023 at 09:08:
>
> Hi neha,
>
> Which Flink version are you using? We have also encountered the issue of 
> continuous growth of off-heap memory in the TM of a session cluster before; 
> the cause was that memory fragments could not be reused, as in issue [1]. You 
> can check the memory allocator and try using jemalloc instead; refer to docs 
> [2] and [3].
>
> [1] https://issues.apache.org/jira/browse/FLINK-19125
> [2] https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.12/#deployment
> [3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#switching-the-memory-allocator
>
> Best,
> Shammon FY
>
> On Sat, Jul 1, 2023 at 2:58 PM neha goyal  wrote:
>>
>> Hello,
>>
>> I am trying to debug the unbounded memory consumption by the Flink process. 
>> The heap size of the process remains the same. The size of the RSS of the 
>> process keeps on increasing. I suspect it might be because of RocksDB.
>>
>> We have the default value for state.backend.rocksdb.memory.managed as true. 
>> Can anyone confirm whether, with this config, RocksDB can consume 
>> unbounded native memory?
>>
>> If yes, what metrics can I check to confirm the issue? Any help would be 
>> appreciated.



-- 
Best,
Yanfei


Re: Query around Rocksdb

2023-07-03 Thread Shammon FY
Hi neha,

Which Flink version are you using? We have also encountered the issue of
continuous growth of off-heap memory in the TM of a session cluster
before; the cause was that memory fragments could not be reused, as in issue
[1]. You can check the memory allocator and try using jemalloc instead;
refer to docs [2] and [3].

[1] https://issues.apache.org/jira/browse/FLINK-19125
[2] https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.12/#deployment
[3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#switching-the-memory-allocator

Best,
Shammon FY

On Sat, Jul 1, 2023 at 2:58 PM neha goyal  wrote:

> Hello,
>
> I am trying to debug the unbounded memory consumption by the Flink
> process. The heap size of the process remains the same. The size of the RSS
> of the process keeps on increasing. I suspect it might be because of
> RocksDB.
>
> We have the default value for state.backend.rocksdb.memory.managed as
> true. Can anyone confirm whether, with this config, RocksDB can consume
> unbounded native memory?
>
> If yes, what metrics can I check to confirm the issue? Any help would be
> appreciated.
>


Query around Rocksdb

2023-07-01 Thread neha goyal
Hello,

I am trying to debug the unbounded memory consumption by the Flink process.
The heap size of the process remains the same. The size of the RSS of the
process keeps on increasing. I suspect it might be because of RocksDB.

We have the default value for state.backend.rocksdb.memory.managed as true.
Can anyone confirm whether, with this config, RocksDB can consume
unbounded native memory?

If yes, what metrics can I check to confirm the issue? Any help would be
appreciated.
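With state.backend.rocksdb.memory.managed set to true, RocksDB's block cache and write buffers are bounded by Flink's managed memory budget, so RSS growth beyond that is more likely to come from other native allocations (e.g. allocator fragmentation). A sketch of flink-conf.yaml options, using metrics that exist in Flink's RocksDB native metrics list, to help attribute where the memory goes:

```yaml
# Sketch: enable native-memory metrics to compare against RSS growth.
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.size-all-mem-tables: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true
```

If these metrics stay flat while RSS keeps climbing, the growth is probably outside RocksDB's accounted memory, which points toward the allocator-fragmentation issue discussed later in this thread.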