Re: Conceptual question

2018-06-12 Thread David Anderson
Tony, You are right; I hadn't thought this through. The KeyedStateFunction only has access to one piece of state at a time, which is the state for some specific key associated with one specific state descriptor. You can fetch, update, or clear that piece of state, but as far as I know, you can't

Re: Conceptual question

2018-06-12 Thread Tony Wei
Hi David, I have read the documentation for `Context.applyToKeyedState()`, but I still have some questions about using it to implement keyed state migration. `Context.applyToKeyedState()` can only be called in `processBroadcastElement()`, so it won't have any key information. It looks like I can use

Re: Conceptual question

2018-06-08 Thread TechnoMage
Thank you all. This discussion is very helpful. It sounds like I can wait for 1.6 though given our development status. Michael > On Jun 8, 2018, at 1:08 PM, David Anderson wrote: > > Hi all, > > I think I see a way to eagerly do full state migration without writing your > own Operator,

Re: Conceptual question

2018-06-08 Thread David Anderson
Hi all, I think I see a way to eagerly do full state migration without writing your own Operator, but it's kind of hacky and may have flaws I'm not aware of. In Flink 1.5 we now have the possibility to connect BroadcastStreams to KeyedStreams and apply a KeyedBroadcastProcessFunction. This is

Re: Conceptual question

2018-06-08 Thread Piotr Nowojski
Hi, Yes, it should be feasible. As I said before, with Flink 1.6 there will be a better way to migrate state, but for now you either need to lazily convert the state, or iterate over the keys and do the job manually. Piotrek > On 7 Jun 2018, at 15:52, Tony Wei wrote: > > Hi Piotrek, > >
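
The two options mentioned above (lazy conversion versus iterating over all keys) can be illustrated with a minimal plain-Java sketch. This is not Flink API: the class `StateMigrationSketch`, the maps `stateV1` and `stateV2`, and the string-to-long conversion are all hypothetical stand-ins for keyed state in an old and a new format.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java model of two migration strategies; hypothetical, not Flink API. */
final class StateMigrationSketch {
    // Hypothetical old-format state: key -> count stored as a string.
    final Map<String, String> stateV1 = new HashMap<>();
    // Hypothetical new-format state: key -> count stored as a long.
    final Map<String, Long> stateV2 = new HashMap<>();
    // Assumed conversion from the old format to the new one.
    final Function<String, Long> convert = Long::parseLong;

    /** Lazy: convert a single key's state the first time that key is read. */
    Long getLazily(String key) {
        String old = stateV1.remove(key);
        if (old != null) {
            stateV2.put(key, convert.apply(old));
        }
        return stateV2.get(key);
    }

    /** Eager: walk every remaining key once and convert all of it up front. */
    void migrateAll() {
        for (Map.Entry<String, String> e : stateV1.entrySet()) {
            stateV2.put(e.getKey(), convert.apply(e.getValue()));
        }
        stateV1.clear();
    }
}
```

In this sketch the lazy path needs no way to enumerate keys, because it only touches the key being accessed. The eager path depends on being able to iterate over all of them.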

Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek, So my question is: is it feasible to migrate state from `ProcessFunction` to my own operator and then use `getKeyedStateBackend()` to migrate the states? If yes, is there anything I need to be careful with? If not, why not, and could it become available in the future? Thank you. Best Regards, Tony

Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
Hi, Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the function and you cannot migrate your state that way. As far as I know, yes, at the moment in order to convert everything at once (without getKeys you can still implement lazy conversion) you would have to write your

Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek, I used `ProcessFunction` to implement it, but it seems that I can't call `getKeyedStateBackend()` like `WindowOperator` does. I found that `getKeyedStateBackend()` is a method of `AbstractStreamOperator`, and the `ProcessFunction` API doesn't extend it. Does that mean I can't look up all

Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
What function are you implementing and how are you using it? Usually it’s enough if your function implements RichFunction (or rather extends AbstractRichFunction), and then you can use RichFunction#open in a similar manner to the code that I posted in my previous message. Flink in many

Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek, It seems that this was implemented with the `Operator` API, which is a lower-level API than the `Function` API. Since at the `Function` API level we can only migrate state when triggered by an event, it is more convenient this way to migrate state by iterating over all keys in the `open()` method. If I

Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
Hi, A general solution for state/schema migration is under development and it might be released with Flink 1.6.0. Before that, you need to manually handle the state migration in your operator’s open method. Let's assume that your OperatorV1 has a state field “stateV1”. Your OperatorV2 defines

Conceptual question

2018-06-06 Thread TechnoMage
We are still pretty new to Flink and I have a conceptual / DevOps question. When a job is modified and we want to deploy the new version, what is the preferred method? Our jobs have a lot of keyed state. If we use snapshots we have old state that may no longer apply to the new pipeline. If we