Hi Nick

As you might know, RocksDB does not perform well for iterator-like 
operations because it has to merge-sort entries across multiple levels. [1]

Unfortunately, RocksDBMapState.isEmpty() needs to create an iterator and seek 
over RocksDB [2], and RocksDBMapState.clear() needs to iterate over the 
state and remove each entry [3].
However, even though these operations do not perform well, I don't think they 
should be extremely slow in the general case. From our experience on SSD, the 
latency of a seek is usually less than 100us and could go up to hundreds of 
us. Did you use SSD disks?

  1.  What are the Flink version, taskmanager memory, number of slots and 
RocksDB related configurations?
  2.  Have you checked the IOPS and disk utilization of the machines hosting 
the task managers that run RocksDB?
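
To illustrate why isEmpty() is more than a cheap flag check on an LSM-style 
store, here is a minimal toy sketch in plain Java. It is not RocksDB or Flink 
code: the class ToyLsmMap and all of its methods are hypothetical names made 
up for this example, and real RocksDB iterators are far more sophisticated. 
The only point is that an emptiness check has to look across every level and 
skip delete markers before it can answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;

/** Toy LSM-style store with several sorted levels, newest first. Illustration only. */
class ToyLsmMap {
    // Optional.empty() stands for a tombstone (a delete marker).
    private final List<TreeMap<String, Optional<String>>> levels = new ArrayList<>();

    ToyLsmMap() {
        levels.add(new TreeMap<>()); // level 0 plays the role of the memtable
    }

    void put(String key, String value) {
        levels.get(0).put(key, Optional.of(value));
    }

    void delete(String key) {
        // A delete only writes a tombstone; older levels still hold the old value.
        levels.get(0).put(key, Optional.empty());
    }

    /** Crude "flush": freeze the current newest level and start a fresh one. */
    void flush() {
        levels.add(0, new TreeMap<>());
    }

    /** Point lookup: stop at the first (newest) level that knows the key. */
    Optional<String> get(String key) {
        for (TreeMap<String, Optional<String>> level : levels) {
            Optional<String> v = level.get(key);
            if (v != null) {
                return v; // either a live value or a tombstone
            }
        }
        return Optional.empty();
    }

    /** Emptiness check: merge over all levels and skip deleted keys. */
    boolean isEmpty() {
        String cursor = null;
        while (true) {
            // "Seek": find the smallest key after the cursor across all levels.
            String next = null;
            for (TreeMap<String, Optional<String>> level : levels) {
                String candidate;
                if (cursor == null) {
                    candidate = level.isEmpty() ? null : level.firstKey();
                } else {
                    candidate = level.higherKey(cursor);
                }
                if (candidate != null && (next == null || candidate.compareTo(next) < 0)) {
                    next = candidate;
                }
            }
            if (next == null) {
                return true; // every key seen was a tombstone, or there were none
            }
            if (get(next).isPresent()) {
                return false; // found one live entry
            }
            cursor = next; // newest version is a tombstone, keep scanning
        }
    }
}
```

In this toy, a put followed by a flush and a delete leaves two levels that 
must both be consulted before isEmpty() can return true, which is roughly the 
pattern your processElement() produces on every record.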

[1] https://github.com/facebook/rocksdb/wiki/Iterator-Implementation
[2] 
https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L241
[3] 
https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L254

Best
Yun Tang

________________________________
From: nick toker <[email protected]>
Sent: Tuesday, June 16, 2020 15:35
To: [email protected] <[email protected]>
Subject: MapState bad performance

Hello,

We wrote a very simple streaming pipeline containing:
1. Kafka consumer
2. Process function
3. Kafka producer

The code of the process function is listed below:


private transient MapState<String, Object> testMapState;

@Override
public void processElement(Map<String, Object> value, Context ctx,
        Collector<Map<String, Object>> out) throws Exception {
    if (testMapState.isEmpty()) {
        testMapState.putAll(value);
        out.collect(value);
        testMapState.clear();
    }
}

We saw very poor performance, so we profiled the job with JProfiler.
The profiler showed that the hot spots are two methods of the MapState:
1. isEmpty() - around 7 ms
2. clear() - around 4 ms

We had to switch to ValueState instead.

Are we using MapState correctly, or are we doing something wrong?
Is this behaviour expected, given that Flink recommends using MapState 
and NOT ValueState?

BR,
Nick
