Hi Robin

First of all, the root cause is not that RocksDB cannot store large list state when you merge, but the JNI limitation of 2^31 bytes per value [1].
Moreover, RocksDB's Java `merge` operation [2] returns nothing, so there is no channel through which an error could be reported back to the caller.

Did you merge too many elements, or just a few very large ones? Last but not least, even if you could merge such a large list, I don't think fetching a value larger than 2^31 bytes would behave well anyway.
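
To make that limit concrete, here is a minimal, Flink-free sketch (plain Java, purely illustrative): Java arrays are indexed by int, so a value handed back through JNI can never exceed 2^31 - 1 bytes, and HotSpot refuses even a request at that boundary with exactly the error shown in the stack trace below:

```java
public class ArrayLimitDemo {
    public static void main(String[] args) {
        // On HotSpot this fails regardless of available heap with
        //   java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        // because the VM caps array lengths slightly below Integer.MAX_VALUE.
        byte[] blob = new byte[Integer.MAX_VALUE];
        System.out.println(blob.length);
    }
}
```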


[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
[2] https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382

Best
Yun Tang
________________________________
From: Robin Cassan <[email protected]>
Sent: Friday, May 15, 2020 0:37
To: user <[email protected]>
Subject: Protection against huge values in RocksDB List State

Hi all!

I cannot seem to find any setting to limit the number of records appended to a RocksDBListState, which is what is used when we combine SessionWindows with a ProcessFunction.
It seems that each incoming element is appended to the stored value with the RocksDB `merge` operator, without any safeguard to ensure the value doesn't grow indefinitely. RocksDB merge operators seem to support returning false in case of error, so I guess we could implement a limit by returning false in the merge operator, but since Flink seems to use the "stringappendtest" merge operator (https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc), it always returns true no matter what.
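
For illustration, here is a minimal sketch of that append behavior using the plain RocksDB Java API directly (the database path and strings are made up; StringAppendOperator is the Java-exposed cousin of the "stringappendtest" operator, not necessarily Flink's exact configuration):

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.StringAppendOperator;

public class MergeAppendDemo {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options opts = new Options()
                     .setCreateIfMissing(true)
                     .setMergeOperator(new StringAppendOperator());
             RocksDB db = RocksDB.open(opts, "/tmp/merge-demo")) {
            byte[] key = "session-window-key".getBytes();
            // merge() returns void: appends can never be refused, so the
            // value under one key grows without any built-in safeguard.
            db.merge(key, "element-1".getBytes());
            db.merge(key, "element-2".getBytes());
            // Only the read materializes the whole value as one Java byte[],
            // which is where an oversized list finally blows up.
            System.out.println(new String(db.get(key))); // element-1,element-2
        }
    }
}
```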

This is troublesome for us because it would make a lot of sense to set an acceptable limit on how many elements can be aggregated under a given key, and because when we do end up with too many elements we get an exception from RocksDB:
```
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
    ... 7 more
Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM limit
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:810)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
    ... 12 more
```

We are currently bypassing this by using a Reduce operator instead, which 
ensures that we only store one element per key, but this gives us degraded 
performance.
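
In case it is useful, a rough sketch of that workaround (the String element type, the 1 MiB cap, and the drop-the-excess policy are our own illustrative choices, not Flink defaults): with a ReduceFunction the window keeps a single pre-aggregated value per key, whose size we can bound ourselves:

```java
import org.apache.flink.api.common.functions.ReduceFunction;

// Sketch: window .reduce(...) stores one value per key instead of an
// ever-growing list, so an application-level size cap becomes possible.
public class CappedConcatReduce implements ReduceFunction<String> {
    private static final int MAX_BYTES = 1 << 20; // assumed per-key cap (1 MiB)

    @Override
    public String reduce(String acc, String next) {
        // Once the aggregate reaches the cap, silently drop further elements
        // (an application policy; Flink itself enforces no such limit).
        if (acc.length() + next.length() + 1 > MAX_BYTES) {
            return acc;
        }
        return acc + "," + next;
    }
}
```

Plugged in as `.window(...).reduce(new CappedConcatReduce(), processWindowFunction)`, this keeps the RocksDB value bounded per key, at the cost of the degraded performance mentioned above.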

Thanks for your input!
Robin
