Hi Stephan,

Thanks, that sounds good!
I'm planning to upgrade to Flink 1.2-SNAPSHOT as soon as possible - I was delaying upgrading due to the issues with restoring operator state you mentioned on my other thread here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-job-fails-to-restore-RocksDB-state-after-upgrading-to-1-2-SNAPSHOT-td9110.html

Sorry to jump around but do you know if that's fixed in the latest 1.2-SNAPSHOT? Was it resolved by Flink-4788?

Thanks,
Josh

On Tue, Oct 11, 2016 at 4:13 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Josh!
>
> There are two ways to improve the RocksDB / S3 behavior
>
> (1) Use the FullyAsync mode. It stores the data in one file, not in a
> directory. Since directories are the "eventual consistent" part of S3, this
> prevents many issues.
>
> (2) Flink 1.2-SNAPSHOT has some additional fixes that circumvent
> additional S3 issues.
>
> Hope that helps,
> Stephan
>
>
> On Tue, Oct 11, 2016 at 4:42 PM, Josh <jof...@gmail.com> wrote:
>
>> Hi Aljoscha,
>>
>> Yeah I'm using S3. Is this a known problem when using S3? Do you have any
>> ideas on how to restore my job from this state, or prevent it from
>> happening again?
>>
>> Thanks,
>> Josh
>>
>>
>> On Tue, Oct 11, 2016 at 1:58 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> you are using S3 to store the checkpoints, right? It might be that
>>> you're running into a problem with S3 "directory listings" not being
>>> consistent.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 11 Oct 2016 at 12:40 Josh <jof...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> I just have a couple of questions about checkpointing and restoring state
>>> from RocksDB.
>>>
>>> 1) In some cases, I find that it is impossible to restore a job from a
>>> checkpoint, due to an exception such as the one pasted below[*]. In this
>>> case, it appears that the last checkpoint is somehow corrupt. Does anyone
>>> know why this might happen?
>>>
>>> 2) When the above happens, I have no choice but to cancel the job, as it
>>> repeatedly attempts to restart and keeps getting the same exception. Given
>>> that no savepoint was taken recently, is it possible for me to restore the
>>> job from an older checkpoint (e.g. the second-last checkpoint)?
>>>
>>> The version of Flink I'm using Flink-1.1-SNAPSHOT, from mid-June.
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>>
>>> [*]The exception when restoring state:
>>>
>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Error while restoring RocksDB state from /mnt/yarn/usercache/hadoop/appcache/application_1476181294189_0001/flink-io-09ad1cb1-8dff-4f9a-9f61-6cae27ee6f1d/d236820a793043bd63360df6f175cae9/StreamFlatMap_9_8/dummy_state/dc5beab1-68fb-48b3-b3d6-272497d15a09/chk-1
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:537)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.injectKeyValueStateSnapshots(RocksDBStateBackend.java:489)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreState(AbstractStreamOperator.java:204)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:154)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:472)
>>>     ... 3 more
>>> Caused by: org.rocksdb.RocksDBException: NotFound: Backup not found
>>>     at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(Native Method)
>>>     at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(BackupEngine.java:177)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:535)
>>>     ... 7 more