Hi Yun and Congxian!
I have implemented a pre-filter that uses a keyed state
(AggregatingState[Long]) to track the total size of all records seen for
each key, which lets me filter out records that would be too big for the
RocksDB JNI bridge. This seems to make our job behave better! Thanks for
your help, this was really helpful :)

Robin

On Sat, 16 May 2020 at 09:05, Congxian Qiu <[email protected]> wrote:

> Hi
>
> As you described it, I'm not sure whether MapState can help in your case,
> but MapState serializes each <mapKey, mapValue> pair separately, so it
> would not run into the same problem as ListState.
>
> When using MapState, you need to decide how to assign the mapKey. If the
> whole state is cleared after being processed, you can use a monotonically
> increasing integer as the mapKey and store the last used mapKey in a
> ValueState.
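A rough sketch of that scheme, with a LinkedHashMap standing in for MapState<Integer, String> and a plain int field standing in for the ValueState<Integer> holding the last used mapKey (all names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the MapState idea: store each element under its own
// monotonically increasing integer mapKey, so each <mapKey, mapValue>
// entry is serialized separately instead of as one huge list value.
class AppendOnlyMapBuffer {
    private final Map<Integer, String> entries = new LinkedHashMap<>();
    private int nextKey = 0; // stands in for a ValueState<Integer>

    void append(String element) {
        entries.put(nextKey++, element);
    }

    /** Read all elements back in insertion order, as the window would on fire. */
    List<String> readAll() {
        return new ArrayList<>(entries.values());
    }

    /** Clear everything once the window state has been processed. */
    void clear() {
        entries.clear();
        nextKey = 0;
    }
}
```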
>
>
> Best,
> Congxian
>
>
> Yun Tang <[email protected]> 于2020年5月15日周五 下午10:31写道:
>
>> Hi Robin
>>
>> I think you could record the size of your list under the current key with
>> another ValueState, or with operator state (store a Map of <key-by key,
>> list length> and write the whole map into the list when snapshotting). If
>> you do not have many key-by keys, operator state is a good choice as it is
>> on-heap and lightweight.
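A small sketch of that bookkeeping, with an on-heap HashMap playing the role of the operator state (the Flink-specific snapshotState/ListState wiring is omitted, and the names are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the suggestion above: keep an on-heap map of
// <key-by key, list length> and flatten it into a list when snapshotting,
// since operator state is list-shaped so it can be redistributed on rescale.
class ListLengthTracker {
    private final Map<String, Long> lengths = new HashMap<>();

    /** Bump the recorded list length for a key; returns the new length. */
    long increment(String key) {
        return lengths.merge(key, 1L, Long::sum);
    }

    /** Check whether a key's list has grown past an acceptable limit. */
    boolean exceeds(String key, long limit) {
        return lengths.getOrDefault(key, 0L) > limit;
    }

    /** Flatten the map into a list, as one would in snapshotState(). */
    List<Map.Entry<String, Long>> snapshot() {
        return new ArrayList<>(lengths.entrySet());
    }
}
```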
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Robin Cassan <[email protected]>
>> *Sent:* Friday, May 15, 2020 20:59
>> *To:* Yun Tang <[email protected]>
>> *Cc:* user <[email protected]>
>> *Subject:* Re: Protection against huge values in RocksDB List State
>>
>> Hi Yun, thanks for your answer! And sorry I missed that limitation in the
>> documentation; it makes sense!
>> In our case, we are merging too many elements (each element is limited to
>> 4 MiB in our Kafka topic). I agree we do not want our state to contain
>> really big values, which is why we are trying to find a way to put a limit
>> on the number (or total size) of elements that are aggregated in the
>> window's state.
>> We have found a way to do this by adding another session window upstream
>> of the first one, which stores the number of messages for each key and
>> rejects new messages once a limit is reached, but we are wondering whether
>> there is a better way to achieve this without creating another state.
>>
>> Thanks again,
>> Robin
>>
>> On Thu, 14 May 2020 at 19:38, Yun Tang <[email protected]> wrote:
>>
>> Hi Robin
>>
>> First of all, the root cause is not that RocksDB cannot store a large
>> list state when you merge, but the JNI limitation of 2^31 bytes [1].
>> Moreover, RocksDB's Java `merge` [2] operator does not return anything.
>>
>> Did you merge too many elements, or just elements that are too big? Last
>> but not least, even if you could merge such a large list, I think fetching
>> a value larger than 2^31 bytes would not behave well anyway.
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>> [2]
>> https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Robin Cassan <[email protected]>
>> *Sent:* Friday, May 15, 2020 0:37
>> *To:* user <[email protected]>
>> *Subject:* Protection against huge values in RocksDB List State
>>
>> Hi all!
>>
>> I cannot seem to find any setting to limit the number of records appended
>> to a RocksDBListState, which is used when we combine SessionWindows with a
>> ProcessFunction.
>> It seems that each incoming element is appended to the stored value with
>> the RocksDB `merge` operator, without any safeguard against unbounded
>> growth. RocksDB's merge operator seems to support returning false on
>> error, so I guess we could implement a limit by returning false there, but
>> since Flink uses the "stringappendtest" merge operator (
>> https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc
>>  ),
>> it always returns true no matter what.
>>
>> This is troublesome for us because it would make a lot of sense to set an
>> acceptable limit on how many elements can be aggregated under a given key,
>> and because when we do end up with too many elements we get an exception
>> from RocksDB:
>> ```
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while
>> retrieving data from RocksDB
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
>> at
>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>> ... 7 more
>> Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM
>> limit
>> at org.rocksdb.RocksDB.get(Native Method)
>> at org.rocksdb.RocksDB.get(RocksDB.java:810)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
>> ... 12 more
>> ```
>>
>> We are currently bypassing this by using a reduce operator instead, which
>> ensures that we only store one element per key, but this degrades
>> performance.
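The reduce workaround can be sketched roughly as follows; a HashMap stands in for the keyed ReducingState, and the combiner function is just an example, not the actual job logic:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Sketch of the reduce-based workaround: instead of appending every record
// to a list state, eagerly combine records so only one value is ever stored
// per key. This bounds the state size but forces eager aggregation.
class EagerReducer<T> {
    private final Map<String, T> state = new HashMap<>(); // stands in for ReducingState<T>
    private final BinaryOperator<T> combiner;

    EagerReducer(BinaryOperator<T> combiner) {
        this.combiner = combiner;
    }

    /** Fold the incoming value into the single stored value for this key. */
    T add(String key, T value) {
        return state.merge(key, value, combiner);
    }
}
```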
>>
>> Thanks for your input!
>> Robin
>>
>>
