Hi, Yes and no. StateProcessor API can read any Flink state, but you have to describe the state you want it to access. Take a look at the example in the docs [1].
First you have an example of a theoretical production function `StatefulFunctionWithTime`, which state you want to modify. Note the `ValueState` and `ListState` fields and their descriptors. That's the state of that particular function. Descriptors determine how the state is serialised. Usually they are pretty simple. Below is the `ReaderFunction`, that you want to use to access/modify the state via the StateProcessor API. To do so, you have to specify the state you want to access and effectively mimic/copy paste the state descriptors from the production code. If you want to modify the state of a source/sink function, you would have to first take a look into the source code of such a connector to know what to modify and copy its descriptors. Also note that for source/sink the state is most likely non-keyed. Best, Piotrek [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#keyed-state pt., 21 paź 2022 o 14:37 Sriram Ganesh <srigns...@gmail.com> napisał(a): > I have question on this. Different connector can have different > serialisation and de-serlisation technique right?. Wouldn't that impact?. > If I use StateProcessor API, would that be agnostic to all the sources and > sinks?. > > On Fri, Oct 21, 2022, 18:00 Piotr Nowojski <pnowoj...@apache.org> wrote: > >> ops >> >> > Alternatively, you can modify a code of your function/operator for >> which you want to modify the state. For example in the >> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState` >> method you could add some code that would do a migration of your old state >> to a new one. >> > And you can drop such code later, in the next savepoint. >> >> That was not entirely true. This would work for the non-keyed state. For >> the keyed state there is no easy alternative (you would have to iterate >> through all of the keys, which I think is not exposed via Public API) - >> best to use StateProcessor API. >> >> Best, >> Piotrek >> >> pt., 21 paź 2022 o 10:54 Sriram Ganesh <srigns...@gmail.com> napisał(a): >> >>> Thanks !. Will try this. >>> >>> On Fri, Oct 21, 2022 at 2:22 PM Piotr Nowojski <pnowoj...@apache.org> >>> wrote: >>> >>>> Hi Sriram, >>>> >>>> You can read and modify savepoints using StateProcessor API [1]. >>>> >>>> Alternatively, you can modify a code of your function/operator for >>>> which you want to modify the state. For example in the >>>> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState` >>>> method you could add some code that would do a migration of your old state >>>> to a new one. >>>> >>>> ``` >>>> private transient ValueState<Foo> old; >>>> private transient ValueState<Foo> new; >>>> (...) >>>> initializeState(...) { >>>> (...) >>>> if (new.value() == null && old.value() != null) { >>>> // code to migrate from old to new one >>>> new.update(migrate(old.value()); >>>> old.update(null); >>>> } >>>> } >>>> ``` >>>> >>>> And you can drop such code later, in the next savepoint. >>>> >>>> Best, >>>> Piotrek >>>> >>>> [1] >>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/ >>>> >>>> pt., 21 paź 2022 o 10:05 Sriram Ganesh <srigns...@gmail.com> >>>> napisał(a): >>>> >>>>> Hi All, >>>>> >>>>> I am working on a scenario where I need to modify the existing >>>>> savepoint operator state. Ex: Wanted to remove some offset of the >>>>> savepoint. >>>>> >>>>> What is the better practice for these scenarios?. Could you please >>>>> help me with any example as such? >>>>> >>>>> Thanks in advance. >>>>> >>>>> -- >>>>> *Sriram G* >>>>> *Tech* >>>>> >>>>> >>> >>> -- >>> *Sriram G* >>> *Tech* >>> >>>