Hi Piotrek,

I used `ProcessFunction` to implement it, but it seems that I can't call `getKeyedStateBackend()` the way `WindowOperator` does. I found that `getKeyedStateBackend()` is a method of `AbstractStreamOperator`, and the `ProcessFunction` API doesn't extend it. Does that mean I can't look up all keys and migrate the entire previous state to the new state in `ProcessFunction#open()`? As I said, do I need to port my `ProcessFunction` to a `KeyedProcessOperator` to migrate state in the manner shown in `WindowOperator`?
Best Regards,
Tony Wei

2018-06-07 20:28 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:

> What function are you implementing and how are you using it?
>
> Usually it’s enough if your function implements RichFunction (or rather
> extends AbstractRichFunction) and then you could use RichFunction#open
> in a similar manner as in the code that I posted in the previous message.
> Flink in many places performs instanceof checks like
> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>
> public static void openFunction(Function function, Configuration parameters) throws Exception {
>     if (function instanceof RichFunction) {
>         RichFunction richFunction = (RichFunction) function;
>         richFunction.open(parameters);
>     }
> }
>
> Piotrek
>
>
> On 7 Jun 2018, at 11:07, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Piotrek,
>
> It seems that this was implemented with the `Operator` API, which is a
> lower-level API than the `Function` API.
> Since at the `Function` API level we can only migrate state when an event
> is triggered, it is more convenient to migrate state by iterating over
> all keys in the `open()` method.
> If I implemented the state operator with the `ProcessFunction` API, is it
> possible to port it to `KeyedProcessOperator` and do the state migration
> that you mentioned?
> And are there any concerns or difficulties that could lead to failures in
> restoring state, or other problems? Thank you!
>
> Best Regards,
> Tony Wei
>
> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>
>> Hi,
>>
>> A general solution for state/schema migration is under development and
>> it might be released with Flink 1.6.0.
>>
>> Before that, you need to manually handle the state migration in your
>> operator’s open method. Let's assume that your OperatorV1 has a state
>> field “stateV1”. Your OperatorV2 defines a field “stateV2”, which is
>> incompatible with the previous version. What you can do is add logic in
>> the open method to check:
>> 1. If “stateV2” is non-empty, do nothing.
>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>> migrate “stateV1” to “stateV2”.
>>
>> In your OperatorV3 you could drop the support for “stateV1”.
>>
>> I have once implemented something like that here:
>>
>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>
>> Hope that helps!
>>
>> Piotrek
>>
>>
>> On 6 Jun 2018, at 17:04, TechnoMage <mla...@technomage.com> wrote:
>>
>> We are still pretty new to Flink and I have a conceptual / DevOps
>> question.
>>
>> When a job is modified and we want to deploy the new version, what is
>> the preferred method? Our jobs have a lot of keyed state.
>>
>> If we use snapshots, we have old state that may no longer apply to the
>> new pipeline.
>> If we start a new job, we can reprocess historical data from Kafka, but
>> that can be very resource-heavy for a while.
>>
>> Is there an option I am missing? Are there facilities to “patch” or
>> “purge” selectively the keyed state?
>>
>> Michael
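For anyone following along: the two-step check Piotrek describes (migrate old-format state on open only when the new-format state is still empty) can be sketched in plain Java. This is deliberately Flink-free — the `HashMap`s stand in for keyed state backends, and the value formats (`Integer` vs. a hypothetical `"v2:"`-prefixed `String`) are made up for illustration; in a real operator you would use `getKeyedStateBackend()` and iterate the keys there instead.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of the open()-time state migration pattern:
 * if "stateV2" is empty, convert every "stateV1" entry into the
 * new format, then drop the old state. Maps stand in for Flink's
 * keyed state backends; formats here are hypothetical.
 */
public class StateMigrationSketch {

    /** Old format: a plain count per key. */
    static Map<String, Integer> stateV1 = new HashMap<>();
    /** New format: count stored as a version-prefixed string (hypothetical). */
    static Map<String, String> stateV2 = new HashMap<>();

    /** Called once on restore, mirroring the checks in the operator's open(). */
    static void openTimeMigration() {
        if (!stateV2.isEmpty()) {
            return; // 1. "stateV2" is non-empty: already migrated, do nothing
        }
        // 2. no "stateV2": iterate all keys and convert "stateV1" entries
        for (Map.Entry<String, Integer> e : stateV1.entrySet()) {
            stateV2.put(e.getKey(), "v2:" + e.getValue());
        }
        stateV1.clear(); // an OperatorV3 could drop v1 support entirely
    }
}
```

Because the migration only runs when "stateV2" is empty, restoring the same savepoint twice is safe: the second open() hits step 1 and leaves the already-migrated state untouched.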