Hi,

Yes and no. The StateProcessor API can read any Flink state, but you have to
describe the state you want it to access. Take a look at the example in the
docs [1].

First, there is an example of a hypothetical production function,
`StatefulFunctionWithTime`, whose state you want to modify. Note the
`ValueState` and `ListState` fields and their descriptors: that is the state
of that particular function. The descriptors determine how the state is
serialised, and usually they are pretty simple.
Below it is the `ReaderFunction`, which you would use to access/modify the
state via the StateProcessor API. To do so, you have to specify the state
you want to access, effectively mimicking (copy-pasting) the state
descriptors from the production code.
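For reference, a minimal sketch of such a reader, written against the Flink 1.15 `SavepointReader` API. It assumes the production function registered a `ValueState<Integer>` named "state" and a `ListState<Long>` named "times", and that the operator uid is "my-uid"; all of these names and types are hypothetical and must be copied from your real production descriptors.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadKeyedStateSketch {

  // Mirrors the production descriptors. The names "state"/"times" and the
  // INT/LONG types are hypothetical: they must match what production registered.
  public static class ReaderFunction extends KeyedStateReaderFunction<Integer, String> {
    private transient ValueState<Integer> state;
    private transient ListState<Long> updateTimes;

    @Override
    public void open(Configuration parameters) {
      state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", Types.INT));
      updateTimes = getRuntimeContext().getListState(new ListStateDescriptor<>("times", Types.LONG));
    }

    @Override
    public void readKey(Integer key, Context ctx, Collector<String> out) throws Exception {
      List<Long> times = new ArrayList<>();
      for (Long t : updateTimes.get()) {
        times.add(t);
      }
      // Emit one line per key with its value and list state.
      out.collect(key + " -> " + state.value() + " " + times);
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // args[0] is the savepoint path; "my-uid" is a hypothetical operator uid.
    SavepointReader savepoint = SavepointReader.read(env, args[0], new HashMapStateBackend());
    DataStream<String> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
    keyedState.print();
    env.execute("read-keyed-state");
  }
}
```

If the names or types in the reader do not match the production descriptors, the state either cannot be found or cannot be deserialised.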

If you want to modify the state of a source/sink function, you would first
have to look at the source code of that connector to know what to modify
and copy its descriptors. Also note that for sources/sinks the state is
most likely non-keyed.
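To illustrate why the reader has to mirror the connector's descriptors (and therefore its serialisers), here is a framework-free Java analogy using plain `java.nio.ByteBuffer`, not Flink's serializer classes and not how Flink actually lays out state on disk: the same bytes decode correctly only with the serialiser that wrote them.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class SerializerMismatchSketch {

  // "Production" serialiser: stores the value as an 8-byte big-endian long.
  static byte[] writeAsLong(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  // Reader that mirrors the production serialiser.
  static long readAsLong(byte[] data) {
    return ByteBuffer.wrap(data).getLong();
  }

  // Reader with a mismatched "descriptor": decodes only the first 4 bytes as an int.
  static int readAsInt(byte[] data) {
    return ByteBuffer.wrap(data).getInt();
  }

  public static void main(String[] args) {
    byte[] stored = writeAsLong(42L);
    System.out.println(readAsLong(stored)); // 42
    System.out.println(readAsInt(stored));  // 0 (the high-order bytes of the long)

    // The opposite mismatch fails outright: 4 bytes cannot be decoded as a long.
    byte[] storedInt = ByteBuffer.allocate(Integer.BYTES).putInt(7).array();
    try {
      readAsLong(storedInt);
    } catch (BufferUnderflowException e) {
      System.out.println("BufferUnderflowException");
    }
  }
}
```

A mismatch can therefore either silently produce wrong values or fail, which is why the StateProcessor API needs the exact descriptors rather than being agnostic to the connector.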

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#keyed-state

On Fri, 21 Oct 2022 at 14:37, Sriram Ganesh <srigns...@gmail.com> wrote:

> I have a question on this. Different connectors can have different
> serialisation and deserialisation techniques, right? Wouldn't that have an
> impact? If I use the StateProcessor API, would it be agnostic to all
> sources and sinks?
>
> On Fri, Oct 21, 2022, 18:00 Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Oops,
>>
>> > Alternatively, you can modify the code of the function/operator whose
>> > state you want to modify. For example, in the
>> > `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
>> > method you could add some code that migrates your old state to a new one.
>> > And you can drop such code later, in the next savepoint.
>>
>> That was not entirely true. This works for non-keyed state. For keyed
>> state there is no easy alternative (you would have to iterate through all
>> of the keys, which I think is not exposed via the public API), so it's
>> best to use the StateProcessor API.
>>
>> Best,
>> Piotrek
>>
>> On Fri, 21 Oct 2022 at 10:54, Sriram Ganesh <srigns...@gmail.com> wrote:
>>
>>> Thanks! Will try this.
>>>
>>> On Fri, Oct 21, 2022 at 2:22 PM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi Sriram,
>>>>
>>>> You can read and modify savepoints using the StateProcessor API [1].
>>>>
>>>> Alternatively, you can modify the code of the function/operator whose
>>>> state you want to modify. For example, in the
>>>> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
>>>> method you could add some code that migrates your old state to a new
>>>> one.
>>>>
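>>>> ```
>>>> import org.apache.flink.api.common.state.ListState;
>>>> import org.apache.flink.api.common.state.ListStateDescriptor;
>>>> import org.apache.flink.api.common.state.OperatorStateStore;
>>>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>>>> (...)
>>>> // Non-keyed (operator) state only: keyed state has no key context here.
>>>> private transient ListState<Foo> oldState;
>>>> private transient ListState<Foo> newState;
>>>> (...)
>>>> @Override
>>>> public void initializeState(FunctionInitializationContext context) throws Exception {
>>>>   OperatorStateStore store = context.getOperatorStateStore();
>>>>   oldState = store.getListState(new ListStateDescriptor<>("old", Foo.class));
>>>>   newState = store.getListState(new ListStateDescriptor<>("new", Foo.class));
>>>>   // Migrate only once: the new state is still empty, the old one may be set
>>>>   if (!newState.get().iterator().hasNext()) {
>>>>     for (Foo value : oldState.get()) {
>>>>       newState.add(migrate(value));
>>>>     }
>>>>     oldState.clear();
>>>>   }
>>>> }
>>>> ```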
>>>>
>>>> You can drop such code later, once the next savepoint has been taken.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/
>>>>
>>>> On Fri, 21 Oct 2022 at 10:05, Sriram Ganesh <srigns...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am working on a scenario where I need to modify the operator state of
>>>>> an existing savepoint. For example, I want to remove some offsets from
>>>>> the savepoint.
>>>>>
>>>>> What is the best practice for such scenarios? Could you please help me
>>>>> with an example?
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>>
>>>
>>> --
>>> *Sriram G*
>>> *Tech*
>>>
>>>
