Re: Modify savepoints in Flink

2022-10-21 Thread Sriram Ganesh
Thanks, I'll check it out.



Re: Modify savepoints in Flink

2022-10-21 Thread Piotr Nowojski
Hi,

Yes and no. The StateProcessor API can read any Flink state, but you have to
describe the state you want it to access. Take a look at the example in the
docs [1].

First there is an example of a hypothetical production function,
`StatefulFunctionWithTime`, whose state you want to modify. Note the
`ValueState` and `ListState` fields and their descriptors. That is the state
of that particular function. Descriptors determine how the state is
serialised. Usually they are pretty simple.
Below it is the `ReaderFunction` that you would use to access/modify the
state via the StateProcessor API. To do so, you have to specify the state
you want to access, effectively copying the state descriptors from the
production code.

If you want to modify the state of a source/sink function, you would first
have to look into the source code of that connector to know what to modify,
and copy its descriptors. Also note that for sources/sinks the state is most
likely non-keyed.
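
To illustrate why the reader has to copy the descriptors, here is a toy model in plain Java. It does not use Flink at all: `Descriptor`, `OperatorSnapshot` and `longDescriptor` are hypothetical names invented for this sketch, and the string-based encoding is an assumption chosen only to keep it short. It shows the general idea that stored bytes can only be found and decoded by a reader that uses the same state name and serialisation as the writer.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model only: none of these types are Flink classes.
class DescriptorModel {

    /** Hypothetical stand-in for a state descriptor: a state name plus a way to (de)serialise values. */
    static final class Descriptor<T> {
        final String name;
        final Function<T, byte[]> serializer;
        final Function<byte[], T> deserializer;

        Descriptor(String name, Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
            this.name = name;
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    /** Hypothetical stand-in for one operator's state in a savepoint: raw bytes per state name. */
    static final class OperatorSnapshot {
        private final Map<String, byte[]> bytesByName = new HashMap<>();

        <T> void write(Descriptor<T> descriptor, T value) {
            bytesByName.put(descriptor.name, descriptor.serializer.apply(value));
        }

        /** Returns null when no state is registered under the descriptor's name. */
        <T> T read(Descriptor<T> descriptor) {
            byte[] raw = bytesByName.get(descriptor.name);
            return raw == null ? null : descriptor.deserializer.apply(raw);
        }
    }

    /** Builds a descriptor for Long values, encoded as decimal text (an arbitrary choice for this sketch). */
    static Descriptor<Long> longDescriptor(String name) {
        return new Descriptor<Long>(
                name,
                value -> Long.toString(value).getBytes(StandardCharsets.UTF_8),
                bytes -> Long.parseLong(new String(bytes, StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) {
        OperatorSnapshot snapshot = new OperatorSnapshot();

        // The "production" code writes its state under the name "offset".
        snapshot.write(longDescriptor("offset"), 42L);

        // A reader that copies the descriptor gets the value back.
        System.out.println(snapshot.read(longDescriptor("offset")));  // prints 42

        // A reader that uses a different name finds nothing.
        System.out.println(snapshot.read(longDescriptor("offsets"))); // prints null
    }
}
```

For a real connector the descriptor names and serialisers come from the connector's source code, which is why you have to look them up there.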

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#keyed-state



Re: Modify savepoints in Flink

2022-10-21 Thread Sriram Ganesh
I have a question on this. Different connectors can have different
serialisation and deserialisation techniques, right? Wouldn't that have an
impact? If I use the StateProcessor API, would it be agnostic to all sources
and sinks?



Re: Modify savepoints in Flink

2022-10-21 Thread Piotr Nowojski
Oops.

> Alternatively, you can modify the code of the function/operator whose
> state you want to modify. For example, in the
> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
> method you could add some code that migrates your old state
> to a new one.
> And you can drop such code later, in the next savepoint.

That was not entirely true. This would work for non-keyed state. For
keyed state there is no easy alternative (you would have to iterate
through all of the keys, which I think is not exposed via the public API),
so it is best to use the StateProcessor API.
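
The difference can be pictured with a toy model in plain Java. It is not Flink code: `KeyedValue`, `transformAll` and `canReadDuringInit` are hypothetical names for this sketch, and throwing `IllegalStateException` when no key is set is an assumption of the model, not a statement about what Flink throws. Keyed state is only reachable through the current key, so an initialisation hook that runs before any record arrives has nothing to read, while an offline rewrite can walk every key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy model only: these classes imitate the scoping rule, they are not Flink classes.
class KeyedScopeModel {

    /** Hypothetical keyed value state: every read or write is scoped to the current key. */
    static final class KeyedValue<K, V> {
        private final Map<K, V> valuesByKey = new HashMap<>();
        private K currentKey; // null while the function is being initialised

        void setCurrentKey(K key) {
            currentKey = key;
        }

        V value() {
            requireKey();
            return valuesByKey.get(currentKey);
        }

        void update(V newValue) {
            requireKey();
            valuesByKey.put(currentKey, newValue);
        }

        /** Offline-style rewrite over every key, similar in spirit to a job that rewrites a savepoint. */
        void transformAll(UnaryOperator<V> migration) {
            valuesByKey.replaceAll((key, value) -> migration.apply(value));
        }

        private void requireKey() {
            if (currentKey == null) {
                throw new IllegalStateException("no current key");
            }
        }
    }

    /** Reports whether keyed state can be read in the current context. */
    static boolean canReadDuringInit(KeyedValue<String, Long> state) {
        try {
            state.value();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        KeyedValue<String, Long> offsets = new KeyedValue<>();
        offsets.setCurrentKey("a");
        offsets.update(1L);
        offsets.setCurrentKey("b");
        offsets.update(2L);

        // Initialisation runs before any record is processed, so no key is set.
        offsets.setCurrentKey(null);
        System.out.println(canReadDuringInit(offsets)); // prints false

        // An offline pass can still visit every key.
        offsets.transformAll(value -> value * 10);
        offsets.setCurrentKey("b");
        System.out.println(offsets.value()); // prints 20
    }
}
```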

Best,
Piotrek



Re: Modify savepoints in Flink

2022-10-21 Thread Sriram Ganesh
Thanks! Will try this.


-- 
*Sriram G*
*Tech*


Re: Modify savepoints in Flink

2022-10-21 Thread Piotr Nowojski
Hi Sriram,

You can read and modify savepoints using the StateProcessor API [1].

Alternatively, you can modify the code of the function/operator whose
state you want to modify. For example, in the
`org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
method you could add some code that migrates your old state
to a new one.

```
private transient ValueState oldState;
private transient ValueState newState; // `new` is a reserved word in Java
(...)
initializeState(...) {
  (...)
  if (newState.value() == null && oldState.value() != null) {
    // migrate from the old state to the new one, then clear the old state
    newState.update(migrate(oldState.value()));
    oldState.update(null);
  }
}
```

You can drop that migration code later, after the next savepoint has been taken.
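
The migrate-once guard can be tried out without Flink. The following is a minimal sketch in plain Java, assuming a hypothetical `Slot` class as a stand-in for a single-value state handle and a hypothetical `migrateOnce` helper; the String-to-Long conversion is just an example migration. It shows that the check makes the migration idempotent, which is why the code can stay in place across restarts until it is removed.

```java
import java.util.function.Function;

// Toy model only: `Slot` is a hypothetical stand-in for a single-value state handle, not a Flink class.
class MigrationModel {

    static final class Slot<T> {
        private T value;

        T value() {
            return value;
        }

        void update(T newValue) {
            value = newValue;
        }
    }

    /**
     * Moves the old value into the new slot the first time it is called.
     * Later calls find the new slot populated (or the old one empty) and do nothing.
     * Returns true only if a migration actually happened.
     */
    static <A, B> boolean migrateOnce(Slot<A> oldState, Slot<B> newState, Function<A, B> migrate) {
        if (newState.value() == null && oldState.value() != null) {
            newState.update(migrate.apply(oldState.value()));
            oldState.update(null);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Slot<String> oldOffset = new Slot<>();
        oldOffset.update("42");
        Slot<Long> newOffset = new Slot<>();

        System.out.println(migrateOnce(oldOffset, newOffset, Long::parseLong)); // prints true
        System.out.println(migrateOnce(oldOffset, newOffset, Long::parseLong)); // prints false
        System.out.println(newOffset.value());                                  // prints 42
    }
}
```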

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/



Modify savepoints in Flink

2022-10-21 Thread Sriram Ganesh
Hi All,

I am working on a scenario where I need to modify the operator state in an
existing savepoint, e.g. to remove some offsets from the savepoint.

What is the best practice for such scenarios? Could you please point me to
an example?

Thanks in advance.

-- 
*Sriram G*
*Tech*