Hi,

To me, this looks like you are running into the problem described in 
[FLINK-4603]: KeyedStateBackend cannot restore user code classes. This morning I 
opened a pull request (PR 2533) that should fix this behavior once it is merged 
into master.
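
FLINK-4603 and the PR are not quoted here, so what follows is only a generic Java sketch of the kind of failure the issue title describes. It is not Flink's code, and not necessarily what the PR changes. It assumes that the restore path resolved classes through a class loader that does not include the job jar. Java serialization records class names, and restoring an object works only if the loader used during deserialization can see those classes. `UserPayload`, `ClassLoaderObjectInputStream` and `RestoreDemo` are hypothetical names, and a `URLClassLoader` with a `null` parent stands in for "a loader without the user code".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Optional;

// Hypothetical stand-in for a class that would normally live in the job jar.
class UserPayload implements Serializable {
    private static final long serialVersionUID = 1L;
    final int value;

    UserPayload(int value) {
        this.value = value;
    }
}

// ObjectInputStream that resolves every class name through one explicit loader.
class ClassLoaderObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
        // Load without running static initializers; throws if the loader cannot see the class.
        return Class.forName(desc.getName(), false, loader);
    }
}

class RestoreDemo {
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Empty if some class recorded in the bytes cannot be resolved through the loader.
    static Optional<Object> tryRestore(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return Optional.ofNullable(in.readObject());
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = serialize(new UserPayload(42));

        // Parent null means bootstrap classes only, so application classes are invisible.
        try (URLClassLoader withoutUserCode = new URLClassLoader(new URL[0], null)) {
            System.out.println(tryRestore(data, withoutUserCode).isPresent()); // false
        }

        // The loader that defined UserPayload can resolve it.
        Optional<Object> restored = tryRestore(data, UserPayload.class.getClassLoader());
        System.out.println(((UserPayload) restored.get()).value); // 42
    }
}
```

Overriding `resolveClass` is a common way to make `ObjectInputStream` use a specific loader. In a real job, the loader to pass would be whichever one loaded the user jar.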

Best,
Stefan

> On 21.09.2016 at 23:49, Josh <jof...@gmail.com> wrote:
> 
> Hi Stephan,
> 
> Thanks for the reply. I should have been clearer: I was not actually trying 
> to restore a 1.0 savepoint with 1.2. I ran my job on 1.2 from scratch 
> (starting with no state), then took a savepoint and tried to restart the job 
> from that savepoint, and that's when I got this exception. If I run the same 
> job on an older version of Flink (a 1.1-SNAPSHOT taken in June), the savepoint 
> and restore work fine.
> 
> I wanted to use 1.2-SNAPSHOT because it has some changes I'd like to use 
> (improvements to the Kinesis connector and the bucketing sink). Anyway, for now 
> I have things working with an older version of Flink, but it would be good to 
> know what changed recently to break the restore, and whether my job will be 
> incompatible with future releases of Flink.
> 
> Best,
> Josh
> 
> On Wed, Sep 21, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi Josh!
> 
> The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right now 
> in order to add the elasticity feature (changing the parallelism of running 
> jobs while still maintaining exactly-once guarantees).
> At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will try 
> to add compatibility with 1.1 savepoints before the release of version 1.2.
> 
> I think the exception is probably caused by the old savepoint having stored 
> some serialized user code (the new format is not expected to), which cannot 
> be loaded.
> 
> Adding Aljoscha and Stefan to this, to see if they can add anything.
> In any case, this should have a much better error message.
> 
> I hope you understand that parts of 1.2-SNAPSHOT are still a work in progress, 
> so it is not really recommended for general use.
> 
> Does version 1.1 not work for you?
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Sep 21, 2016 at 7:44 PM, Josh <jof...@gmail.com> wrote:
> Hi,
> 
> I have a Flink job that uses the RocksDBStateBackend and has been running on a 
> Flink 1.0 cluster.
> 
> The job is written in Scala, and I previously made some changes to the job to 
> ensure that state could be restored. For example, whenever I call `map` or 
> `flatMap` on a DataStream, I pass a named class: `.flatMap(new 
> MyCustomFlatMapper())` instead of an anonymous function.
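
A minimal Java analog of why a named class can be the safer choice. The motivation is an assumption here, and scalac uses a different naming scheme than javac, but the idea carries over: a named nested class gets a class name fixed by its declaration, while an anonymous class gets a compiler-assigned number based on its position in the enclosing class. `Mapper`, `MyJob` and `MyCustomMapper` are hypothetical names.

```java
// Hypothetical single-method interface standing in for a user function type.
interface Mapper {
    String map(String value);
}

class MyJob {
    // Named nested class: compiled as MyJob$MyCustomMapper, wherever it sits in the file.
    static class MyCustomMapper implements Mapper {
        @Override
        public String map(String value) {
            return value.trim();
        }
    }

    static Mapper named() {
        return new MyCustomMapper();
    }

    // Anonymous class: javac numbers these MyJob$1, MyJob$2, ... by position,
    // so another anonymous class declared earlier in MyJob would shift this name.
    static Mapper anonymous() {
        return new Mapper() {
            @Override
            public String map(String value) {
                return value.trim();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(named().getClass().getName());     // MyJob$MyCustomMapper
        System.out.println(anonymous().getClass().getName()); // MyJob$1
    }
}
```

If state or a savepoint records such a class name, adding another anonymous class earlier in `MyJob` could renumber `MyJob$1`, and the recorded name would then point at a different class or none.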
> 
> I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able to 
> restore state. I'm seeing exceptions which look like this when trying to 
> restore from a savepoint:
> 
> java.lang.RuntimeException: Could not initialize keyed state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
> Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)
> 
> I'm not passing any anonymous functions to `map` or `flatMap` on Flink 
> DataStreams, so it looks like this exception is caused just by using Scala 
> functions like `filter`, `map` and `flatMap` on standard Scala collections 
> within my class `MyCustomFlatMapper`.
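
A small Java analog of the situation described above, assuming the missing class is compiler-generated inside `MyCustomFlatMapper`. Even when the function handed to Flink is a named class, any anonymous class written inside it is compiled to its own class file, named after the enclosing class. In Java that name looks like `MyJob$MyCustomFlatMapper$1`. scalac uses `$$anon$N` for anonymous classes instead, as in the stack trace above. `MyJob`, `MyCustomFlatMapper.flatMap` and `byLength` are hypothetical and only mirror the names in the trace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class MyJob {
    // Named class, mirroring MyCustomFlatMapper from the thread (hypothetical body).
    static class MyCustomFlatMapper {
        // Anonymous class declared inside the named class: javac writes it to its own
        // class file, MyJob$MyCustomFlatMapper$1, which ships in the job jar.
        // (A lambda here would instead get a class generated at runtime.)
        static Comparator<String> byLength() {
            return new Comparator<String>() {
                @Override
                public int compare(String a, String b) {
                    return Integer.compare(a.length(), b.length());
                }
            };
        }

        // Splits a line into words and sorts them by length using the anonymous comparator.
        List<String> flatMap(String line) {
            List<String> words = new ArrayList<>(Arrays.asList(line.split(" ")));
            words.sort(byLength());
            return words;
        }
    }

    public static void main(String[] args) {
        // Prints MyJob$MyCustomFlatMapper$1
        System.out.println(MyCustomFlatMapper.byLength().getClass().getName());
        System.out.println(new MyCustomFlatMapper().flatMap("ccc a bb")); // [a, bb, ccc]
    }
}
```

Such classes live in the job jar, so any restore step that has to load them by name needs a class loader that can see that jar. As a hedged aside: in Scala 2.11, closures passed to collection methods such as `map` or `filter` are usually compiled to `$anonfun$` classes, so a `$$anon$N` name may point to an anonymous class (`new T { ... }`) rather than to a collection call.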
> 
> Have there been any changes to the way Flink state is restored, or to the 
> RocksDBStateBackend, in the last 2-3 months that could cause this? 
> If so, any advice on fixing it? 
> 
> I'm hoping there's a better solution to this than rewriting the Flink job in 
> Java.
> 
> Thanks,
> 
> Josh