Re: Conceptual question

2018-06-12 Thread David Anderson
Tony,

You are right; I hadn't thought this through. The KeyedStateFunction only
has access to one piece of state at a time, which is the state for some
specific key associated with one specific state descriptor. You can fetch,
update, or clear that piece of state, but as far as I know, you can't
work with two state descriptors at the same time, so there doesn't
appear to be any way to effect a migration.

Sorry for that,
David

On Tue, Jun 12, 2018 at 9:56 AM, Tony Wei  wrote:

> Hi David,
>
> I have read the documentation for `Context.applyToKeyedState()`, but I still
> have some questions about using it to implement keyed state migration.
> `Context.applyToKeyedState()` can only be called in
> `processBroadcastElement()`, so it won't have any key information.
> It looks like I can use a `KeyedStateFunction` to get, update, or clear my
> keyed states. Am I right?
> If I want to migrate to a different type, e.g. change a `string` state to an
> `int` state, how do I achieve that with this functionality?
> It seems that I can't use the `key` parameter in `KeyedStateFunction` to
> access the other state, which is defined by another state descriptor.
> Please correct me if I misunderstood. Thank you.
>
> Best Regards,
> Tony Wei
>
>
>
> 2018-06-09 9:45 GMT+08:00 TechnoMage :
>
>> Thank you all.  This discussion is very helpful.  It sounds like I can
>> wait for 1.6 though given our development status.
>>
>> Michael
>>
>>
>> On Jun 8, 2018, at 1:08 PM, David Anderson 
>> wrote:
>>
>> Hi all,
>>
>> I think I see a way to eagerly do full state migration without writing
>> your own Operator, but it's kind of hacky and may have flaws I'm not aware
>> of.
>>
>> In Flink 1.5 we now have the possibility to connect BroadcastStreams to
>> KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant
>> because in the processBroadcastElement() method you can supply a
>> KeyedStateFunction to the Context.applyToKeyedState() method, and this
>> KeyedStateFunction will be applied to every item of keyed state associated
>> with the state descriptor you specify. I've been doing some experiments
>> with this, and it's quite powerful in cases where it's useful to operate on
>> all of your application's state.
>>
>> I believe this was intended for cases where an update to an item of
>> broadcast state has implications for associated keyed state, but I see
>> nothing that prevents you from essentially ignoring the broadcast stream
>> and using this mechanism to implement keyed state migration.
>>
>> David
>>
>>
>>
>> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Yes, it should be feasible. As I said before, with Flink 1.6 there will
>>> be a better way to migrate state, but for now you either need to convert
>>> the state lazily, or iterate over the keys and do the job manually.
>>>
>>> Piotrek
>>>
>>>
>>> On 7 Jun 2018, at 15:52, Tony Wei  wrote:
>>>
>>> Hi Piotrek,
>>>
>>> So my question is: is it feasible to migrate from `ProcessFunction` to my
>>> own operator and then use `getKeyedStateBackend()` to migrate the states?
>>> If yes, is there anything I need to be careful with? If not, why not, and
>>> could it become available in the future? Thank you.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski :
>>>
 Hi,

 Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
 function, so you cannot migrate your state that way.

 As far as I know, yes: at the moment, to convert everything at once you
 would have to write your own operator (without getKeys you can still
 implement lazy conversion).

 Piotrek


 On 7 Jun 2018, at 15:26, Tony Wei  wrote:

 Hi Piotrek,

 I used `ProcessFunction` to implement it, but it seems that I can't call
 `getKeyedStateBackend()` the way `WindowOperator` does.
 I found that `getKeyedStateBackend()` is a method of
 `AbstractStreamOperator`, and `ProcessFunction` doesn't extend that class.
 Does that mean I can't look up all keys and migrate the entire previous
 state to the new state in `ProcessFunction#open()`?
 As I asked, do I need to port my `ProcessFunction` to a
 `KeyedProcessOperator` to migrate state in the manner shown in
 `WindowOperator`?

 Best Regards,
 Tony Wei

 2018-06-07 20:28 GMT+08:00 Piotr Nowojski :

> What function are you implementing and how are you using it?
>
> Usually it’s enough if your function implements RichFunction (or
> rather extends AbstractRichFunction); then you can use
> RichFunction#open in a similar manner to the code that I posted in my
> previous message. Flink performs instanceof checks like this in many
> places, e.g. in
> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>
> public static void openFunction(Function function, Configuration parameters) throws Exception {
>if (function instanceof 

Re: Conceptual question

2018-06-12 Thread Tony Wei
Hi David,

I have read the documentation for `Context.applyToKeyedState()`, but I still
have some questions about using it to implement keyed state migration.
`Context.applyToKeyedState()` can only be called in
`processBroadcastElement()`, so it won't have any key information.
It looks like I can use a `KeyedStateFunction` to get, update, or clear my
keyed states. Am I right?
If I want to migrate to a different type, e.g. change a `string` state to an
`int` state, how do I achieve that with this functionality?
It seems that I can't use the `key` parameter in `KeyedStateFunction` to
access the other state, which is defined by another state descriptor.
Please correct me if I misunderstood. Thank you.

Best Regards,
Tony Wei



2018-06-09 9:45 GMT+08:00 TechnoMage :

> Thank you all.  This discussion is very helpful.  It sounds like I can
> wait for 1.6 though given our development status.
>
> Michael
>
>
> On Jun 8, 2018, at 1:08 PM, David Anderson 
> wrote:
>
> Hi all,
>
> I think I see a way to eagerly do full state migration without writing
> your own Operator, but it's kind of hacky and may have flaws I'm not aware
> of.
>
> In Flink 1.5 we now have the possibility to connect BroadcastStreams to
> KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant
> because in the processBroadcastElement() method you can supply a
> KeyedStateFunction to the Context.applyToKeyedState() method, and this
> KeyedStateFunction will be applied to every item of keyed state associated
> with the state descriptor you specify. I've been doing some experiments
> with this, and it's quite powerful in cases where it's useful to operate on
> all of your application's state.
>
> I believe this was intended for cases where an update to an item of
> broadcast state has implications for associated keyed state, but I see
> nothing that prevents you from essentially ignoring the broadcast stream
> and using this mechanism to implement keyed state migration.
>
> David
>
>
>
> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Yes, it should be feasible. As I said before, with Flink 1.6 there will be
>> a better way to migrate state, but for now you either need to convert the
>> state lazily, or iterate over the keys and do the job manually.
>>
>> Piotrek
>>
>>
>> On 7 Jun 2018, at 15:52, Tony Wei  wrote:
>>
>> Hi Piotrek,
>>
>> So my question is: is it feasible to migrate from `ProcessFunction` to my
>> own operator and then use `getKeyedStateBackend()` to migrate the states?
>> If yes, is there anything I need to be careful with? If not, why not, and
>> could it become available in the future? Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski :
>>
>>> Hi,
>>>
>>> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
>>> function, so you cannot migrate your state that way.
>>>
>>> As far as I know, yes: at the moment, to convert everything at once you
>>> would have to write your own operator (without getKeys you can still
>>> implement lazy conversion).
>>>
>>> Piotrek
>>>
>>>
>>> On 7 Jun 2018, at 15:26, Tony Wei  wrote:
>>>
>>> Hi Piotrek,
>>>
>>> I used `ProcessFunction` to implement it, but it seems that I can't call
>>> `getKeyedStateBackend()` the way `WindowOperator` does.
>>> I found that `getKeyedStateBackend()` is a method of
>>> `AbstractStreamOperator`, and `ProcessFunction` doesn't extend that class.
>>> Does that mean I can't look up all keys and migrate the entire previous
>>> state to the new state in `ProcessFunction#open()`?
>>> As I asked, do I need to port my `ProcessFunction` to a
>>> `KeyedProcessOperator` to migrate state in the manner shown in
>>> `WindowOperator`?
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski :
>>>
 What function are you implementing and how are you using it?

 Usually it’s enough if your function implements RichFunction (or rather
 extends AbstractRichFunction); then you can use RichFunction#open in a
 similar manner to the code that I posted in my previous message. Flink
 performs instanceof checks like this in many places, e.g. in
 org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:

 public static void openFunction(Function function, Configuration parameters) throws Exception {
     if (function instanceof RichFunction) {
         RichFunction richFunction = (RichFunction) function;
         richFunction.open(parameters);
     }
 }

 Piotrek


 On 7 Jun 2018, at 11:07, Tony Wei  wrote:

 Hi Piotrek,

 It seems that this was implemented with the `Operator` API, which is a
 lower-level API than the `Function` API.
 Since at the `Function` API level we can only migrate state when an event
 arrives, it is more convenient to migrate state this way, by iterating over
 all keys in the `open()` method.
 If I implemented my stateful operator with the `ProcessFunction` API, is it
 possible to 

Re: Conceptual question

2018-06-08 Thread TechnoMage
Thank you all.  This discussion is very helpful.  It sounds like I can wait for 
1.6 though given our development status.

Michael

> On Jun 8, 2018, at 1:08 PM, David Anderson  wrote:
> 
> Hi all,
> 
> I think I see a way to eagerly do full state migration without writing your 
> own Operator, but it's kind of hacky and may have flaws I'm not aware of. 
> 
> In Flink 1.5 we now have the possibility to connect BroadcastStreams to 
> KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant 
> because in the processBroadcastElement() method you can supply a 
> KeyedStateFunction to the Context.applyToKeyedState() method, and this 
> KeyedStateFunction will be applied to every item of keyed state associated with 
> the state descriptor you specify. I've been doing some experiments with this, 
> and it's quite powerful in cases where it's useful to operate on all of your 
> application's state.
> 
> I believe this was intended for cases where an update to an item of broadcast 
> state has implications for associated keyed state, but I see nothing that 
> prevents you from essentially ignoring the broadcast stream and using this 
> mechanism to implement keyed state migration.
> 
> David
> 
> 
> 
> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski wrote:
> Hi,
> 
> Yes, it should be feasible. As I said before, with Flink 1.6 there will be
> a better way to migrate state, but for now you either need to convert the
> state lazily, or iterate over the keys and do the job manually.
> 
> Piotrek
> 
> 
>> On 7 Jun 2018, at 15:52, Tony Wei wrote:
>> 
>> Hi Piotrek,
>> 
>> So my question is: is it feasible to migrate from `ProcessFunction` to my
>> own operator and then use `getKeyedStateBackend()` to migrate the states?
>> If yes, is there anything I need to be careful with? If not, why not, and
>> could it become available in the future? Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski:
>> Hi,
>> 
>> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
>> function, so you cannot migrate your state that way.
>>
>> As far as I know, yes: at the moment, to convert everything at once you
>> would have to write your own operator (without getKeys you can still
>> implement lazy conversion).
>> 
>> Piotrek
>> 
>> 
>>> On 7 Jun 2018, at 15:26, Tony Wei wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> I used `ProcessFunction` to implement it, but it seems that I can't call
>>> `getKeyedStateBackend()` the way `WindowOperator` does.
>>> I found that `getKeyedStateBackend()` is a method of
>>> `AbstractStreamOperator`, and `ProcessFunction` doesn't extend that class.
>>> Does that mean I can't look up all keys and migrate the entire previous
>>> state to the new state in `ProcessFunction#open()`?
>>> As I asked, do I need to port my `ProcessFunction` to a
>>> `KeyedProcessOperator` to migrate state in the manner shown in
>>> `WindowOperator`?
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski:
>>> What function are you implementing and how are you using it?
>>> 
>>> Usually it’s enough if your function implements RichFunction (or rather
>>> extends AbstractRichFunction); then you can use RichFunction#open in a
>>> similar manner to the code that I posted in my previous message. Flink
>>> performs instanceof checks like this in many places, e.g. in
>>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>>>
>>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>>     if (function instanceof RichFunction) {
>>>         RichFunction richFunction = (RichFunction) function;
>>>         richFunction.open(parameters);
>>>     }
>>> }
>>> 
>>> Piotrek
>>> 
>>> 
 On 7 Jun 2018, at 11:07, Tony Wei wrote:
 
 Hi Piotrek,
 
 It seems that this was implemented with the `Operator` API, which is a
 lower-level API than the `Function` API.
 Since at the `Function` API level we can only migrate state when an event
 arrives, it is more convenient to migrate state this way, by iterating over
 all keys in the `open()` method.
 If I implemented my stateful operator with the `ProcessFunction` API, is it
 possible to port it to `KeyedProcessOperator` and do the state migration
 that you mentioned?
 And are there any concerns or difficulties that could lead to a failed
 state restore or other problems? Thank you!
 
 Best Regards,
 Tony Wei
 
 2018-06-07 16:10 GMT+08:00 Piotr Nowojski:
 Hi,
 
 A general solution for state/schema migration is under development, and it
 might be released with Flink 1.6.0.
 
 Before 

Re: Conceptual question

2018-06-08 Thread David Anderson
Hi all,

I think I see a way to eagerly do full state migration without writing your
own Operator, but it's kind of hacky and may have flaws I'm not aware of.

In Flink 1.5 we now have the possibility to connect BroadcastStreams to
KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant
because in the processBroadcastElement() method you can supply a
KeyedStateFunction to the Context.applyToKeyedState() method, and this
KeyedStateFunction will be applied to every item of keyed state associated
with the state descriptor you specify. I've been doing some experiments
with this, and it's quite powerful in cases where it's useful to operate on
all of your application's state.

I believe this was intended for cases where an update to an item of
broadcast state has implications for associated keyed state, but I see
nothing that prevents you from essentially ignoring the broadcast stream
and using this mechanism to implement keyed state migration.

David
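The mechanism described above can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Flink code: `ToyKeyedStateBackend`, `StateHandle`, `PerKeyFunction`, and `PurgeExample` are hypothetical stand-ins, loosely shaped after the `KeyedStateFunction` callback that is handed to `Context.applyToKeyedState()`. The function is called once for every key that has state under a single descriptor, and it can read, update, or clear only that one value. The example uses this for a selective purge; as the 2018-06-12 reply in this thread points out, the callback never sees a second descriptor, so a type-changing migration does not fit this shape.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ONE piece of keyed state: one key under one descriptor.
interface StateHandle<V> {
    V value();
    void update(V value);
    void clear();
}

// Hypothetical callback, loosely shaped like Flink's KeyedStateFunction (process(key, state)).
interface PerKeyFunction<K, V> {
    void process(K key, StateHandle<V> state);
}

// Toy model of a keyed state backend: descriptor name -> (key -> value). Not Flink API.
class ToyKeyedStateBackend<K, V> {
    private final Map<String, Map<K, V>> states = new HashMap<>();

    void put(String descriptor, K key, V value) {
        states.computeIfAbsent(descriptor, d -> new HashMap<>()).put(key, value);
    }

    V get(String descriptor, K key) {
        Map<K, V> byKey = states.get(descriptor);
        return byKey == null ? null : byKey.get(key);
    }

    // Visits every key that has state under a single descriptor.
    // The function only ever sees that descriptor's value for the current key.
    void applyToKeyedState(String descriptor, PerKeyFunction<K, V> function) {
        Map<K, V> byKey = states.get(descriptor);
        if (byKey == null) {
            return;
        }
        // Iterate over a copy of the keys so that clear() can remove entries safely.
        for (K key : new ArrayList<>(byKey.keySet())) {
            function.process(key, new StateHandle<V>() {
                public V value() { return byKey.get(key); }
                public void update(V value) { byKey.put(key, value); }
                public void clear() { byKey.remove(key); }
            });
        }
    }
}

// Hypothetical use: selectively purge "lastSeen" entries older than a cutoff.
class PurgeExample {
    static void purgeOlderThan(ToyKeyedStateBackend<String, Long> backend, long cutoff) {
        backend.applyToKeyedState("lastSeen", (key, state) -> {
            if (state.value() < cutoff) {
                state.clear();
            }
        });
    }
}
```

In the setup described above, the per-key function would be supplied from `processBroadcastElement()` when a broadcast element arrives; the model leaves out keys groups, serializers, and checkpointing entirely.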



On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski 
wrote:

> Hi,
>
> Yes, it should be feasible. As I said before, with Flink 1.6 there will be
> a better way to migrate state, but for now you either need to convert the
> state lazily, or iterate over the keys and do the job manually.
>
> Piotrek
>
>
> On 7 Jun 2018, at 15:52, Tony Wei  wrote:
>
> Hi Piotrek,
>
> So my question is: is it feasible to migrate from `ProcessFunction` to my
> own operator and then use `getKeyedStateBackend()` to migrate the states?
> If yes, is there anything I need to be careful with? If not, why not, and
> could it become available in the future? Thank you.
>
> Best Regards,
> Tony Wei
>
> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski :
>
>> Hi,
>>
>> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
>> function, so you cannot migrate your state that way.
>>
>> As far as I know, yes: at the moment, to convert everything at once you
>> would have to write your own operator (without getKeys you can still
>> implement lazy conversion).
>>
>> Piotrek
>>
>>
>> On 7 Jun 2018, at 15:26, Tony Wei  wrote:
>>
>> Hi Piotrek,
>>
>> I used `ProcessFunction` to implement it, but it seems that I can't call
>> `getKeyedStateBackend()` the way `WindowOperator` does.
>> I found that `getKeyedStateBackend()` is a method of
>> `AbstractStreamOperator`, and `ProcessFunction` doesn't extend that class.
>> Does that mean I can't look up all keys and migrate the entire previous
>> state to the new state in `ProcessFunction#open()`?
>> As I asked, do I need to port my `ProcessFunction` to a
>> `KeyedProcessOperator` to migrate state in the manner shown in
>> `WindowOperator`?
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski :
>>
>>> What function are you implementing and how are you using it?
>>>
>>> Usually it’s enough if your function implements RichFunction (or rather
>>> extends AbstractRichFunction); then you can use RichFunction#open in a
>>> similar manner to the code that I posted in my previous message. Flink
>>> performs instanceof checks like this in many places, e.g. in
>>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>>>
>>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>>     if (function instanceof RichFunction) {
>>>         RichFunction richFunction = (RichFunction) function;
>>>         richFunction.open(parameters);
>>>     }
>>> }
>>>
>>> Piotrek
>>>
>>>
>>> On 7 Jun 2018, at 11:07, Tony Wei  wrote:
>>>
>>> Hi Piotrek,
>>>
>>> It seems that this was implemented with the `Operator` API, which is a
>>> lower-level API than the `Function` API.
>>> Since at the `Function` API level we can only migrate state when an event
>>> arrives, it is more convenient to migrate state this way, by iterating over
>>> all keys in the `open()` method.
>>> If I implemented my stateful operator with the `ProcessFunction` API, is it
>>> possible to port it to `KeyedProcessOperator` and do the state migration
>>> that you mentioned?
>>> And are there any concerns or difficulties that could lead to a failed
>>> state restore or other problems? Thank you!
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski :
>>>
 Hi,

 A general solution for state/schema migration is under development, and it
 might be released with Flink 1.6.0.

 Before that, you need to handle the state migration manually in your
 operator’s open method. Let's assume that your OperatorV1 has a state field
 “stateV1”. Your OperatorV2 defines a field “stateV2”, which is incompatible
 with the previous version. What you can do is add logic to the open method
 that checks:
 1. If “stateV2” is non-empty, do nothing.
 2. If there is no “stateV2”, iterate over all of the keys and manually
 migrate “stateV1” to “stateV2”.

 In your OperatorV3 you could drop the support for “stateV1”.

 I once implemented something like that here:

 

Re: Conceptual question

2018-06-08 Thread Piotr Nowojski
Hi,

Yes, it should be feasible. As I said before, with Flink 1.6 there will be a
better way to migrate state, but for now you either need to convert the state
lazily, or iterate over the keys and do the job manually.

Piotrek
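Lazy conversion, the first option mentioned above, can be sketched in plain Java. The two `HashMap`s are hypothetical stand-ins for a keyed `ValueState<String>` (old descriptor) and a keyed `ValueState<Integer>` (new descriptor), and `processElement` plays the role of a keyed function's per-element callback; none of these names are Flink API. The new state is consulted first; only when it is empty is the old value converted, written to the new state, and removed from the old one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: each map stands in for one keyed state (one state descriptor).
class LazyMigrationModel {
    final Map<String, String> stateV1 = new HashMap<>();   // old state: counts stored as String
    final Map<String, Integer> stateV2 = new HashMap<>();  // new state: counts stored as Integer

    // Called once per element for the element's key, like a keyed processElement().
    // Keys that never receive another element are never migrated.
    int processElement(String key, int increment) {
        Integer current = stateV2.get(key);
        if (current == null) {
            String legacy = stateV1.remove(key);            // read and clear the old state
            current = (legacy == null) ? 0 : Integer.parseInt(legacy);
        }
        int updated = current + increment;
        stateV2.put(key, updated);
        return updated;
    }
}
```

Because a key is only converted when it is seen again, the old descriptor cannot safely be dropped until every key is known to have been migrated; that is the trade-off against iterating over all keys up front.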

> On 7 Jun 2018, at 15:52, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> So my question is: is it feasible to migrate from `ProcessFunction` to my
> own operator and then use `getKeyedStateBackend()` to migrate the states?
> If yes, is there anything I need to be careful with? If not, why not, and
> could it become available in the future? Thank you.
> 
> Best Regards,
> Tony Wei
> 
> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski:
> Hi,
> 
> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
> function, so you cannot migrate your state that way.
>
> As far as I know, yes: at the moment, to convert everything at once you
> would have to write your own operator (without getKeys you can still
> implement lazy conversion).
> 
> Piotrek
> 
> 
>> On 7 Jun 2018, at 15:26, Tony Wei wrote:
>> 
>> Hi Piotrek,
>> 
>> I used `ProcessFunction` to implement it, but it seems that I can't call
>> `getKeyedStateBackend()` the way `WindowOperator` does.
>> I found that `getKeyedStateBackend()` is a method of
>> `AbstractStreamOperator`, and `ProcessFunction` doesn't extend that class.
>> Does that mean I can't look up all keys and migrate the entire previous
>> state to the new state in `ProcessFunction#open()`?
>> As I asked, do I need to port my `ProcessFunction` to a
>> `KeyedProcessOperator` to migrate state in the manner shown in
>> `WindowOperator`?
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski:
>> What function are you implementing and how are you using it?
>> 
>> Usually it’s enough if your function implements RichFunction (or rather
>> extends AbstractRichFunction); then you can use RichFunction#open in a
>> similar manner to the code that I posted in my previous message. Flink
>> performs instanceof checks like this in many places, e.g. in
>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>>
>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>     if (function instanceof RichFunction) {
>>         RichFunction richFunction = (RichFunction) function;
>>         richFunction.open(parameters);
>>     }
>> }
>> 
>> Piotrek
>> 
>> 
>>> On 7 Jun 2018, at 11:07, Tony Wei wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> It seems that this was implemented with the `Operator` API, which is a
>>> lower-level API than the `Function` API.
>>> Since at the `Function` API level we can only migrate state when an event
>>> arrives, it is more convenient to migrate state this way, by iterating over
>>> all keys in the `open()` method.
>>> If I implemented my stateful operator with the `ProcessFunction` API, is it
>>> possible to port it to `KeyedProcessOperator` and do the state migration
>>> that you mentioned?
>>> And are there any concerns or difficulties that could lead to a failed
>>> state restore or other problems? Thank you!
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski:
>>> Hi,
>>> 
>>> A general solution for state/schema migration is under development, and it
>>> might be released with Flink 1.6.0.
>>>
>>> Before that, you need to handle the state migration manually in your
>>> operator’s open method. Let's assume that your OperatorV1 has a state field
>>> “stateV1”. Your OperatorV2 defines a field “stateV2”, which is incompatible
>>> with the previous version. What you can do is add logic to the open method
>>> that checks:
>>> 1. If “stateV2” is non-empty, do nothing.
>>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>>> migrate “stateV1” to “stateV2”.
>>>
>>> In your OperatorV3 you could drop the support for “stateV1”.
>>>
>>> I once implemented something like that here:
>>> 
>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>  
>>> 
>>> 
>>> Hope that helps!
>>> 
>>> Piotrek
>>> 
>>> 
 On 6 Jun 2018, at 17:04, TechnoMage wrote:
 
 We are still pretty new to Flink and I have a conceptual / DevOps question.
 
 When a job is modified and we want to deploy the new version, what is the 
 preferred method?  Our jobs have a lot of keyed state.
 
 If we use snapshots we have old state that may no longer apply to 

Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek,

So my question is: is it feasible to migrate from `ProcessFunction` to my own
operator and then use `getKeyedStateBackend()` to migrate the states?
If yes, is there anything I need to be careful with? If not, why not, and
could it become available in the future? Thank you.

Best Regards,
Tony Wei

2018-06-07 21:43 GMT+08:00 Piotr Nowojski :

> Hi,
>
> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
> function, so you cannot migrate your state that way.
>
> As far as I know, yes: at the moment, to convert everything at once you
> would have to write your own operator (without getKeys you can still
> implement lazy conversion).
>
> Piotrek
>
>
> On 7 Jun 2018, at 15:26, Tony Wei  wrote:
>
> Hi Piotrek,
>
> I used `ProcessFunction` to implement it, but it seems that I can't call
> `getKeyedStateBackend()` the way `WindowOperator` does.
> I found that `getKeyedStateBackend()` is a method of
> `AbstractStreamOperator`, and `ProcessFunction` doesn't extend that class.
> Does that mean I can't look up all keys and migrate the entire previous
> state to the new state in `ProcessFunction#open()`?
> As I asked, do I need to port my `ProcessFunction` to a
> `KeyedProcessOperator` to migrate state in the manner shown in
> `WindowOperator`?
>
> Best Regards,
> Tony Wei
>
> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski :
>
>> What function are you implementing and how are you using it?
>>
>> Usually it’s enough if your function implements RichFunction (or rather
>> extends AbstractRichFunction); then you can use RichFunction#open in a
>> similar manner to the code that I posted in my previous message. Flink
>> performs instanceof checks like this in many places, e.g. in
>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>>
>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>     if (function instanceof RichFunction) {
>>         RichFunction richFunction = (RichFunction) function;
>>         richFunction.open(parameters);
>>     }
>> }
>>
>> Piotrek
>>
>>
>> On 7 Jun 2018, at 11:07, Tony Wei  wrote:
>>
>> Hi Piotrek,
>>
>> It seems that this was implemented with the `Operator` API, which is a
>> lower-level API than the `Function` API.
>> Since at the `Function` API level we can only migrate state when an event
>> arrives, it is more convenient to migrate state this way, by iterating over
>> all keys in the `open()` method.
>> If I implemented my stateful operator with the `ProcessFunction` API, is it
>> possible to port it to `KeyedProcessOperator` and do the state migration
>> that you mentioned?
>> And are there any concerns or difficulties that could lead to a failed
>> state restore or other problems? Thank you!
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski :
>>
>>> Hi,
>>>
>>> A general solution for state/schema migration is under development, and it
>>> might be released with Flink 1.6.0.
>>>
>>> Before that, you need to handle the state migration manually in your
>>> operator’s open method. Let's assume that your OperatorV1 has a state field
>>> “stateV1”. Your OperatorV2 defines a field “stateV2”, which is incompatible
>>> with the previous version. What you can do is add logic to the open method
>>> that checks:
>>> 1. If “stateV2” is non-empty, do nothing.
>>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>>> migrate “stateV1” to “stateV2”.
>>>
>>> In your OperatorV3 you could drop the support for “stateV1”.
>>>
>>> I once implemented something like that here:
>>>
>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>
>>> Hope that helps!
>>>
>>> Piotrek
>>>
>>>
>>> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
>>>
>>> We are still pretty new to Flink and I have a conceptual / DevOps
>>> question.
>>>
>>> When a job is modified and we want to deploy the new version, what is
>>> the preferred method?  Our jobs have a lot of keyed state.
>>>
>>> If we use snapshots, we have old state that may no longer apply to the
>>> new pipeline.
>>> If we start a new job, we can reprocess historical data from Kafka, but
>>> that can be very resource-heavy for a while.
>>>
>>> Is there an option I am missing?  Are there facilities to selectively
>>> “patch” or “purge” the keyed state?
>>>
>>> Michael
>>>
>>>
>>>
>>
>>
>
>


Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
Hi,

Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
function, so you cannot migrate your state that way.

As far as I know, yes: at the moment, to convert everything at once you would
have to write your own operator (without getKeys you can still implement lazy
conversion).

Piotrek
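The eager alternative, converting everything at once when the operator opens, follows the two-step check quoted further down in this thread (do nothing if `stateV2` already has data; otherwise iterate over all keys and convert `stateV1`). The plain-Java sketch below models only that control flow: the maps are hypothetical stand-ins for the two keyed states, and `open()` stands in for the operator's open method. It ignores parallelism, key groups, and checkpointing, and in a real job enumerating all keys needs the operator-level keyed state backend access discussed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of an eager, all-at-once migration performed in open().
class EagerMigrationModel {
    final Map<String, String> stateV1 = new HashMap<>();   // legacy state (String values)
    final Map<String, Integer> stateV2 = new HashMap<>();  // new, incompatible state (Integer values)

    // 1. If stateV2 is non-empty, the migration already happened: do nothing.
    // 2. Otherwise iterate over all keys and convert stateV1 into stateV2.
    void open() {
        if (!stateV2.isEmpty()) {
            return;
        }
        for (Map.Entry<String, String> entry : stateV1.entrySet()) {
            stateV2.put(entry.getKey(), Integer.parseInt(entry.getValue().trim()));
        }
        stateV1.clear(); // a later version (OperatorV3) could drop stateV1 entirely
    }
}
```

Calling `open()` a second time is a no-op because `stateV2` is no longer empty, which is what makes the check safe to run on every restart.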

> On 7 Jun 2018, at 15:26, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> I used `ProcessFunction` to implement it, but it seems that I can't call
> `getKeyedStateBackend()` the way `WindowOperator` does.
> I found that `getKeyedStateBackend()` is a method of
> `AbstractStreamOperator`, and `ProcessFunction` doesn't extend that class.
> Does that mean I can't look up all keys and migrate the entire previous
> state to the new state in `ProcessFunction#open()`?
> As I asked, do I need to port my `ProcessFunction` to a
> `KeyedProcessOperator` to migrate state in the manner shown in
> `WindowOperator`?
> 
> Best Regards,
> Tony Wei
> 
> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski:
> What function are you implementing and how are you using it?
> 
> Usually it’s enough if your function implements RichFunction (or rather
> extends AbstractRichFunction); then you can use RichFunction#open in a
> similar manner to the code that I posted in my previous message. Flink
> performs instanceof checks like this in many places, e.g. in
> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>
> public static void openFunction(Function function, Configuration parameters) throws Exception {
>     if (function instanceof RichFunction) {
>         RichFunction richFunction = (RichFunction) function;
>         richFunction.open(parameters);
>     }
> }
> 
> Piotrek
> 
> 
>> On 7 Jun 2018, at 11:07, Tony Wei wrote:
>> 
>> Hi Piotrek,
>> 
>> It seems that this was implemented with the `Operator` API, which is a
>> lower-level API than the `Function` API.
>> Since at the `Function` API level we can only migrate state when an event
>> arrives, it is more convenient to migrate state this way, by iterating over
>> all keys in the `open()` method.
>> If I implemented my stateful operator with the `ProcessFunction` API, is it
>> possible to port it to `KeyedProcessOperator` and do the state migration
>> that you mentioned?
>> And are there any concerns or difficulties that could lead to a failed
>> state restore or other problems? Thank you!
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski:
>> Hi,
>> 
>> A general solution for state/schema migration is under development, and it
>> might be released with Flink 1.6.0.
>>
>> Before that, you need to handle the state migration manually in your
>> operator’s open method. Let's assume that your OperatorV1 has a state field
>> “stateV1”. Your OperatorV2 defines a field “stateV2”, which is incompatible
>> with the previous version. What you can do is add logic to the open method
>> that checks:
>> 1. If “stateV2” is non-empty, do nothing.
>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>> migrate “stateV1” to “stateV2”.
>>
>> In your OperatorV3 you could drop the support for “stateV1”.
>>
>> I once implemented something like that here:
>> 
>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>  
>> 
>> 
>> Hope that helps!
>> 
>> Piotrek
>> 
>> 
>>> On 6 Jun 2018, at 17:04, TechnoMage wrote:
>>> 
>>> We are still pretty new to Flink and I have a conceptual / DevOps question.
>>> 
>>> When a job is modified and we want to deploy the new version, what is the 
>>> preferred method?  Our jobs have a lot of keyed state.
>>> 
>>> If we use snapshots we have old state that may no longer apply to the new 
>>> pipeline.
>>> If we start a new job we can reprocess historical data from Kafka, but that 
>>> can be very resource heavy for a while.
>>> 
>>> Is there an option I am missing?  Are there facilities to selectively
>>> “patch” or “purge” the keyed state?
>>> 
>>> Michael
>> 
>> 
> 
> 



Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek,

I used `ProcessFunction` to implement it, but it seems that I can't call
`getKeyedStateBackend()` like `WindowOperator` did.
I found that `getKeyedStateBackend()` is the method in
`AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
Dose that mean I can't look up all keys and migrate the entire previous
states to the new states in `ProcessFunction#open()`?
As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator` to
migration state like the manner showed in `WindowOperator`?

Best Regards,
Tony Wei



Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
What function are you implementing and how are you using it?

Usually it’s enough if your function implements RichFunction (or rather
extends AbstractRichFunction); then you can use RichFunction#open in a similar
manner as in the code that I posted in my previous message. Flink performs
instanceof checks like this in many places, for example in
org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:

public static void openFunction(Function function, Configuration parameters)
        throws Exception {
    if (function instanceof RichFunction) {
        RichFunction richFunction = (RichFunction) function;
        richFunction.open(parameters);
    }
}
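
To make the dispatch concrete, here is a minimal, self-contained sketch of the same instanceof pattern. SketchFunction, SketchRichFunction, PlainFn and RichFn are hypothetical stand-ins, not Flink types, and a plain String replaces Flink's Configuration so the example runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Flink's Function / RichFunction (NOT the real types).
interface SketchFunction {}

interface SketchRichFunction extends SketchFunction {
    void open(String parameters) throws Exception; // String replaces Configuration
}

final class OpenDispatchSketch {
    static final List<String> LOG = new ArrayList<>();

    // Same shape as FunctionUtils#openFunction: only "rich" functions get open().
    static void openFunction(SketchFunction function, String parameters) throws Exception {
        if (function instanceof SketchRichFunction) {
            SketchRichFunction richFunction = (SketchRichFunction) function;
            richFunction.open(parameters);
        }
    }

    // A plain function has no open() hook, so nothing happens for it.
    static final class PlainFn implements SketchFunction {}

    // A rich function: open() is where one-time setup (e.g. migration) could live.
    static final class RichFn implements SketchRichFunction {
        @Override
        public void open(String parameters) {
            LOG.add("open called with " + parameters);
        }
    }

    public static void main(String[] args) throws Exception {
        openFunction(new PlainFn(), "cfg");
        openFunction(new RichFn(), "cfg");
        System.out.println(LOG); // [open called with cfg]
    }
}
```

Only the rich function's open() runs, which is why logic placed in RichFunction#open gets invoked for functions that extend AbstractRichFunction.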

Piotrek




Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek,

It seems that this was implemented with the `Operator` API, which is a
lower-level API than the `Function` API.
Since at the `Function` API level we can only migrate state when triggered by
an event, it is more convenient to migrate state your way, by iterating over
all keys in the `open()` method.
If I implemented my stateful operator with the `ProcessFunction` API, is it
possible to port it to `KeyedProcessOperator` and do the state migration
that you mentioned?
And are there any concerns or difficulties that could cause restoring the
state to fail, or lead to other problems? Thank you!
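
For comparison, the event-triggered approach can be sketched as lazy, per-key migration: the first time an element arrives for a key, its old value is converted and moved into the new state. This is a hypothetical model, not Flink code: two HashMaps stand in for a key-scoped ValueState<String> (V1) and ValueState<Integer> (V2), and the String-to-int conversion is an assumed example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of event-triggered ("lazy") per-key state migration.
// The maps model key-scoped V1 (String) and V2 (Integer) state; no Flink API is used.
final class LazyMigrationSketch {
    final Map<String, String> stateV1 = new HashMap<>();
    final Map<String, Integer> stateV2 = new HashMap<>();

    // Plays the role of processElement() for an element with the given key.
    int processElement(String key, int delta) {
        Integer current = stateV2.get(key);
        if (current == null) {
            // No V2 value yet for this key: read and clear the V1 value, if any.
            String old = stateV1.remove(key);
            current = (old == null) ? 0 : Integer.parseInt(old.trim());
        }
        int updated = current + delta;
        stateV2.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        LazyMigrationSketch fn = new LazyMigrationSketch();
        fn.stateV1.put("a", "41"); // pretend these were restored from a V1 savepoint
        fn.stateV1.put("b", "7");
        System.out.println(fn.processElement("a", 1));   // 42
        System.out.println(fn.stateV1.containsKey("b")); // true: "b" not migrated yet
    }
}
```

A key that never receives another element keeps its V1 value indefinitely, which is the limitation that makes iterating over all keys in open() attractive.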

Best Regards,
Tony Wei



Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
Hi,

A general solution for state/schema migration is under development, and it
might be released with Flink 1.6.0.

Before that, you need to handle the state migration manually in your
operator’s open method. Let's assume that your OperatorV1 has a state field
“stateV1”, and your OperatorV2 defines a field “stateV2”, which is
incompatible with the previous version. What you can do is add logic to the
open method that checks:
1. If “stateV2” is non-empty, do nothing.
2. If there is no “stateV2”, iterate over all of the keys and manually migrate
“stateV1” to “stateV2”.

In your OperatorV3 you could drop support for “stateV1”.

I once implemented something similar here:

https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
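
The two-step recipe above can be sketched roughly as follows. This is a hypothetical model, not Flink code: two HashMaps stand in for the keyed state backend, and the String-to-Integer conversion is assumed for illustration. In a real operator the per-key reads and writes would go through the keyed state backend, as in the linked WindowOperator code; the maps only model the control flow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of eager, open()-time migration for "OperatorV2".
// Maps stand in for keyed state: stateV1 = old String values, stateV2 = new Integer values.
final class EagerMigrationSketch {
    final Map<String, String> stateV1 = new HashMap<>();
    final Map<String, Integer> stateV2 = new HashMap<>();

    // Assumed conversion from the old representation to the new one.
    static Integer convert(String v1Value) {
        return Integer.valueOf(v1Value.trim());
    }

    // Called once when the operator starts, after state has been restored.
    void open() {
        // 1. If stateV2 is non-empty, migration already happened: do nothing.
        if (!stateV2.isEmpty()) {
            return;
        }
        // 2. Otherwise iterate over all keys and migrate stateV1 to stateV2.
        List<String> keys = new ArrayList<>(stateV1.keySet()); // copy: stateV1 is mutated below
        for (String key : keys) {
            stateV2.put(key, convert(stateV1.get(key)));
            stateV1.remove(key); // drop the old value once migrated
        }
    }

    public static void main(String[] args) {
        EagerMigrationSketch op = new EagerMigrationSketch();
        op.stateV1.put("a", "1");
        op.stateV1.put("b", "2");
        op.open(); // first start of OperatorV2: migrates both keys
        op.open(); // later restart: stateV2 is non-empty, so this is a no-op
        int sum = op.stateV2.get("a") + op.stateV2.get("b");
        System.out.println(sum);                  // 3
        System.out.println(op.stateV1.isEmpty()); // true
    }
}
```

Because step 1 checks stateV2 first, restarting from a checkpoint taken after the migration does not run the migration again; OperatorV3 could then delete the stateV1 handling entirely.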
 


Hope that helps!

Piotrek

> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
> 
> We are still pretty new to Flink and I have a conceptual / DevOps question.
> 
> When a job is modified and we want to deploy the new version, what is the 
> preferred method?  Our jobs have a lot of keyed state.
> 
> If we use snapshots we have old state that may no longer apply to the new 
> pipeline.
> If we start a new job we can reprocess historical data from Kafka, but that 
> can be very resource heavy for a while.
> 
> Is there an option I am missing?  Are there facilities to selectively
> “patch” or “purge” the keyed state?
> 
> Michael