Hi David,

I think Seth has shared some useful information.

If you want to know what happens inside RocksDB while you're reading, you can 
leverage async-profiler [1] to capture the RocksDB stacks; my guess is that index 
blocks are being evicted too frequently during your reads. In the future, we could 
also use a new read option that disables fillCache [2] to speed up bulk scans and 
help improve performance.
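For reference, disabling fillCache with the raw RocksDB Java API looks roughly like 
the sketch below. This is purely illustrative: Flink does not currently expose this 
option through the state processor API, and the db/key variables are placeholders.

    import org.rocksdb.ReadOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    // Illustrative only: read without populating the block cache, so a one-off
    // bulk scan does not evict index/data blocks that other reads still need.
    static byte[] readWithoutFillingCache(RocksDB db, byte[] key) throws RocksDBException {
        try (ReadOptions readOptions = new ReadOptions().setFillCache(false)) {
            return db.get(readOptions, key);
        }
    }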


Best
Yun Tang

[1] https://github.com/jvm-profiling-tools/async-profiler
[2] https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/ReadOptions.html#setFillCache(boolean)
________________________________
From: Seth Wiesman <sjwies...@gmail.com>
Sent: Friday, September 10, 2021 0:58
To: David Causse <dcau...@wikimedia.org>; user <user@flink.apache.org>
Cc: Piotr Nowojski <pnowoj...@apache.org>
Subject: Re: State processor API very slow reading a keyed state with RocksDB

Hi David,

I was also able to reproduce the behavior, but got significant performance 
improvements by reducing the number of slots on each TM to 1.
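For anyone who wants to try the same thing, this is just the standard slot setting 
in flink-conf.yaml (shown only as a reminder; the rest of your TM setup stays the same):

    # one slot per TaskManager, so each RocksDB-backed reader gets the TM to itself
    taskmanager.numberOfTaskSlots: 1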

My suspicion, as Piotr alluded to, has to do with the different runtime 
execution of DataSet over DataStream. In particular, Flink's DataStream 
operators are aware of the resource requirements of the state backend and 
include RocksDB in their internal memory configuration. In the state processor 
API, the underlying input format is a black box.

Another thing to know is that when running multiple RocksDB instances within 
the same JVM, you are actually running a single native process with multiple 
logical instances. I _think_ we are seeing contention amongst the logical 
RocksDB instances.

Even with one slot it is not as fast as I would like, and I will need to continue 
investigating. If my suspicion about the slowness is correct, we will need to 
migrate to the new Source API and improve this as part of the DataStream 
integration. This migration is something we'd like to do regardless, but I 
don't have a timeline to share.

Aside: Why is writing still relatively fast?

Even with these factors accounted for, I do still expect writing to be faster 
than reading. This is due both to how RocksDB's internal data structures work 
and to some peculiarities of how the state processor API has to perform reads.

1. RocksDB internally uses a data structure called a log-structured merge tree 
(or LSM). This means writes are always implemented as appends, so no seek is 
required. Additionally, writes go into an in-memory data structure called 
a MemTable that is flushed to disk asynchronously. Reads are more work: because 
there may be multiple entries for a given key, RocksDB needs to search for the 
most recent value and potentially read from disk. This may be alleviated by 
enabling bloom filters, but that does have memory costs (a minimal sketch of 
enabling them is below).
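For anyone curious what enabling bloom filters looks like at the RocksDB Java API 
level, here is a minimal sketch. In a Flink job you would normally wire this in 
through a RocksDBOptionsFactory rather than building the options by hand, and the 
10 bits-per-key value is only an example, not a recommendation.

    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.BloomFilter;
    import org.rocksdb.ColumnFamilyOptions;

    // Illustrative only: attach a bloom filter (~10 bits per key) to the table
    // format so point lookups can skip SST files that cannot contain the key.
    ColumnFamilyOptions columnOptions = new ColumnFamilyOptions()
        .setTableFormatConfig(
            new BlockBasedTableConfig()
                .setFilterPolicy(new BloomFilter(10, false)));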

2. RocksDB is a key-value store, so Flink represents each registered state 
(ValueState, ListState, etc.) as its own column family (table). A key only 
exists in a table if it has a non-null value, which means not all keys exist in 
all column families for a given operator. The state processor API wants to make it 
appear as if each operator were composed of a single table with multiple columns. 
To do this, we perform a full table scan on one column family and then do point 
lookups of each key on the others. However, we still need to find the keys that 
may only exist in other tables. The trick we perform is to delete keys from 
RocksDB after each read, so we can do full table scans on all column families 
but never see any duplicates. This means the reader is performing multiple 
reads and writes on every call to `readKey`, so it is more expensive than it may 
appear (a rough sketch of this access pattern follows).
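Purely as an illustration (this is not Flink's actual reader code; the column 
family handles and the callback are placeholders), the access pattern is roughly:

    import java.util.List;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.RocksIterator;

    // Illustrative only: full scan over one column family, point lookups of the
    // same key in the others, then delete so later scans over those column
    // families only surface keys that were never seen here.
    static void scanWithPointLookups(
            RocksDB db,
            ColumnFamilyHandle scanned,
            List<ColumnFamilyHandle> others) throws RocksDBException {
        try (RocksIterator it = db.newIterator(scanned)) {
            for (it.seekToFirst(); it.isValid(); it.next()) {
                byte[] key = it.key();
                byte[] primaryValue = it.value();
                for (ColumnFamilyHandle cf : others) {
                    byte[] otherValue = db.get(cf, key);   // extra read per registered state
                    // ... hand (key, primaryValue, otherValue) to the user's readKey ...
                    if (otherValue != null) {
                        db.delete(cf, key);                // extra write per registered state
                    }
                }
            }
        }
    }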

Seth


On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried 
profiling/flame graphs and I was not able to make much sense out of the 
results. There are no IO/memory bottlenecks that I could notice; it looks 
indeed like the job is stuck inside RocksDB itself. This might be an issue with, 
for example, the memory configuration. Streaming jobs and the State Processor API 
run in very different environments, as the latter uses the DataSet API 
under the hood, so maybe that explains it? However, I'm not an expert in 
either the DataSet API or RocksDB, so it's hard for me to make progress here.

Maybe someone else can help here?

Piotrek


On Wed, Sep 8, 2021 at 14:45 David Causse <dcau...@wikimedia.org> wrote:
Hi,

I'm investigating why a job we use to inspect a Flink state is a lot slower 
than the bootstrap job used to generate it.

I use RocksDB with a simple keyed value state mapping a string key to a long 
value. Generating the bootstrap state from a CSV file with 100M entries takes a 
couple of minutes over 12 slots spread over 3 TMs (4 GB allowed). But another job 
that does the opposite (converts this state into a CSV file) takes several 
hours. I would have expected these two job runtimes to be in the same ballpark.

I wrote a simple test case [1] to reproduce the problem. This program has 3 jobs:
- CreateState: generates a keyed state (string -> long) using the state processor 
API
- StreamJob: reads all the keys using a StreamExecutionEnvironment
- ReadState: reads all the keys using the state processor API (see the Java sketch 
below)
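For readers following along without opening the repo: the actual ReadState job 
there is written in Scala, but in Java the read side looks roughly like the sketch 
below. The savepoint path, operator uid, and state descriptor name are placeholders, 
not the real values from the repo (Flink 1.13-era state processor API).

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
    import org.apache.flink.util.Collector;

    // Illustrative only: read every key of a keyed ValueState from an existing
    // savepoint with the state processor API.
    public class ReadStateSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> rows = Savepoint
                .load(env, "file:///path/to/savepoint", new EmbeddedRocksDBStateBackend())
                .readKeyedState("my-operator-uid", new Reader());
            rows.print();
        }

        // Called once per key; each call does RocksDB point lookups (and, as Seth
        // described, deletes) under the hood.
        static class Reader extends KeyedStateReaderFunction<String, String> {
            private transient ValueState<Long> counter;

            @Override
            public void open(Configuration parameters) {
                counter = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Long.class));
            }

            @Override
            public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
                out.collect(key + "=" + counter.value());
            }
        }
    }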

Running with 30M keys (12 slots / 3 TMs with 4 GB each), CreateState & StreamJob 
finish in less than a minute.
ReadState is much slower (> 30 minutes) on my system. The RocksDB state appears 
to be restored relatively quickly, but after that the slots perform at 
very different speeds. Some slots finish quickly but others struggle to 
advance.
Looking at the thread dumps I always see threads in org.rocksdb.RocksDB.get:

"DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371 
RUNNABLE
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:2084)
at 
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
at 
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
at 
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
at 
org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)

It seems suspiciously slow to me, and I'm wondering if I'm missing something in 
the way the state processor API works.

Thanks for your help!

David.

1: https://github.com/nomoa/rocksdb-state-processor-test
