Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-06 Thread Kevin Lam
Hi Fabian,

Yes, I can tell you a bit more about the job we are seeing the problem with.
I'll simplify things a bit, but this captures the essence:

1. Input datastreams are from a few kafka sources that we intend to join.
2. We wrap the datastreams we want to join into a common container class
and key them on the join key.
3. Union and process the datastreams with a KeyedProcessFunction which
holds the latest value seen for each source in ValueStates, and emits an
output that is a function of the stored ValueStates each time a new value
comes in (a rough sketch follows after this list).
4. We have to support arbitrarily late arriving data, so we don't window,
and just keep everything in ValueState.
5. The state we want to support is very large, on the order of several TBs.
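
Roughly, the KeyedProcessFunction in step 3 looks like the sketch below. The
types are illustrative (a Tuple3 of join key, source name, payload) rather than
our actual container class, so treat it as a sketch of the shape of the job,
not the real code:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Input records: (joinKey, sourceName, payload); output: a value derived from
// the latest payload stored for each source under the current key.
public class LatestValueJoin
        extends KeyedProcessFunction<String, Tuple3<String, String, String>, String> {

    private transient ValueState<String> latestA;
    private transient ValueState<String> latestB;

    @Override
    public void open(Configuration parameters) {
        latestA = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-a", String.class));
        latestB = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-b", String.class));
    }

    @Override
    public void processElement(Tuple3<String, String, String> record,
                               Context ctx,
                               Collector<String> out) throws Exception {
        // Remember only the newest payload per source; no windowing, so this
        // state lives forever to support arbitrarily late data.
        if ("sourceA".equals(record.f1)) {
            latestA.update(record.f2);
        } else {
            latestB.update(record.f2);
        }
        // Emit an output that is a function of the currently stored values.
        out.collect(latestA.value() + "|" + latestB.value());
    }
}
```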

On Wed, Oct 6, 2021 at 10:50 AM Fabian Paul 
wrote:

> Hi Kevin,
>
> Since you are seeing the problem across multiple Flink versions, and with
> both the default RocksDB configuration and a custom one, it might be related
> to something else. A lot of different components can allocate direct
> memory, e.g. some filesystem implementations, the connectors, or a user
> gRPC dependency.
>
>
> Can you tell us a bit more about the job you are seeing the problem with?
>
> Best,
> Fabian
>
>


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-06 Thread Fabian Paul
Hi Kevin,

Since you are seeing the problem across multiple Flink versions, and with both
the default RocksDB configuration and a custom one, it might be related
to something else. A lot of different components can allocate direct memory,
e.g. some filesystem implementations, the connectors, or a user gRPC
dependency.


Can you tell us a bit more about the job you are seeing the problem with?

Best,
Fabian



Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-06 Thread Kevin Lam
Hi Fabian,

Thanks for collecting feedback. Here are the answers to your questions:

1. Yes, we enabled incremental checkpoints for our job by setting
`state.backend.incremental` to true. As for whether the checkpoint we
recover from is incremental or not, I'm not sure how to determine that.
It's whatever Flink does by default with incremental checkpoints enabled.

2. Yes, this was on purpose; we had tuned our job to work well on SSDs. We
have also run jobs with those parameters unset and using defaults, and
still have the same OOM issues.

Thanks for the pointer, yes we've been looking at the RocksDB metrics. They
haven't indicated to us what the issue is yet.

On Wed, Oct 6, 2021 at 3:21 AM Fabian Paul  wrote:

> Hi Kevin,
>
> Sorry for the late reply. I collected some feedback from other folks and
> have two more questions.
>
> 1. Did you enable incremental checkpoints for your job and is the
> checkpoint you recover from incremental?
>
> 2. I saw in your configuration that you set
> `state.backend.rocksdb.block.cache-size` and
> `state.backend.rocksdb.predefined-options`; by doing
> so you overwrite the values Flink automatically sets. Can you confirm
> that this is on purpose? The value for block.cache-size seems to be very
> small.
>
> You can also enable the native RocksDB metrics [1] to get a more detailed
> view of the RocksDB memory consumption, but be careful because it may
> degrade the performance of your job.
>
> Best,
> Fabian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics
>
>
>


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-06 Thread Fabian Paul
Hi Kevin,

Sorry for the late reply. I collected some feedback from other folks and have 
two more questions.

1. Did you enable incremental checkpoints for your job and is the checkpoint 
you recover from incremental? 

2. I saw in your configuration that you set
`state.backend.rocksdb.block.cache-size` and
`state.backend.rocksdb.predefined-options`; by doing
so you overwrite the values Flink automatically sets. Can you confirm that
this is on purpose? The value for block.cache-size seems to be very small.

You can also enable the native RocksDB metrics [1] to get a more detailed view of
the RocksDB memory consumption, but be careful because it may degrade the
performance of your job.

Best,
Fabian

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics




Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-05 Thread Kevin Lam
I was reading a bit about RocksDB, and it seems the Java version is somewhat
particular about how it should be closed to ensure all resources are
released:


https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management


   - "Many of the Java Objects used in the RocksJava API will be backed by
   C++ objects for which the Java Objects have ownership. As C++ has no notion
   of automatic garbage collection for its heap in the way that Java does, we
   must explicitly free the memory used by the C++ objects when we are
   finished with them."

Column families also have a specific close procedure


https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families

   - "It is important to note that when working with Column Families in
   RocksJava, there is a very specific order of destruction that must be
   obeyed for the database to correctly free all resources and shutdown."
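
For reference, that close order looks roughly like this in a standalone
RocksJava program (the path and column family name here are made up for
illustration; this is not Flink's code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbCloseOrder {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();

        try (final ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()) {
            final List<ColumnFamilyDescriptor> cfDescriptors = Arrays.asList(
                    new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
                    new ColumnFamilyDescriptor("example-cf".getBytes(), cfOpts));
            final List<ColumnFamilyHandle> cfHandles = new ArrayList<>();

            try (final DBOptions dbOpts = new DBOptions()
                         .setCreateIfMissing(true)
                         .setCreateMissingColumnFamilies(true);
                 final RocksDB db = RocksDB.open(
                         dbOpts, "/tmp/rocksdb-example", cfDescriptors, cfHandles)) {
                try {
                    // ... read/write against db and cfHandles ...
                } finally {
                    // Close the column family handles BEFORE the database is
                    // closed, so their native (C++) resources are freed correctly.
                    for (final ColumnFamilyHandle handle : cfHandles) {
                        handle.close();
                    }
                }
            } // try-with-resources then closes db, then dbOpts
        } // finally cfOpts
    }
}
```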

When a running job fails and a running TaskManager restores from checkpoint,
is the old embedded RocksDB instance being cleaned up properly? I wasn't
really sure where to look in the Flink source code to verify this.

On Mon, Oct 4, 2021 at 4:56 PM Kevin Lam  wrote:

> We tried with 1.14.0, unfortunately we still run into the issue. Any
> thoughts or suggestions?
>
> On Mon, Oct 4, 2021 at 9:09 AM Kevin Lam  wrote:
>
>> Hi Fabian,
>>
>> We're using our own image built from the official Flink docker image, so
>> we should have the code to use jemalloc in the docker entrypoint.
>>
>> I'm going to give 1.14 a try and will let you know how it goes.
>>
>> On Mon, Oct 4, 2021 at 8:29 AM Fabian Paul 
>> wrote:
>>
>>> Hi Kevin,
>>>
>>> We bumped the RocksDB version with Flink 1.14, which we expected to improve
>>> memory control [1]. In the past we also saw problems with the memory allocator
>>> used by the OS. We switched to jemalloc within our docker images, which handles
>>> memory fragmentation better [2]. Are you using the official Flink
>>> docker image or did you build your own?
>>>
>>> I am also pulling in Yun Tang, who is more familiar with Flink’s state
>>> backend. Maybe he has an immediate idea about your problem.
>>>
>>> Best,
>>> Fabian
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-14482
>>> [2]
>>> https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E
>>> 
>>>
>>>
>>>


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-04 Thread Kevin Lam
We tried with 1.14.0, unfortunately we still run into the issue. Any
thoughts or suggestions?

On Mon, Oct 4, 2021 at 9:09 AM Kevin Lam  wrote:

> Hi Fabian,
>
> We're using our own image built from the official Flink docker image, so
> we should have the code to use jemalloc in the docker entrypoint.
>
> I'm going to give 1.14 a try and will let you know how it goes.
>
> On Mon, Oct 4, 2021 at 8:29 AM Fabian Paul 
> wrote:
>
>> Hi Kevin,
>>
>> We bumped the RocksDB version with Flink 1.14, which we expected to improve
>> memory control [1]. In the past we also saw problems with the memory allocator
>> used by the OS. We switched to jemalloc within our docker images, which handles
>> memory fragmentation better [2]. Are you using the official Flink
>> docker image or did you build your own?
>>
>> I am also pulling in Yun Tang, who is more familiar with Flink’s state
>> backend. Maybe he has an immediate idea about your problem.
>>
>> Best,
>> Fabian
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-14482
>> [2]
>> https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E
>> 
>>
>>
>>


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-04 Thread Kevin Lam
Hi Fabian,

We're using our own image built from the official Flink docker image, so we
should have the code to use jemalloc in the docker entrypoint.

I'm going to give 1.14 a try and will let you know how it goes.

On Mon, Oct 4, 2021 at 8:29 AM Fabian Paul  wrote:

> Hi Kevin,
>
> We bumped the RocksDB version with Flink 1.14, which we expected to improve
> memory control [1]. In the past we also saw problems with the memory allocator
> used by the OS. We switched to jemalloc within our docker images, which handles
> memory fragmentation better [2]. Are you using the official Flink
> docker image or did you build your own?
>
> I am also pulling in Yun Tang, who is more familiar with Flink’s state
> backend. Maybe he has an immediate idea about your problem.
>
> Best,
> Fabian
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-14482
> [2]
> https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E
> 
>
>
>


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-04 Thread Fabian Paul
Hi Kevin,

We bumped the RocksDB version with Flink 1.14, which we expected to improve
memory control [1]. In the past we also saw problems with the memory allocator
used by the OS. We switched to jemalloc within our docker images, which handles
memory fragmentation better [2]. Are you using the official Flink docker image
or did you build your own?

I am also pulling in Yun Tang, who is more familiar with Flink’s state backend.
Maybe he has an immediate idea about your problem.

Best,
Fabian


[1] https://issues.apache.org/jira/browse/FLINK-14482 

[2] 
https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E
 





Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-01 Thread Kevin Lam
Hi Fabian,

Thanks for your response.

Sure, let me tell you a bit more about the job.

   - Flink version 1.13.1 (I also tried 1.13.2 because I saw FLINK-22886,
   but this didn't help)
   - We're running on kubernetes in an application cluster.
   taskmanager.memory.process.size = 16GB, but we give our task manager pods a
   memory limit of 20GB. Our full config is below [0]

We've followed the steps at
https://erikwramner.files.wordpress.com/2017/10/native-memory-leaks-in-java.pdf,
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr007.html,
and
https://technology.blog.gov.uk/2015/12/11/using-jemalloc-to-get-to-the-bottom-of-a-memory-leak/
to try to diagnose the issue, but this didn't give us much to go on.

Notably, we baselined the jcmd memory profile (jcmd $(pgrep java)
VM.native_memory baseline) and then ran a diff before and after the
post-restart memory spike, and nothing in there reflects the few GB of
usage increase.

What was added to Flink 1.14? What other issues have you seen in the past?

Also, I came across
https://medium.com/expedia-group-tech/solving-a-native-memory-leak-71fe4b6f9463
while researching RocksDB. It suggests that unclosed RocksDB iterators can
be a source of memory leaks. Is there any chance there are iterators being
left open after a job restart?
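
For illustration, in plain RocksJava the safe pattern looks like the sketch
below (this is only to show the iterator lifecycle, not a claim about how
Flink manages its iterators internally):

```java
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

public class IteratorCleanup {

    // Count all entries in the database, making sure the iterator's native
    // (C++) resources are released when we are done with it.
    static long countEntries(RocksDB db) {
        long count = 0;
        try (final RocksIterator it = db.newIterator()) {
            for (it.seekToFirst(); it.isValid(); it.next()) {
                count++;
            }
        } // an iterator left open here would keep pinning native memory
        return count;
    }
}
```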

[0]
```
jobmanager.memory.process.size: 16Gb

taskmanager.rpc.port: 6122
taskmanager.memory.process.size: 16Gb
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 4

high-availability.storageDir: 
kubernetes.cluster-id: 
kubernetes.namespace: 
high-availability.jobmanager.port: 50010
high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
restart-strategy: exponential-delay
resourcemanager.taskmanager-registration.timeout: 30 min

blob.server.port: 6124
queryable-state.proxy.ports: 6125

heartbeat.interval: 6
heartbeat.timeout: 12

web.timeout: 180
rest.flamegraph.enabled: true

state.backend: rocksdb
state.checkpoints.dir: 
state.savepoints.dir: 

state.backend.rocksdb.localdir: /rocksdb
state.backend.incremental: true
state.backend.fs.memory-threshold: 1m
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.backend.rocksdb.block.blocksize: 16KB
state.backend.rocksdb.block.cache-size: 64MB
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED

jobmanager.execution.failover-strategy: region

metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator

state.backend.rocksdb.metrics.actual-delayed-write-rate: true
state.backend.rocksdb.metrics.background-errors: true
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.compaction-pending: true
state.backend.rocksdb.metrics.cur-size-active-mem-table: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.estimate-pending-compaction-bytes: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true
state.backend.rocksdb.metrics.is-write-stopped: true
state.backend.rocksdb.metrics.mem-table-flush-pending: true
state.backend.rocksdb.metrics.num-deletes-active-mem-table: true
state.backend.rocksdb.metrics.num-deletes-imm-mem-tables: true
state.backend.rocksdb.metrics.num-entries-active-mem-table: true
state.backend.rocksdb.metrics.num-entries-imm-mem-tables: true
state.backend.rocksdb.metrics.num-immutable-mem-table: true
state.backend.rocksdb.metrics.num-live-versions: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.num-running-flushes: true
state.backend.rocksdb.metrics.num-snapshots: true
state.backend.rocksdb.metrics.size-all-mem-tables: true

env.java.opts: -Djavax.net.ssl.keyStore=/app/kafka/certs/certificate.jks
-Djavax.net.ssl.keyStorePassword=changeit -Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.local.only=false
-Dcom.sun.management.jmxremote.port=1099
-Dcom.sun.management.jmxremote.rmi.port=1099
-Djava.rmi.server.hostname=127.0.0.1 -XX:NativeMemoryTracking=detail
env.java.opts.taskmanager: -Dtaskmanager.host=10.12.72.181
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/rocksdb/memdump.hprof
-Djava.rmi.server.hostname=127.0.0.1 -XX:NativeMemoryTracking=detail
jobmanager.rpc.address: flink-jobmanager
query.server.port: 6125
```

On Fri, Oct 1, 2021 at 9:38 AM Fabian Paul  wrote:

> Hi Kevin,
>
> You are right RocksDB is probably responsible for the memory 

Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-01 Thread Fabian Paul
Hi Kevin,

You are right, RocksDB is probably responsible for the memory consumption you
are noticing. We have definitely seen similar issues in the past, and with the
latest Flink version, 1.14, we tried to restrict the RocksDB memory consumption
even more to make it more controllable.

Can you tell us a bit more about the job you are running and the respective
Flink version? I would also be interested in what kind of memory configuration
you applied to the Flink cluster, e.g. taskmanager.memory.process.size. You can
also have a look at the following docs page [1] to fine-tune the memory
consumption of your job.

Please let me know if that helps.

Best,
Fabian


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_setup/#configure-total-memory




RocksDB: Spike in Memory Usage Post Restart

2021-09-30 Thread Kevin Lam
Hi all,

We're debugging an issue with OOMs that occurs on our jobs shortly after a
restore from checkpoint. Our application is running on kubernetes and uses
RocksDB as its state backend.

We reproduced the issue on a small cluster of 2 task managers. If we killed
a single task manager, we noticed that after restoring from checkpoint, the
untouched task manager has an elevated memory footprint (see the blue line
for the surviving task manager):

[image: chart of task manager memory usage; the blue line is the surviving task manager]
If we kill the newest TM (yellow line) again, the surviving task manager gets
OOM-killed after restoring from checkpoint.

We looked at the OOMKiller Report and it seems that the memory is not
coming from the JVM but we're unsure of the source. It seems like something
is allocating native memory that the JVM is not aware of.

We're suspicious of RocksDB. Has anyone seen this kind of issue before? Is
it possible there's some kind of memory pressure or memory leak coming from
RocksDB that only presents itself when a job is restarted? Perhaps
something isn't cleaned up?

Any help would be appreciated.