Re: Modify savepoints in Flink
On Fri, Oct 21, 2022, 18:20 Piotr Nowojski wrote:
> [...]

Thanks, I'll check it out.
Re: Modify savepoints in Flink
On Fri, Oct 21, 2022 at 14:37 Sriram Ganesh wrote:
> [...]

Hi,

Yes and no. The State Processor API can read any Flink state, but you have to describe the state you want it to access. Take a look at the example in the docs [1].

First there is an example of a hypothetical production function, `StatefulFunctionWithTime`, whose state you want to modify. Note the `ValueState` and `ListState` fields and their descriptors. That is the state of that particular function. Descriptors determine how the state is serialised, and usually they are pretty simple.

Below it is the `ReaderFunction` that you would use to access/modify the state via the State Processor API. To do so, you have to specify the state you want to access, which effectively means copying the state descriptors from the production code.

If you want to modify the state of a source/sink function, you would first have to look into the source code of that connector to know what to modify, and then copy its descriptors. Also note that for sources/sinks the state is most likely non-keyed.

Best,
Piotrek

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#keyed-state
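As a rough illustration of why the descriptors have to be copied from the production code, here is a Flink-free sketch in plain Java. It is only an analogy and does not use the State Processor API: the choice of `writeLong`/`readLong` stands in for a state descriptor, and the class and method names (`DescriptorMismatchDemo`, `writeAsLong`, `readAsLong`, `readAsInt`) are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Flink-free model: the "descriptor" is simply the choice of write/read method. */
final class DescriptorMismatchDemo {

    /** Stands in for production code that stores its state as a long. */
    static byte[] writeAsLong(long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(value); // 8 bytes, big-endian
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reader that copied the production "descriptor": same type, same encoding. */
    static long readAsLong(byte[] stored) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(stored))) {
            return in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reader that guessed the wrong type: it decodes only the first 4 bytes. */
    static int readAsInt(byte[] stored) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(stored))) {
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] stored = writeAsLong(42L);
        System.out.println(readAsLong(stored)); // prints 42
        System.out.println(readAsInt(stored));  // prints 0: no error, just a wrong value
    }
}
```

In this model the mismatched reader does not fail, it just returns a different number. The stored bytes carry no description of their own format, so the reader has to bring the same description the writer used.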
Re: Modify savepoints in Flink
On Fri, Oct 21, 2022, 18:00 Piotr Nowojski wrote:
> [...]

I have a question on this. Different connectors can have different serialisation and deserialisation techniques, right? Wouldn't that have an impact? If I use the State Processor API, would it be agnostic to all the sources and sinks?
Re: Modify savepoints in Flink
On Fri, Oct 21, 2022 at 10:54 Sriram Ganesh wrote:
> [...]

Oops.

> Alternatively, you can modify the code of the function/operator whose state you want to modify. For example, in the `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState` method you could add some code that migrates your old state to a new one.
> And you can drop such code later, in the next savepoint.

That was not entirely true. This would work for non-keyed state. For keyed state there is no easy alternative (you would have to iterate through all of the keys, which I think is not exposed via the public API), so it is best to use the State Processor API.

Best,
Piotrek
Re: Modify savepoints in Flink
On Fri, Oct 21, 2022 at 2:22 PM Piotr Nowojski wrote:
> [...]

Thanks! Will try this.

--
*Sriram G*
*Tech*
Re: Modify savepoints in Flink
On Fri, Oct 21, 2022 at 10:05 Sriram Ganesh wrote:
> [...]

Hi Sriram,

You can read and modify savepoints using the State Processor API [1].

Alternatively, you can modify the code of the function/operator whose state you want to modify. For example, in the `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState` method you could add some code that migrates your old state to a new one.

```
private transient ValueState oldState;
private transient ValueState newState;
(...)
initializeState(...) {
  (...)
  if (newState.value() == null && oldState.value() != null) {
    // migrate from the old state to the new one, then clear the old one
    newState.update(migrate(oldState.value()));
    oldState.update(null);
  }
}
```

And you can drop such code later, in the next savepoint.

Best,
Piotrek

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/
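The guard in the snippet above (migrate only when the new state is empty and the old one is not) is what lets the migration code run on every restore and then be dropped later. A minimal Flink-free sketch of just that logic follows. Two plain lists stand in for the old and new state, and `MigrateOnceDemo`, `migrateIfNeeded`, `migrate` and the `"offset-"` format are hypothetical names chosen for illustration, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Flink-free model: two mutable lists stand in for the old and new state. */
final class MigrateOnceDemo {

    /** Hypothetical conversion from the old format to the new one. */
    static String migrate(long oldValue) {
        return "offset-" + oldValue;
    }

    /**
     * Mirrors the guard from the snippet: migrate only if the new state is
     * still empty and the old state has something in it.
     *
     * @return true if a migration was performed
     */
    static boolean migrateIfNeeded(List<Long> oldState, List<String> newState) {
        if (!newState.isEmpty() || oldState.isEmpty()) {
            return false; // already migrated, or nothing to migrate
        }
        for (long value : oldState) {
            newState.add(migrate(value));
        }
        oldState.clear(); // the old state is empty in every later snapshot
        return true;
    }

    public static void main(String[] args) {
        List<Long> oldState = new ArrayList<>(List.of(10L, 20L));
        List<String> newState = new ArrayList<>();

        System.out.println(migrateIfNeeded(oldState, newState)); // prints true
        System.out.println(migrateIfNeeded(oldState, newState)); // prints false
        System.out.println(newState); // prints [offset-10, offset-20]
    }
}
```

Because the old state is cleared after the first run, a snapshot taken afterwards contains only the new state, and the second call is a no-op.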
Modify savepoints in Flink
Hi All,

I am working on a scenario where I need to modify the operator state in an existing savepoint. For example, I want to remove some offsets from the savepoint.

What is the best practice for such scenarios? Could you please help me with an example?

Thanks in advance.

--
*Sriram G*
*Tech*