Hi Piotrek,

I used `ProcessFunction` to implement it, but it seems that I can't call
`getKeyedStateBackend()` the way `WindowOperator` does.
I found that `getKeyedStateBackend()` is a method of
`AbstractStreamOperator`, and the `ProcessFunction` API doesn't extend it.
Does that mean I can't look up all keys and migrate all of the previous
state to the new state in `ProcessFunction#open()`?
As I said, do I need to port my `ProcessFunction` to `KeyedProcessOperator` to
migrate state in the manner shown in `WindowOperator`?

Best Regards,
Tony Wei

2018-06-07 20:28 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:

> What function are you implementing and how are you using it?
>
> Usually it’s enough if your function implements RichFunction (or rather
> extends AbstractRichFunction); then you can use RichFunction#open
> in a similar manner to the code that I posted in my previous message.
> Flink performs instanceof checks in many places, for example in
> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>
> public static void openFunction(Function function, Configuration parameters)
>       throws Exception {
>    if (function instanceof RichFunction) {
>       RichFunction richFunction = (RichFunction) function;
>       richFunction.open(parameters);
>    }
> }
>
> Piotrek
>
>
> On 7 Jun 2018, at 11:07, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Piotrek,
>
> It seems that this was implemented with the `Operator` API, which is a
> lower-level API than the `Function` API.
> At the `Function` API level we can only migrate state when an event
> triggers it, so it is more convenient to migrate state this way, by
> iterating over all keys in the `open()` method.
> If I implemented my stateful operator with the `ProcessFunction` API, is it
> possible to port it to `KeyedProcessOperator` and do the state migration
> that you mentioned?
> And are there any concerns or difficulties that could lead to
> a failed state restore or other problems? Thank you!
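
For comparison, the event-triggered migration available at the `Function` level could look roughly like the sketch below. It is not Flink code: the two maps are hypothetical stand-ins for the keyed states "stateV1" and "stateV2", and the class name, the `onEvent` method and the String-to-long conversion are assumptions made only for illustration. A key is migrated the first time an event for it arrives.

```java
import java.util.Map;

// Hypothetical stand-in for keyed state: each map plays the role of one
// named state, indexed by key. This is NOT the Flink state API.
final class LazyStateMigration {

    // Handles one event for `key`: migrates that key from stateV1 to stateV2
    // if it has not been migrated yet, then increments the V2 counter.
    // Assumption: V1 stored the counter as a String, V2 stores it as a long.
    static long onEvent(String key, Map<String, String> stateV1, Map<String, Long> stateV2) {
        if (!stateV2.containsKey(key)) {
            String old = stateV1.remove(key); // drop the old entry once it is read
            stateV2.put(key, old == null ? 0L : Long.parseLong(old));
        }
        long updated = stateV2.get(key) + 1;
        stateV2.put(key, updated);
        return updated;
    }
}
```

With this approach, keys that never receive another event are never migrated, which is why iterating over all keys in `open()` is the more convenient option.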
>
> Best Regards,
> Tony Wei
>
> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>
>> Hi,
>>
>> A general solution for state/schema migration is under development and it
>> might be released with Flink 1.6.0.
>>
>> Before that, you need to handle the state migration manually in your
>> operator’s open method. Let’s assume that your OperatorV1 has a state field
>> “stateV1”. Your OperatorV2 defines a field “stateV2”, which is incompatible
>> with the previous version. What you can do is add logic to the open method
>> to check:
>> 1. If “stateV2” is non-empty, do nothing
>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>> migrate “stateV1” to “stateV2”
>>
>> In your OperatorV3 you could drop the support for “stateV1”.
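
The two checks above can be sketched in plain Java as follows. This is only a minimal illustration, not Flink code: the maps are hypothetical stand-ins for the keyed states "stateV1" and "stateV2", and the class name, `migrateOnOpen`, `convert` and the String-to-long conversion are assumptions. In a real operator, the iteration over keys would go through the keyed state backend, as in the linked `WindowOperator` change.

```java
import java.util.Map;

// Hypothetical stand-in for keyed state: each map plays the role of one
// named state, indexed by key. This is NOT the Flink state API.
final class EagerStateMigration {

    // Assumed conversion: V1 stored a count as a String, V2 stores a long.
    static long convert(String v1Value) {
        return Long.parseLong(v1Value);
    }

    // Mirrors the two checks from the message:
    // 1. if stateV2 is non-empty, do nothing;
    // 2. otherwise iterate over all keys and migrate stateV1 to stateV2.
    // Returns the number of keys that were migrated.
    static int migrateOnOpen(Map<String, String> stateV1, Map<String, Long> stateV2) {
        if (!stateV2.isEmpty()) {
            return 0; // already migrated by an earlier run
        }
        int migrated = 0;
        for (Map.Entry<String, String> entry : stateV1.entrySet()) {
            stateV2.put(entry.getKey(), convert(entry.getValue()));
            migrated++;
        }
        return migrated;
    }
}
```

Calling `migrateOnOpen` a second time is a no-op, because "stateV2" is already populated after the first call.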
>>
>> I have once implemented something like that here:
>>
>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>
>> Hope that helps!
>>
>> Piotrek
>>
>>
>> On 6 Jun 2018, at 17:04, TechnoMage <mla...@technomage.com> wrote:
>>
>> We are still pretty new to Flink and I have a conceptual / DevOps
>> question.
>>
>> When a job is modified and we want to deploy the new version, what is the
>> preferred method?  Our jobs have a lot of keyed state.
>>
>> If we use snapshots we have old state that may no longer apply to the new
>> pipeline.
>> If we start a new job we can reprocess historical data from Kafka, but
>> that can be very resource heavy for a while.
>>
>> Is there an option I am missing?  Are there facilities to “patch” or
>> “purge” selectively the keyed state?
>>
>> Michael
>>
>>
>>
>
>
