Hi,

I'm investigating why a job we use to inspect a Flink state is a lot slower
than the bootstrap job used to generate it.

I use RocksDB with a simple keyed value state mapping a string key to a
long value. Generating the bootstrap state from a CSV file with 100M
entries takes a couple of minutes over 12 slots spread over 3 TMs (4 GB
each). But another job that does the opposite (converts this state into
a CSV file) takes several hours. I would have expected these two job
runtimes to be in the same ballpark.

I wrote a simple test case[1] to reproduce the problem. This program has 3
jobs:
- CreateState: generates a keyed state (string->long) using the State
Processor API
- StreamJob: reads all the keys using a StreamExecutionEnvironment
- ReadState: reads all the keys using the State Processor API

Running with 30M keys (12 slots over 3 TMs with 4 GB each), CreateState and
StreamJob finish in less than a minute.
ReadState is much slower (> 30 minutes) on my system. The RocksDB state
appears to be restored relatively quickly, but after that the slots
progress at very different speeds: some finish quickly while others
struggle to advance.
Looking at the thread dumps, I always see threads in org.rocksdb.RocksDB.get:

"DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371 RUNNABLE
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:2084)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
    at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
    at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
    at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
    at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)

This seems suspiciously slow to me, and I'm wondering if I'm missing
something in the way the State Processor API works.

Thanks for your help!

David.

1: https://github.com/nomoa/rocksdb-state-processor-test
