Re: Exception when restoring state from RocksDB - how to recover?

2016-10-11 Thread Josh
Ah ok great, thanks! I will try upgrading sometime this week then.

Cheers,
Josh

On Tue, Oct 11, 2016 at 5:37 PM, Stephan Ewen  wrote:

> Hi Josh!
>
> I think the master has gotten more stable with respect to that. The issue
> you mentioned should be fixed.
>
> Another big set of changes (the last big batch) is going in in the next
> days - this time for re-sharding timers (window operator) and other state
> that is not organized by key.
>
> If you want to be a bit conservative, give it a few days before jumping
> onto the latest master. If you are brave, give it a shot now ;-)
>
> Greetings,
> Stephan
>
>
> On Tue, Oct 11, 2016 at 5:43 PM, Josh  wrote:
>
>> Hi Stephan,
>>
>> Thanks, that sounds good!
>>
>> I'm planning to upgrade to Flink 1.2-SNAPSHOT as soon as possible - I was
>> delaying upgrading due to the issues with restoring operator state you
>> mentioned on my other thread here:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/Flink-job-fails-to-restore-RocksDB-state-after-
>> upgrading-to-1-2-SNAPSHOT-td9110.html
>>
>> Sorry to jump around but do you know if that's fixed in the latest
>> 1.2-SNAPSHOT? Was it resolved by Flink-4788?
>>
>> Thanks,
>> Josh
>>
>> On Tue, Oct 11, 2016 at 4:13 PM, Stephan Ewen  wrote:
>>
>>> Hi Josh!
>>>
>>> There are two ways to improve the RocksDB / S3 behavior
>>>
>>> (1) Use the FullyAsync mode. It stores the data in one file, not in a
>>> directory. Since directories are the "eventual consistent" part of S3, this
>>> prevents many issues.
>>>
>>> (2) Flink 1.2-SNAPSHOT has some additional fixes that circumvent
>>> additional S3 issues.
>>>
>>> Hope that helps,
>>> Stephan
>>>
>>>
>>> On Tue, Oct 11, 2016 at 4:42 PM, Josh  wrote:
>>>
 Hi Aljoscha,

 Yeah I'm using S3. Is this a known problem when using S3? Do you have
 any ideas on how to restore my job from this state, or prevent it from
 happening again?

 Thanks,
 Josh


 On Tue, Oct 11, 2016 at 1:58 PM, Aljoscha Krettek 
 wrote:

> Hi,
> you are using S3 to store the checkpoints, right? It might be that
> you're running into a problem with S3 "directory listings" not being
> consistent.
>
> Cheers,
> Aljoscha
>
> On Tue, 11 Oct 2016 at 12:40 Josh  wrote:
>
> Hi all,
>
>
> I just have a couple of questions about checkpointing and restoring state 
> from RocksDB.
>
>
> 1) In some cases, I find that it is impossible to restore a job from a 
> checkpoint, due to an exception such as the one pasted below[*]. In this 
> case, it appears that the last checkpoint is somehow corrupt. Does anyone 
> know why this might happen?
>
>
> 2) When the above happens, I have no choice but to cancel the job, as it 
> repeatedly attempts to restart and keeps getting the same exception. 
> Given that no savepoint was taken recently, is it possible for me to 
> restore the job from an older checkpoint (e.g. the second-last 
> checkpoint)?
>
>
> The version of Flink I'm using Flink-1.1-SNAPSHOT, from mid-June.
>
>
> Thanks,
>
> Josh
>
>
> [*]The exception when restoring state:
>
> java.lang.Exception: Could not restore checkpointed state to operators 
> and functions
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while restoring RocksDB 
> state from 
> /mnt/yarn/usercache/hadoop/appcache/application_1476181294189_0001/flink-io-09ad1cb1-8dff-4f9a-9f61-6cae27ee6f1d/d236820a793043bd63360df6f175cae9/StreamFlatMap_9_8/dummy_state/dc5beab1-68fb-48b3-b3d6-272497d15a09/chk-1
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:537)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.injectKeyValueStateSnapshots(RocksDBStateBackend.java:489)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreState(AbstractStreamOperator.java:204)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:154)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:472)
>   ... 3 more
> Caused by: org.rocksdb.RocksDBException: NotFound: Backup not found
>   at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(Native Method)
>   at 
> 

Re: Exception when restoring state from RocksDB - how to recover?

2016-10-11 Thread Stephan Ewen
Hi Josh!

I think the master has gotten more stable with respect to that. The issue
you mentioned should be fixed.

Another big set of changes (the last big batch) is going in in the next
days - this time for re-sharding timers (window operator) and other state
that is not organized by key.

If you want to be a bit conservative, give it a few days before jumping
onto the latest master. If you are brave, give it a shot now ;-)

Greetings,
Stephan


On Tue, Oct 11, 2016 at 5:43 PM, Josh  wrote:

> Hi Stephan,
>
> Thanks, that sounds good!
>
> I'm planning to upgrade to Flink 1.2-SNAPSHOT as soon as possible - I was
> delaying upgrading due to the issues with restoring operator state you
> mentioned on my other thread here:
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Flink-job-fails-to-restore-RocksDB-state-
> after-upgrading-to-1-2-SNAPSHOT-td9110.html
>
> Sorry to jump around but do you know if that's fixed in the latest
> 1.2-SNAPSHOT? Was it resolved by Flink-4788?
>
> Thanks,
> Josh
>
> On Tue, Oct 11, 2016 at 4:13 PM, Stephan Ewen  wrote:
>
>> Hi Josh!
>>
>> There are two ways to improve the RocksDB / S3 behavior
>>
>> (1) Use the FullyAsync mode. It stores the data in one file, not in a
>> directory. Since directories are the "eventual consistent" part of S3, this
>> prevents many issues.
>>
>> (2) Flink 1.2-SNAPSHOT has some additional fixes that circumvent
>> additional S3 issues.
>>
>> Hope that helps,
>> Stephan
>>
>>
>> On Tue, Oct 11, 2016 at 4:42 PM, Josh  wrote:
>>
>>> Hi Aljoscha,
>>>
>>> Yeah I'm using S3. Is this a known problem when using S3? Do you have
>>> any ideas on how to restore my job from this state, or prevent it from
>>> happening again?
>>>
>>> Thanks,
>>> Josh
>>>
>>>
>>> On Tue, Oct 11, 2016 at 1:58 PM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,
 you are using S3 to store the checkpoints, right? It might be that
 you're running into a problem with S3 "directory listings" not being
 consistent.

 Cheers,
 Aljoscha

 On Tue, 11 Oct 2016 at 12:40 Josh  wrote:

 Hi all,


 I just have a couple of questions about checkpointing and restoring state 
 from RocksDB.


 1) In some cases, I find that it is impossible to restore a job from a 
 checkpoint, due to an exception such as the one pasted below[*]. In this 
 case, it appears that the last checkpoint is somehow corrupt. Does anyone 
 know why this might happen?


 2) When the above happens, I have no choice but to cancel the job, as it 
 repeatedly attempts to restart and keeps getting the same exception. Given 
 that no savepoint was taken recently, is it possible for me to restore the 
 job from an older checkpoint (e.g. the second-last checkpoint)?


 The version of Flink I'm using Flink-1.1-SNAPSHOT, from mid-June.


 Thanks,

 Josh


 [*]The exception when restoring state:

 java.lang.Exception: Could not restore checkpointed state to operators and 
 functions
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.RuntimeException: Error while restoring RocksDB state 
 from 
 /mnt/yarn/usercache/hadoop/appcache/application_1476181294189_0001/flink-io-09ad1cb1-8dff-4f9a-9f61-6cae27ee6f1d/d236820a793043bd63360df6f175cae9/StreamFlatMap_9_8/dummy_state/dc5beab1-68fb-48b3-b3d6-272497d15a09/chk-1
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:537)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.injectKeyValueStateSnapshots(RocksDBStateBackend.java:489)
at 
 org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreState(AbstractStreamOperator.java:204)
at 
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:154)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:472)
... 3 more
 Caused by: org.rocksdb.RocksDBException: NotFound: Backup not found
at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(Native Method)
at 
 org.rocksdb.BackupEngine.restoreDbFromLatestBackup(BackupEngine.java:177)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:535)
... 7 more



>>>
>>
>


Re: Exception when restoring state from RocksDB - how to recover?

2016-10-11 Thread Josh
Hi Stephan,

Thanks, that sounds good!

I'm planning to upgrade to Flink 1.2-SNAPSHOT as soon as possible - I was
delaying upgrading due to the issues with restoring operator state you
mentioned on my other thread here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-job-fails-to-restore-RocksDB-state-after-upgrading-to-1-2-SNAPSHOT-td9110.html

Sorry to jump around but do you know if that's fixed in the latest
1.2-SNAPSHOT? Was it resolved by Flink-4788?

Thanks,
Josh

On Tue, Oct 11, 2016 at 4:13 PM, Stephan Ewen  wrote:

> Hi Josh!
>
> There are two ways to improve the RocksDB / S3 behavior
>
> (1) Use the FullyAsync mode. It stores the data in one file, not in a
> directory. Since directories are the "eventual consistent" part of S3, this
> prevents many issues.
>
> (2) Flink 1.2-SNAPSHOT has some additional fixes that circumvent
> additional S3 issues.
>
> Hope that helps,
> Stephan
>
>
> On Tue, Oct 11, 2016 at 4:42 PM, Josh  wrote:
>
>> Hi Aljoscha,
>>
>> Yeah I'm using S3. Is this a known problem when using S3? Do you have any
>> ideas on how to restore my job from this state, or prevent it from
>> happening again?
>>
>> Thanks,
>> Josh
>>
>>
>> On Tue, Oct 11, 2016 at 1:58 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> you are using S3 to store the checkpoints, right? It might be that
>>> you're running into a problem with S3 "directory listings" not being
>>> consistent.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 11 Oct 2016 at 12:40 Josh  wrote:
>>>
>>> Hi all,
>>>
>>>
>>> I just have a couple of questions about checkpointing and restoring state 
>>> from RocksDB.
>>>
>>>
>>> 1) In some cases, I find that it is impossible to restore a job from a 
>>> checkpoint, due to an exception such as the one pasted below[*]. In this 
>>> case, it appears that the last checkpoint is somehow corrupt. Does anyone 
>>> know why this might happen?
>>>
>>>
>>> 2) When the above happens, I have no choice but to cancel the job, as it 
>>> repeatedly attempts to restart and keeps getting the same exception. Given 
>>> that no savepoint was taken recently, is it possible for me to restore the 
>>> job from an older checkpoint (e.g. the second-last checkpoint)?
>>>
>>>
>>> The version of Flink I'm using Flink-1.1-SNAPSHOT, from mid-June.
>>>
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>>
>>> [*]The exception when restoring state:
>>>
>>> java.lang.Exception: Could not restore checkpointed state to operators and 
>>> functions
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Error while restoring RocksDB state 
>>> from 
>>> /mnt/yarn/usercache/hadoop/appcache/application_1476181294189_0001/flink-io-09ad1cb1-8dff-4f9a-9f61-6cae27ee6f1d/d236820a793043bd63360df6f175cae9/StreamFlatMap_9_8/dummy_state/dc5beab1-68fb-48b3-b3d6-272497d15a09/chk-1
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:537)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.injectKeyValueStateSnapshots(RocksDBStateBackend.java:489)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreState(AbstractStreamOperator.java:204)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:154)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:472)
>>> ... 3 more
>>> Caused by: org.rocksdb.RocksDBException: NotFound: Backup not found
>>> at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(Native Method)
>>> at 
>>> org.rocksdb.BackupEngine.restoreDbFromLatestBackup(BackupEngine.java:177)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:535)
>>> ... 7 more
>>>
>>>
>>>
>>
>


Re: Exception when restoring state from RocksDB - how to recover?

2016-10-11 Thread Stephan Ewen
Hi Josh!

There are two ways to improve the RocksDB / S3 behavior

(1) Use the FullyAsync mode. It stores the data in one file, not in a
directory. Since directories are the "eventual consistent" part of S3, this
prevents many issues.

(2) Flink 1.2-SNAPSHOT has some additional fixes that circumvent additional
S3 issues.

Hope that helps,
Stephan


On Tue, Oct 11, 2016 at 4:42 PM, Josh  wrote:

> Hi Aljoscha,
>
> Yeah I'm using S3. Is this a known problem when using S3? Do you have any
> ideas on how to restore my job from this state, or prevent it from
> happening again?
>
> Thanks,
> Josh
>
>
> On Tue, Oct 11, 2016 at 1:58 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> you are using S3 to store the checkpoints, right? It might be that you're
>> running into a problem with S3 "directory listings" not being consistent.
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 11 Oct 2016 at 12:40 Josh  wrote:
>>
>> Hi all,
>>
>>
>> I just have a couple of questions about checkpointing and restoring state 
>> from RocksDB.
>>
>>
>> 1) In some cases, I find that it is impossible to restore a job from a 
>> checkpoint, due to an exception such as the one pasted below[*]. In this 
>> case, it appears that the last checkpoint is somehow corrupt. Does anyone 
>> know why this might happen?
>>
>>
>> 2) When the above happens, I have no choice but to cancel the job, as it 
>> repeatedly attempts to restart and keeps getting the same exception. Given 
>> that no savepoint was taken recently, is it possible for me to restore the 
>> job from an older checkpoint (e.g. the second-last checkpoint)?
>>
>>
>> The version of Flink I'm using Flink-1.1-SNAPSHOT, from mid-June.
>>
>>
>> Thanks,
>>
>> Josh
>>
>>
>> [*]The exception when restoring state:
>>
>> java.lang.Exception: Could not restore checkpointed state to operators and 
>> functions
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>>  at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error while restoring RocksDB state 
>> from 
>> /mnt/yarn/usercache/hadoop/appcache/application_1476181294189_0001/flink-io-09ad1cb1-8dff-4f9a-9f61-6cae27ee6f1d/d236820a793043bd63360df6f175cae9/StreamFlatMap_9_8/dummy_state/dc5beab1-68fb-48b3-b3d6-272497d15a09/chk-1
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:537)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.injectKeyValueStateSnapshots(RocksDBStateBackend.java:489)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreState(AbstractStreamOperator.java:204)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:154)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:472)
>>  ... 3 more
>> Caused by: org.rocksdb.RocksDBException: NotFound: Backup not found
>>  at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(Native Method)
>>  at 
>> org.rocksdb.BackupEngine.restoreDbFromLatestBackup(BackupEngine.java:177)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:535)
>>  ... 7 more
>>
>>
>>
>


Re: Exception when restoring state from RocksDB - how to recover?

2016-10-11 Thread Josh
Hi Aljoscha,

Yeah I'm using S3. Is this a known problem when using S3? Do you have any
ideas on how to restore my job from this state, or prevent it from
happening again?

Thanks,
Josh


On Tue, Oct 11, 2016 at 1:58 PM, Aljoscha Krettek 
wrote:

> Hi,
> you are using S3 to store the checkpoints, right? It might be that you're
> running into a problem with S3 "directory listings" not being consistent.
>
> Cheers,
> Aljoscha
>
> On Tue, 11 Oct 2016 at 12:40 Josh  wrote:
>
> Hi all,
>
>
> I just have a couple of questions about checkpointing and restoring state 
> from RocksDB.
>
>
> 1) In some cases, I find that it is impossible to restore a job from a 
> checkpoint, due to an exception such as the one pasted below[*]. In this 
> case, it appears that the last checkpoint is somehow corrupt. Does anyone 
> know why this might happen?
>
>
> 2) When the above happens, I have no choice but to cancel the job, as it 
> repeatedly attempts to restart and keeps getting the same exception. Given 
> that no savepoint was taken recently, is it possible for me to restore the 
> job from an older checkpoint (e.g. the second-last checkpoint)?
>
>
> The version of Flink I'm using Flink-1.1-SNAPSHOT, from mid-June.
>
>
> Thanks,
>
> Josh
>
>
> [*]The exception when restoring state:
>
> java.lang.Exception: Could not restore checkpointed state to operators and 
> functions
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while restoring RocksDB state 
> from 
> /mnt/yarn/usercache/hadoop/appcache/application_1476181294189_0001/flink-io-09ad1cb1-8dff-4f9a-9f61-6cae27ee6f1d/d236820a793043bd63360df6f175cae9/StreamFlatMap_9_8/dummy_state/dc5beab1-68fb-48b3-b3d6-272497d15a09/chk-1
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:537)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.injectKeyValueStateSnapshots(RocksDBStateBackend.java:489)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreState(AbstractStreamOperator.java:204)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:154)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:472)
>   ... 3 more
> Caused by: org.rocksdb.RocksDBException: NotFound: Backup not found
>   at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(Native Method)
>   at 
> org.rocksdb.BackupEngine.restoreDbFromLatestBackup(BackupEngine.java:177)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:535)
>   ... 7 more
>
>
>


Re: Exception when restoring state from RocksDB - how to recover?

2016-10-11 Thread Aljoscha Krettek
Hi,
you are using S3 to store the checkpoints, right? It might be that you're
running into a problem with S3 "directory listings" not being consistent.

Cheers,
Aljoscha

On Tue, 11 Oct 2016 at 12:40 Josh  wrote:

Hi all,


I just have a couple of questions about checkpointing and restoring
state from RocksDB.


1) In some cases, I find that it is impossible to restore a job from a
checkpoint, due to an exception such as the one pasted below[*]. In
this case, it appears that the last checkpoint is somehow corrupt.
Does anyone know why this might happen?


2) When the above happens, I have no choice but to cancel the job, as
it repeatedly attempts to restart and keeps getting the same
exception. Given that no savepoint was taken recently, is it possible
for me to restore the job from an older checkpoint (e.g. the
second-last checkpoint)?


The version of Flink I'm using Flink-1.1-SNAPSHOT, from mid-June.


Thanks,

Josh


[*]The exception when restoring state:

java.lang.Exception: Could not restore checkpointed state to operators
and functions
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error while restoring RocksDB
state from 
/mnt/yarn/usercache/hadoop/appcache/application_1476181294189_0001/flink-io-09ad1cb1-8dff-4f9a-9f61-6cae27ee6f1d/d236820a793043bd63360df6f175cae9/StreamFlatMap_9_8/dummy_state/dc5beab1-68fb-48b3-b3d6-272497d15a09/chk-1
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:537)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.injectKeyValueStateSnapshots(RocksDBStateBackend.java:489)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreState(AbstractStreamOperator.java:204)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:154)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:472)
... 3 more
Caused by: org.rocksdb.RocksDBException: NotFound: Backup not found
at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(Native Method)
at 
org.rocksdb.BackupEngine.restoreDbFromLatestBackup(BackupEngine.java:177)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:535)
... 7 more