Hi,

To me, this looks like you are running into the problem described in [FLINK-4603]: KeyedStateBackend cannot restore user code classes. I opened a pull request (PR 2533) this morning that should fix this behavior as soon as it is merged into master.
Best,
Stefan

> On 21.09.2016 at 23:49, Josh <jof...@gmail.com> wrote:
>
> Hi Stephan,
>
> Thanks for the reply. I should have been a bit clearer, but I was not actually
> trying to restore a 1.0 savepoint with 1.2. I ran my job on 1.2 from scratch
> (starting with no state), then took a savepoint and tried to restart it from
> the savepoint - and that's when I get this exception. If I do this with the
> same job using an older version of Flink (1.1-SNAPSHOT taken in June), the
> savepoint and restore work fine.
>
> I wanted to use 1.2-SNAPSHOT because it has some changes I'd like to use
> (improvements to the Kinesis connector + the bucketing sink). Anyway, for now I
> have things working with an older version of Flink - but it would be good to
> know what has changed recently that is causing the restore to break, and whether
> my job is going to be incompatible with future releases of Flink.
>
> Best,
> Josh
>
> On Wed, Sep 21, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:
>
> Hi Josh!
>
> The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right now,
> in order to add the elasticity feature (changing the parallelism of running
> jobs while still maintaining exactly-once guarantees).
> At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will try
> to add compatibility with 1.1 savepoints before the release of version 1.2.
>
> I think the exception is probably caused by the fact that the old savepoint
> stored some serialized user code (the new one is not expected to), which
> cannot be loaded.
>
> Adding Aljoscha and Stefan to this, to see if they can add anything.
> In any case, this should have a much better error message.
>
> I hope you understand that the 1.2-SNAPSHOT version is in some parts WIP, so
> it is not really recommended for general use.
>
> Does version 1.1 not work for you?
>
> Greetings,
> Stephan
>
> On Wed, Sep 21, 2016 at 7:44 PM, Josh <jof...@gmail.com> wrote:
>
> Hi,
>
> I have a Flink job which uses the RocksDBStateBackend, which has been running
> on a Flink 1.0 cluster.
>
> The job is written in Scala, and I previously made some changes to the job to
> ensure that state could be restored. For example, whenever I call `map` or
> `flatMap` on a DataStream, I pass a named class: `.flatMap(new
> MyCustomFlatMapper())` instead of an anonymous function.
>
> I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able to
> restore state. I'm seeing exceptions which look like this when trying to
> restore from a savepoint:
>
> java.lang.RuntimeException: Could not initialize keyed state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
> Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)
>
> I'm not passing any anonymous functions to `map` or `flatMap` on Flink
> DataStreams, so it looks like this exception is caused just by using Scala
> functions like `filter`, `map`, `flatMap` on standard Scala collections
> within my class `MyCustomFlatMapper`.
>
> Are there any changes to the way Flink state is restored, or to
> RocksDBStateBackend, in the last 2-3 months which could cause this to happen?
> If so, any advice on fixing it?
>
> I'm hoping there's a better solution to this than rewriting the Flink job in
> Java.
>
> Thanks,
> Josh
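
For readers following the thread, here is a minimal, self-contained sketch of the general JVM behaviour behind a ClassNotFoundException like the one quoted above. It is not Flink code. It assumes, as the title of FLINK-4603 suggests but the thread does not confirm in detail, that the class exists and is simply not visible to the class loader used during restore. All names (`ClassLoaderVisibility`, `NamedMapper`, `anonymousMapper`, `canLoad`) are hypothetical and chosen only for illustration.

```java
import java.util.function.Function;

final class ClassLoaderVisibility {

    /** A named nested class, analogous to passing a named mapper class. */
    static final class NamedMapper implements Function<String, Integer> {
        @Override
        public Integer apply(String s) {
            return s.length();
        }
    }

    /** Returns an instance of an anonymous class; the compiler picks its name. */
    static Function<String, Integer> anonymousMapper() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    /** True if the given loader can resolve the class name (null means the bootstrap loader). */
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The loader that loaded this example stands in for a "user code" class loader.
        ClassLoader userLoader = ClassLoaderVisibility.class.getClassLoader();
        // Its parent stands in for a framework loader that does not know the user classes.
        ClassLoader parentLoader = userLoader.getParent();

        String named = NamedMapper.class.getName();
        String anonymous = anonymousMapper().getClass().getName();

        for (String name : new String[] {named, anonymous}) {
            System.out.println(name
                    + " | user loader: " + canLoad(name, userLoader)
                    + " | parent loader: " + canLoad(name, parentLoader));
        }
    }
}
```

In this sketch the named and the anonymous class behave the same way: both resolve through the loader that defined them, and neither resolves through its parent. Under the stated assumption, that would be consistent with the restore failing even though only named classes are passed to `map` and `flatMap`.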