Thank you all for the great insights and suggestions!

I understand that the underlying components used by the state processor API
are sufficiently different that they may explain this slowness, and that this
behavior is not caused by the way we use the API.

David.

On Fri, Sep 10, 2021 at 5:27 AM Yun Tang <myas...@live.com> wrote:

> Hi David,
>
> I think Seth had shared some useful information.
>
> If you want to know what is happening within RocksDB while you're reading,
> you can leverage async-profiler [1] to capture the RocksDB stacks; my guess
> is that index blocks are being evicted too frequently during your reads. In
> the future we could also use a new read option that disables fillCache [2]
> to speed up bulk scans and help improve performance.
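>
> For illustration, such a read against plain rocksdbjni might look roughly
> like the sketch below (the state processor API does not expose this option
> today, which is why it is listed as future work; db and key handling are
> placeholders):
>
> import org.rocksdb.ReadOptions;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
>
> public class NoFillCacheRead {
>     // Sketch: a point read that does not populate the block cache, so a
>     // one-off bulk scan does not evict index/data blocks other reads need.
>     static byte[] readWithoutFillingCache(RocksDB db, byte[] key) throws RocksDBException {
>         try (ReadOptions opts = new ReadOptions().setFillCache(false)) {
>             return db.get(opts, key);
>         }
>     }
> }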
>
>
> Best
> Yun Tang
>
> [1] https://github.com/jvm-profiling-tools/async-profiler
> [2]
> https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/ReadOptions.html#setFillCache(boolean)
> ------------------------------
> *From:* Seth Wiesman <sjwies...@gmail.com>
> *Sent:* Friday, September 10, 2021 0:58
> *To:* David Causse <dcau...@wikimedia.org>; user <user@flink.apache.org>
> *Cc:* Piotr Nowojski <pnowoj...@apache.org>
> *Subject:* Re: State processor API very slow reading a keyed state with
> RocksDB
>
> Hi David,
>
> I was also able to reproduce the behavior, but was able to get
> significant performance improvements by reducing the number of slots on
> each TM to 1.
>
> My suspicion, as Piotr alluded to, has to do with the different runtime
> execution of DataSet vs DataStream. In particular, Flink's DataStream
> operators are aware of the resource requirements of the state backend and
> include RocksDB in their internal memory configurations. In the state
> processor API, the underlying input format is a black box.
>
> Another thing to know is that when running multiple RocksDB instances
> within the same JVM, you are actually running a single native process with
> multiple logical instances. I _think_ we are seeing contention amongst the
> logical RocksDB instances.
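>
> For context, a sketch of the knobs a streaming job would use to bound
> RocksDB memory (the option keys are the standard Flink ones; whether the
> DataSet-based input format honors them is exactly what is unclear here):
>
> import org.apache.flink.configuration.Configuration;
>
> public class RocksDbMemoryKnobs {
>     // Sketch: how RocksDB memory is normally budgeted for streaming jobs.
>     public static Configuration rocksDbMemoryConfig() {
>         Configuration conf = new Configuration();
>         // Let RocksDB draw from Flink's managed memory (the default in recent versions).
>         conf.setString("state.backend.rocksdb.memory.managed", "true");
>         // Alternatively, pin a fixed amount of native memory per slot, which also
>         // limits contention between the logical instances sharing a task manager.
>         conf.setString("state.backend.rocksdb.memory.fixed-per-slot", "512mb");
>         return conf;
>     }
> }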
>
> Even with one slot, it is not as fast as I would like and will need to
> continue investigating. If my suspicion for the slowness is correct, we
> will need to migrate to the new Source API and improve this as part of
> DataStream integration. This migration is something we'd like to do
> regardless, but I don't have a timeline to share.
>
> *Aside: Why is writing still relatively fast?*
>
> Even with these factors accounted for, I do still expect writing to be
> faster than reading. This is due both to how RocksDB's internal data
> structures work and to some peculiarities of how the state processor
> API has to perform reads.
>
> 1. RocksDB internally uses a data structure called a log-structured merge
> tree (or LSM). This means writes are always implemented as appends, so
> there is no seek required. Additionally, writes go into an in-memory data
> structure called a MemTable that is flushed to disk asynchronously. Reads
> are more work: because there may be multiple entries for a given key,
> RocksDB needs to search for the most recent value, potentially reading from
> disk. This may be alleviated by enabling bloom filters, but that comes with
> memory costs.
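>
> For illustration, enabling bloom filters for the RocksDB backend could look
> roughly like this (a sketch against the RocksDBOptionsFactory interface,
> assuming a recent rocksdbjni; the 10 bits/key is just a placeholder):
>
> import java.util.Collection;
> import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
> import org.rocksdb.BlockBasedTableConfig;
> import org.rocksdb.BloomFilter;
> import org.rocksdb.ColumnFamilyOptions;
> import org.rocksdb.DBOptions;
>
> public class BloomFilterOptionsFactory implements RocksDBOptionsFactory {
>     @Override
>     public DBOptions createDBOptions(DBOptions current, Collection<AutoCloseable> handlesToClose) {
>         return current;
>     }
>
>     @Override
>     public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions current, Collection<AutoCloseable> handlesToClose) {
>         // Bloom filters let point lookups skip SST files that cannot contain
>         // the key, at the cost of extra memory per key.
>         BloomFilter bloomFilter = new BloomFilter(10, false);
>         handlesToClose.add(bloomFilter);
>         return current.setTableFormatConfig(new BlockBasedTableConfig().setFilterPolicy(bloomFilter));
>     }
> }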
>
> 2. RocksDB is a key-value store, so Flink represents each registered state
> (ValueState, ListState, etc.) as its own column family (table). A key only
> exists in a table if it has a non-null value, which means not all keys
> exist in all column families for a given operator. The state processor API
> wants to make it appear as if each operator is composed of a single table
> with multiple columns. To do this, we perform a full table scan on one
> column family and, for each key found, do point lookups on the others.
> However, we still need to find the keys that may only exist in other
> tables. The trick we perform is to delete each key from RocksDB after it
> has been read, so we can do full table scans on all column families but
> never see any duplicates. This means the reader is performing multiple
> reads and writes on every call to `readKey` and is more expensive than it
> may appear.
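>
> In (simplified) form, the idea is roughly the sketch below; this is not the
> actual Flink implementation, just an illustration of why every readKey call
> turns into several RocksDB reads plus writes (db and the column family
> handles are assumed to exist):
>
> import java.util.List;
> import org.rocksdb.ColumnFamilyHandle;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
> import org.rocksdb.RocksIterator;
>
> public class ScanAllStatesSketch {
>     // Full scan on each column family; point lookups and deletes on all of
>     // them so later scans never yield the same key twice.
>     static void scanAllStates(RocksDB db, List<ColumnFamilyHandle> states) throws RocksDBException {
>         for (ColumnFamilyHandle scanned : states) {
>             try (RocksIterator it = db.newIterator(scanned)) {
>                 for (it.seekToFirst(); it.isValid(); it.next()) {
>                     byte[] key = it.key();
>                     for (ColumnFamilyHandle state : states) {
>                         byte[] value = db.get(state, key); // extra point lookup per registered state
>                         // ... hand (key, value) to the user's readKey() here ...
>                         db.delete(state, key);             // extra write so this key is never seen again
>                     }
>                 }
>             }
>         }
>     }
> }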
>
> Seth
>
>
> On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> Hi David,
>
> I can confirm that I'm able to reproduce this behaviour. I've tried
> profiling/flame graphs, but I was not able to make much sense out of those
> results. There are no IO/memory bottlenecks that I could notice; it looks
> indeed like the job is stuck inside RocksDB itself. This might be an issue
> with, for example, memory configuration. Streaming jobs and the State
> Processor API run in very different environments, as the latter uses the
> DataSet API under the hood, so maybe that can explain this? However, I'm no
> expert in either the DataSet API or RocksDB, so it's hard for me to make
> progress here.
>
> Maybe someone else can help here?
>
> Piotrek
>
>
> On Wed, Sep 8, 2021 at 2:45 PM David Causse <dcau...@wikimedia.org> wrote:
>
> Hi,
>
> I'm investigating why a job we use to inspect a Flink state is a lot
> slower than the bootstrap job used to generate it.
>
> I use RocksDB with a simple keyed value state mapping a string key to a
> long value. Generating the bootstrap state from a CSV file with 100M
> entries takes a couple of minutes over 12 slots spread over 3 TMs (4Gb
> allowed). But another job that does the opposite (converts this state into
> a CSV file) takes several hours. I would have expected these two job
> runtimes to be in the same ballpark.
>
> I wrote a simple test case [1] to reproduce the problem. This program has 3
> jobs:
> - CreateState: generates a keyed state (string->long) using the state
> processor API
> - StreamJob: reads all the keys using a StreamExecutionEnvironment
> - ReadState: reads all the keys using the state processor API (sketched
> below)
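>
> For reference, the ReadState job is essentially of the following shape (a
> simplified Java sketch of what the Scala code in [1] does; uid, paths and
> state names are illustrative):
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.state.api.ExistingSavepoint;
> import org.apache.flink.state.api.Savepoint;
> import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
> import org.apache.flink.util.Collector;
>
> public class ReadStateSketch {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         ExistingSavepoint savepoint = Savepoint.load(
>                 env, "file:///path/to/savepoint", new RocksDBStateBackend("file:///tmp/checkpoints"));
>         // Read every key of the keyed operator and emit (key, value) pairs.
>         DataSet<Tuple2<String, Long>> rows = savepoint.readKeyedState("state-uid", new Reader());
>         rows.writeAsCsv("file:///tmp/out");
>         env.execute("ReadState");
>     }
>
>     static class Reader extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {
>         private ValueState<Long> value;
>
>         @Override
>         public void open(Configuration parameters) {
>             value = getRuntimeContext().getState(new ValueStateDescriptor<>("value", Types.LONG));
>         }
>
>         @Override
>         public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
>             out.collect(Tuple2.of(key, value.value())); // one RocksDB get per key
>         }
>     }
> }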
>
> Running with 30M keys (12 slots / 3 TMs with 4Gb each), CreateState &
> StreamJob are done in less than a minute.
> ReadState is much slower (> 30 minutes) on my system. The RocksDB state
> appears to be restored relatively quickly, but after that the slots perform
> at very different speeds: some slots finish quickly while others struggle
> to advance.
> Looking at the thread dumps I always see threads in
> org.rocksdb.RocksDB.get:
>
> "DataSource (at readKeyedState(ExistingSavepoint.java:314)
> (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
> RUNNABLE
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)
>
> It seems suspiciously slow to me and I'm wondering if I'm missing
> something in the way the state processor API works.
>
> Thanks for your help!
>
> David.
>
> 1: https://github.com/nomoa/rocksdb-state-processor-test
>
>
