Hi Stephan,

Thanks, that sounds good!

I'm planning to upgrade to Flink 1.2-SNAPSHOT as soon as possible. I had
been delaying the upgrade because of the issues with restoring operator state
that you mentioned on my other thread here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-job-fails-to-restore-RocksDB-state-after-upgrading-to-1-2-SNAPSHOT-td9110.html

Sorry to jump around, but do you know whether that's fixed in the latest
1.2-SNAPSHOT? Was it resolved by FLINK-4788?

Thanks,
Josh

On Tue, Oct 11, 2016 at 4:13 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Josh!
>
> There are two ways to improve the RocksDB / S3 behavior:
>
> (1) Use the FullyAsync mode. It stores the data in one file, not in a
> directory. Since directories are the "eventually consistent" part of S3,
> this prevents many issues.
>
> (2) Flink 1.2-SNAPSHOT has some further fixes that work around other
> S3 issues.
>
> Hope that helps,
> Stephan
>
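For reference, here is a minimal sketch of Stephan's suggestion (1). It assumes the Flink 1.1-era RocksDBStateBackend API, in which the default snapshot mode is semi-async (the stack trace further down restores through restoreFromSemiAsyncSnapshot) and fully async snapshots are switched on with enableFullyAsyncSnapshots(). That method may be deprecated or behave differently in later versions, so please check the Javadoc for your exact snapshot build. The S3 URI and the 60 s checkpoint interval are hypothetical placeholders, not values from this thread.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FullyAsyncRocksDBJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (placeholder interval).
        env.enableCheckpointing(60_000L);

        // Hypothetical checkpoint location; replace with your own bucket/path.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://my-bucket/flink/checkpoints");

        // Assumption (Flink 1.1 API): switch from semi-async snapshots, which
        // write a RocksDB backup *directory*, to fully async snapshots, which
        // write the state as a single file per snapshot.
        backend.enableFullyAsyncSnapshots();

        env.setStateBackend(backend);

        // Define sources, transformations and sinks here, then call
        // env.execute("my-job"); executing with no operators would fail.
    }
}
```

The point of the switch is the one Stephan makes: a single file avoids relying on S3 directory listings, which are where the eventual-consistency problems show up.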
>
> On Tue, Oct 11, 2016 at 4:42 PM, Josh <jof...@gmail.com> wrote:
>
>> Hi Aljoscha,
>>
>> Yeah, I'm using S3. Is this a known problem when using S3? Do you have any
>> ideas on how to restore my job from this state, or prevent it from
>> happening again?
>>
>> Thanks,
>> Josh
>>
>>
>> On Tue, Oct 11, 2016 at 1:58 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> you are using S3 to store the checkpoints, right? It might be that
>>> you're running into a problem with S3 "directory listings" not being
>>> consistent.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 11 Oct 2016 at 12:40 Josh <jof...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> I just have a couple of questions about checkpointing and restoring state
>>> from RocksDB.
>>>
>>> 1) In some cases, I find that it is impossible to restore a job from a
>>> checkpoint due to an exception such as the one pasted below [*]. In this
>>> case, it appears that the last checkpoint is somehow corrupt. Does anyone
>>> know why this might happen?
>>>
>>> 2) When the above happens, I have no choice but to cancel the job, as it
>>> repeatedly attempts to restart and keeps getting the same exception. Given
>>> that no savepoint was taken recently, is it possible for me to restore the
>>> job from an older checkpoint (e.g. the second-last checkpoint)?
>>>
>>> The version of Flink I'm using is 1.1-SNAPSHOT, from mid-June.
>>>
>>> Thanks,
>>> Josh
>>>
>>> [*] The exception when restoring state:
>>>
>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Error while restoring RocksDB state from /mnt/yarn/usercache/hadoop/appcache/application_1476181294189_0001/flink-io-09ad1cb1-8dff-4f9a-9f61-6cae27ee6f1d/d236820a793043bd63360df6f175cae9/StreamFlatMap_9_8/dummy_state/dc5beab1-68fb-48b3-b3d6-272497d15a09/chk-1
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:537)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.injectKeyValueStateSnapshots(RocksDBStateBackend.java:489)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreState(AbstractStreamOperator.java:204)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:154)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:472)
>>>     ... 3 more
>>> Caused by: org.rocksdb.RocksDBException: NotFound: Backup not found
>>>     at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(Native Method)
>>>     at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(BackupEngine.java:177)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:535)
>>>     ... 7 more
>>>
>>
>
