I believe this should work, with a couple of caveats:

- You can't do this with unaligned checkpoints
- If you have dropped some state, you must specify --allowNonRestoredState
when you restart the job

David

On Wed, Jul 22, 2020 at 4:06 PM Sivaprasanna <sivaprasanna...@gmail.com>
wrote:

> Hi,
>
> We are trying out state schema migration for one of our stateful
> pipelines. We use few Avro type states. Changes made to the job:
>     1. Updated the schema for one of the states (added a new 'boolean'
> field with default value).
>     2. Modified the code by removing a couple of ValueStates.
>
> To push these changes, I stopped the live job and resubmitted the new jar
> with the latest *checkpoint* path. However, the job failed with the
> following error:
>
> java.lang.RuntimeException: Error while getting state
>     at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>     at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>     ...
>     ...
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer cannot be incompatible.
>     at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>
>     at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>
>     at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>
> I was going through the state schema evolution doc. The document mentions
> that we need to take a *savepoint* and restart the job with the savepoint
> path. We are using RocksDB backend with incremental checkpoint enabled. Can
> we not use the latest checkpoint available when we are dealing with state
> schema changes?
>
> Complete stacktrace is attached with this mail.
>
> -
> Sivaprasanna
>

Reply via email to