Hi,

ok, now I understand your goal a bit better. If would still like to point out 
that it may take a bit more than it looks like. Just to name one example, you 
probably also want to support asynchronous snapshots which is most likely 
difficult when using just a hashmap. I think the proper solution for you (and 
also something that we are considering to support in the future) is that 
different backends could be supported for different operators in a job. But 
that is currently not possible. I still want to answer your other question: you 
could currently compute all things about key-groups and their assignment to 
operators by using the methods from 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.

Best,
Stefan

> Am 20.02.2018 um 14:52 schrieb Gerard Garcia <ger...@talaia.io>:
> 
> Hi Stefan, thanks 
> 
> Yes, we are also using keyed state in other operators the problem is that 
> serialization is quite expensive and in some of them we would prefer to avoid 
> it by storing the state in memory (for our use case one specific operator 
> with in memory state gives at least a 30% throughput improvement). When we 
> are not operating in a keyed stream is easy, basically all the operators have 
> the same in memory state, what we would like to do is the same but when we 
> are operating in a keyed stream. Does it make more sense now?
> 
> We are using rocksdb as state backend and as far as I know elements get 
> always serialized when stored in the state and I'm not sure if there is even 
> some disk access (maybe not synchronously) that could hurt performance.
> 
> Gerard
> 
> On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter <s.rich...@data-artisans.com 
> <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
> from what I read, I get the impression that you attempt to implement you own 
> "keyed state" with a hashmap? Why not using the keyed state that is already 
> provided by Flink and gives you efficient rescaling etc. out of the box? 
> Please see [1] for the details.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state>
> 
>> Am 20.02.2018 um 13:44 schrieb gerardg <ger...@talaia.io 
>> <mailto:ger...@talaia.io>>:
>> 
>> Hello,
>> 
>> To improve performance we have " keyed state" in the operator's memory,
>> basically we keep a Map which contains the state per each of the keys. The
>> problem comes when we want to restore the state after a failure or after
>> rescaling the operator. What we are doing is sending the concatenation of
>> all the state to every operator using an union redistribution and then we
>> restore the "in memory state" every time we see a new key. Then, after a
>> while, we just clear the redistributed state. This is somewhat complex and
>> prone to errors so we would like to find an alternative way of doing this.
>> 
>> As far as I know Flink knows which keys belong to each operator
>> (distributing key groups) so I guess it would be possible to somehow
>> calculate the key id from each of the stored keys and restore the in memory
>> state at once if we could access to the key groups mapping. Is that
>> possible? We could patch Flink if necessary to access that information. 
>> 
>> Thanks, 
>> 
>> Gerard
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
> 
> 

Reply via email to