I believe this should work, with a couple of caveats: - You can't do this with unaligned checkpoints - If you have dropped some state, you must specify --allowNonRestoredState when you restart the job
David On Wed, Jul 22, 2020 at 4:06 PM Sivaprasanna <sivaprasanna...@gmail.com> wrote: > Hi, > > We are trying out state schema migration for one of our stateful > pipelines. We use few Avro type states. Changes made to the job: > 1. Updated the schema for one of the states (added a new 'boolean' > field with default value). > 2. Modified the code by removing a couple of ValueStates. > > To push these changes, I stopped the live job and resubmitted the new jar > with the latest *checkpoint* path. However, the job failed with the > following error: > > java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144) > ... > ... > Caused by: org.apache.flink.util.StateMigrationException: The new state > serializer cannot be incompatible. > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543) > > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491) > > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652) > > I was going through the state schema evolution doc. The document mentions > that we need to take a *savepoint* and restart the job with the savepoint > path. We are using RocksDB backend with incremental checkpoint enabled. Can > we not use the latest checkpoint available when we are dealing with state > schema changes? > > Complete stacktrace is attached with this mail. > > - > Sivaprasanna >