Hi Yun and Congxian! I have implemented a pre-filter that uses a keyed state (AggregatingState[Long]) to track the total size of all records seen for each key, which lets me filter out records that would make the state too big for the RocksDB JNI bridge. This seems to make our job behave better! Thanks for your help, guys, this was really helpful :)
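
For readers of the archive, here is a minimal sketch of the per-key size budget idea, not the actual job code. It is plain Java with no Flink dependency: the class name `SizeBudgetFilter`, its methods, and the `HashMap` standing in for keyed state are all hypothetical, chosen only for illustration. As a rough guide for picking the limit: with elements of up to 4 MiB (2^22 bytes), 2^31 / 2^22 = 512 elements are enough to reach the 2^31-byte JNI limit, so the budget should sit well below that.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical per-key size budget. The HashMap is a stand-in for Flink
 * keyed state; in a real job the running total would live in state scoped
 * to the current key and be cleared when the window fires.
 */
final class SizeBudgetFilter {
    private final long maxBytesPerKey;
    private final Map<String, Long> bytesSeenPerKey = new HashMap<>();

    SizeBudgetFilter(long maxBytesPerKey) {
        if (maxBytesPerKey <= 0) {
            throw new IllegalArgumentException("maxBytesPerKey must be positive");
        }
        this.maxBytesPerKey = maxBytesPerKey;
    }

    /**
     * Returns true and adds the record to the running total if it still fits
     * in the budget for this key; returns false and leaves the total
     * unchanged otherwise.
     */
    boolean accept(String key, long recordBytes) {
        if (recordBytes < 0) {
            throw new IllegalArgumentException("recordBytes must not be negative");
        }
        long seen = bytesSeenPerKey.getOrDefault(key, 0L);
        // seen never exceeds maxBytesPerKey, so this subtraction cannot overflow.
        if (recordBytes > maxBytesPerKey - seen) {
            return false;
        }
        bytesSeenPerKey.put(key, seen + recordBytes);
        return true;
    }

    /** Total size of the accepted records for this key so far. */
    long bytesSeen(String key) {
        return bytesSeenPerKey.getOrDefault(key, 0L);
    }

    /** Resets the budget for a key, e.g. once its window has been processed. */
    void clear(String key) {
        bytesSeenPerKey.remove(key);
    }
}
```

Rejected records do not count against the budget, so a single oversized record does not block smaller ones that still fit. Whether to drop, log, or side-output the rejected records is a separate decision that this sketch leaves open.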
Robin

On Sat, May 16, 2020 at 09:05, Congxian Qiu <[email protected]> wrote:

> Hi
>
> As you described, I'm not sure whether MapState can help you in such a case.
> MapState serializes each <mapKey, mapValue> pair separately, so it would
> not encounter the same problem as ListState.
>
> When using MapState, you need to decide how to set the mapKey. If the
> whole state is cleared after processing, you can use a monotonically
> increasing integer as the mapKey and store the highest used mapKey in a
> value state.
>
> Best,
> Congxian
>
> Yun Tang <[email protected]> wrote on Fri, May 15, 2020 at 10:31 PM:
>
>> Hi Robin
>>
>> I think you could record the size of your list under currentKey with
>> another value state or operator state (store a Map with <key-by key, list
>> length>, and store the whole map in a list when snapshotting). If you do
>> not have many key-by keys, operator state is a good choice as it is
>> on-heap and lightweight.
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Robin Cassan <[email protected]>
>> *Sent:* Friday, May 15, 2020 20:59
>> *To:* Yun Tang <[email protected]>
>> *Cc:* user <[email protected]>
>> *Subject:* Re: Protection against huge values in RocksDB List State
>>
>> Hi Yun, thanks for your answer! And sorry I didn't see this limitation
>> in the documentation, makes sense!
>> In our case, we are merging too many elements (since each element is
>> limited to 4 MiB in our Kafka topic). I agree we do not want our state to
>> contain really big values, which is why we are trying to find a way to put
>> a limit on the number (or total size) of elements that are aggregated in
>> the state of the window.
>> We have found a way to do this by using another session window placed
>> before the other one, which stores the number of messages for each key
>> and rejects new messages once we have reached a limit, but we are
>> wondering if there is a better way to achieve that without creating
>> another state.
>>
>> Thanks again,
>> Robin
>>
>> On Thu, May 14, 2020 at 19:38, Yun Tang <[email protected]> wrote:
>>
>> Hi Robin
>>
>> First of all, the root cause is not that RocksDB cannot store a large list
>> state when you merge, but the JNI limitation of 2^31 bytes [1].
>> Moreover, RocksDB Java does not return anything when you call the merge [2]
>> operator.
>>
>> Did you merge too many elements, or merge elements that are too big? Last
>> but not least, even if you could merge a large list, I think getting a
>> value larger than 2^31 bytes would not behave well.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>> [2] https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Robin Cassan <[email protected]>
>> *Sent:* Friday, May 15, 2020 0:37
>> *To:* user <[email protected]>
>> *Subject:* Protection against huge values in RocksDB List State
>>
>> Hi all!
>>
>> I cannot seem to find any setting to limit the number of records appended
>> to a RocksDBListState that is used when we use SessionWindows with a
>> ProcessFunction.
>> It seems that, for each incoming element, the new element is appended to
>> the value with the RocksDB `merge` operator, without any safeguard to make
>> sure that it doesn't grow infinitely. RocksDB merge seems to support
>> returning false in case of error, so I guess we could implement a limit by
>> returning false in the merge operator, but since Flink seems to use the
>> "stringappendtest" merge operator
>> (https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc),
>> it always returns true no matter what.
>>
>> This is troublesome for us because it would make a lot of sense to
>> specify an acceptable limit to how many elements can be aggregated under a
>> given key, and because when we happen to have too many elements we get an
>> exception from RocksDB:
>> ```
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB
>>     at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
>>     at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
>>     at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>     ... 7 more
>> Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM limit
>>     at org.rocksdb.RocksDB.get(Native Method)
>>     at org.rocksdb.RocksDB.get(RocksDB.java:810)
>>     at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
>>     ... 12 more
>> ```
>>
>> We are currently bypassing this by using a Reduce operator instead, which
>> ensures that we only store one element per key, but this gives us degraded
>> performance.
>>
>> Thanks for your input!
>> Robin
