[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5239


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170274734
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link CheckpointStateOutputStream} that writes into a specified file 
and
+ * returns a {@link FileStateHandle} upon closing.
+ *
+ * Unlike the {@link 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream},
+ * this stream does not have a threshold below which it returns a memory 
byte stream handle,
+ * and does not create random files, but writes to a specified file.
+ */
+public final class FileBasedStateOutputStream extends 
CheckpointStateOutputStream {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileBasedStateOutputStream.class);
+
+   // 

+
+   private final FSDataOutputStream out;
+
+   private final Path path;
+
+   private final FileSystem fileSystem;
+
+   private volatile boolean closed;
+
+
+   public FileBasedStateOutputStream(FileSystem fileSystem, Path path) 
throws IOException {
+   this.fileSystem = checkNotNull(fileSystem);
+   this.path = checkNotNull(path);
+
+   this.out = fileSystem.create(path, WriteMode.NO_OVERWRITE);
+   }
+
+   // 

+   //  I/O
+   // 

+
+   @Override
+   public final void write(int b) throws IOException {
+   out.write(b);
--- End diff --

👍Learned!


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170271924
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link CheckpointStateOutputStream} that writes into a specified file 
and
+ * returns a {@link FileStateHandle} upon closing.
+ *
+ * Unlike the {@link 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream},
+ * this stream does not have a threshold below which it returns a memory 
byte stream handle,
+ * and does not create random files, but writes to a specified file.
+ */
+public final class FileBasedStateOutputStream extends 
CheckpointStateOutputStream {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileBasedStateOutputStream.class);
+
+   // 

+
+   private final FSDataOutputStream out;
+
+   private final Path path;
+
+   private final FileSystem fileSystem;
+
+   private volatile boolean closed;
+
+
+   public FileBasedStateOutputStream(FileSystem fileSystem, Path path) 
throws IOException {
+   this.fileSystem = checkNotNull(fileSystem);
+   this.path = checkNotNull(path);
+
+   this.out = fileSystem.create(path, WriteMode.NO_OVERWRITE);
+   }
+
+   // 

+   //  I/O
+   // 

+
+   @Override
+   public final void write(int b) throws IOException {
+   out.write(b);
--- End diff --

The second one does not 4 bytes as the argument would make you think, but 
extracts the lower byte from the int. From the doc of `OutputStream`:
```
* Writes the specified byte to this output stream. The general
 * contract for write is that one byte is written
 * to the output stream.
(...)
```


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170270552
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -166,20 +190,38 @@ public TaskStateSnapshot retrieveLocalState(long 
checkpointID) {
 * Disposes the state of all local snapshots managed by this object.
 */
public void dispose() {
+
+   Collection> statesCopy;
+
synchronized (lock) {
-   for (Map.Entry entry : 
storedTaskStateByCheckpointID.entrySet()) {
-   discardStateObject(entry.getValue(), 
entry.getKey());
-   }
discarded = true;
+   statesCopy = new 
ArrayList<>(storedTaskStateByCheckpointID.entrySet());
}
+
+   discardExecutor.execute(() -> {
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170269557
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link CheckpointStateOutputStream} that writes into a specified file 
and
+ * returns a {@link FileStateHandle} upon closing.
+ *
+ * Unlike the {@link 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream},
+ * this stream does not have a threshold below which it returns a memory 
byte stream handle,
+ * and does not create random files, but writes to a specified file.
+ */
+public final class FileBasedStateOutputStream extends 
CheckpointStateOutputStream {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileBasedStateOutputStream.class);
+
+   // 

+
+   private final FSDataOutputStream out;
+
+   private final Path path;
+
+   private final FileSystem fileSystem;
+
+   private volatile boolean closed;
+
+
+   public FileBasedStateOutputStream(FileSystem fileSystem, Path path) 
throws IOException {
+   this.fileSystem = checkNotNull(fileSystem);
+   this.path = checkNotNull(path);
+
+   this.out = fileSystem.create(path, WriteMode.NO_OVERWRITE);
+   }
+
+   // 

+   //  I/O
+   // 

+
+   @Override
+   public final void write(int b) throws IOException {
+   out.write(b);
--- End diff --

In `FsCheckpointStateOutputStream` (The primary stream):
```
@Override
public void write(int b) throws IOException {
if (pos >= writeBuffer.length) {
flush();
}
writeBuffer[pos++] = (byte) b;
}
```
, this will write 1 byte.

but in `FileBasedStateOutputStream` (The secondary stream):
```
public final void write(int b) throws IOException {
out.write(b);
}
```
, this will write 4 byte.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170268121
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -141,7 +147,7 @@ public JobLeaderService getJobLeaderService() {
return jobLeaderService;
}
 
-   public TaskExecutorLocalStateStoresManager getTaskStateManager() {
+   public TaskExecutorLocalStateStoresManager getTaskManagerStateStore() {
return taskStateManager;
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170266913
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
 ---
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A CheckpointStateOutputStream that wraps a primary and a secondary 
CheckpointStateOutputStream and duplicates
+ * all writes into both streams. This stream applies buffering to reduce 
the amount of dual-method calling. Furthermore,
+ * exceptions that happen in interactions with the secondary stream are 
not exposed, until the user calls
+ * {@link #closeAndGetSecondaryHandle()}. In contrast to that, exceptions 
from interactions with the primary stream
+ * are immediately returned to the user. This class is used to write state 
for local recovery as a local (secondary)
+ * copy of the (primary) state snapshot that is written to a (slower but 
highly-available) remote filesystem.
+ */
+public class DuplicatingCheckpointOutputStream extends 
CheckpointStreamFactory.CheckpointStateOutputStream {
+
+   /** Flag if the positional alignment of both streams is checked after 
each operation. */
+   private static final boolean STRICT_ALIGNMENT_CHECKS = false;
+
+   /** Default buffer size of 8KB. */
+   private static final int DEFAULT_BUFFER_SIZER = 8 * 1024;
+
+   /** Write buffer. */
+   private final byte[] buffer;
+
+   /** Position in the write buffer. */
+   private int bufferIdx;
+
+   /**
+* Primary stream for writing the checkpoint data. Failures from this 
stream are forwarded.
+*/
+   private final CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream;
+
+   /**
+* Primary stream for writing the checkpoint data. Failures from this 
stream are not forwarded until
+* {@link #closeAndGetSecondaryHandle()}.
+*/
+   private final CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream;
+
+   /**
+* Stores a potential exception that occurred while interacting with 
{@link #secondaryOutputStream}
+*/
+   private IOException secondaryStreamException;
+
+   public DuplicatingCheckpointOutputStream(
+   CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
+   CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream) throws IOException {
+   this(primaryOutputStream, secondaryOutputStream, 
DEFAULT_BUFFER_SIZER);
+   }
+
+   public DuplicatingCheckpointOutputStream(
+   CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
+   CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream,
+   int bufferSize) throws IOException {
+
+   this.primaryOutputStream = 
Preconditions.checkNotNull(primaryOutputStream);
+   this.secondaryOutputStream = 
Preconditions.checkNotNull(secondaryOutputStream);
+
+   this.buffer = new byte[bufferSize];
+   this.bufferIdx = 0;
+
+   this.secondaryStreamException = null;
+
+   checkForAlignedStreamPositions();
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+
+   if (buffer.length <= bufferIdx) {
+   flushInternalBuffer();
+   }
+
+   buffer[bufferIdx] = (byte) b;
+   ++bufferIdx;
+   }
+
+   @Override
+   public void write(byte[] b) throws IOException {
+
+   write(b, 0, b.length);
+   }
+
+   @Override
+   public void write(byte[] b, int off, int len) 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170266486
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link CheckpointStateOutputStream} that writes into a specified file 
and
+ * returns a {@link FileStateHandle} upon closing.
+ *
+ * Unlike the {@link 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream},
+ * this stream does not have a threshold below which it returns a memory 
byte stream handle,
+ * and does not create random files, but writes to a specified file.
+ */
+public final class FileBasedStateOutputStream extends 
CheckpointStateOutputStream {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileBasedStateOutputStream.class);
+
+   // 

+
+   private final FSDataOutputStream out;
+
+   private final Path path;
+
+   private final FileSystem fileSystem;
+
+   private volatile boolean closed;
+
+
+   public FileBasedStateOutputStream(FileSystem fileSystem, Path path) 
throws IOException {
+   this.fileSystem = checkNotNull(fileSystem);
+   this.path = checkNotNull(path);
+
+   this.out = fileSystem.create(path, WriteMode.NO_OVERWRITE);
+   }
+
+   // 

+   //  I/O
+   // 

+
+   @Override
+   public final void write(int b) throws IOException {
+   out.write(b);
--- End diff --

I think I do not understand what you mean. Can you give a bit more detail 
which alignment should be broken and why?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170264017
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
 ---
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A CheckpointStateOutputStream that wraps a primary and a secondary 
CheckpointStateOutputStream and duplicates
+ * all writes into both streams. This stream applies buffering to reduce 
the amount of dual-method calling. Furthermore,
+ * exceptions that happen in interactions with the secondary stream are 
not exposed, until the user calls
+ * {@link #closeAndGetSecondaryHandle()}. In contrast to that, exceptions 
from interactions with the primary stream
+ * are immediately returned to the user. This class is used to write state 
for local recovery as a local (secondary)
+ * copy of the (primary) state snapshot that is written to a (slower but 
highly-available) remote filesystem.
+ */
+public class DuplicatingCheckpointOutputStream extends 
CheckpointStreamFactory.CheckpointStateOutputStream {
+
+   /** Flag if the positional alignment of both streams is checked after 
each operation. */
+   private static final boolean STRICT_ALIGNMENT_CHECKS = false;
+
+   /** Default buffer size of 8KB. */
+   private static final int DEFAULT_BUFFER_SIZER = 8 * 1024;
+
+   /** Write buffer. */
+   private final byte[] buffer;
+
+   /** Position in the write buffer. */
+   private int bufferIdx;
+
+   /**
+* Primary stream for writing the checkpoint data. Failures from this 
stream are forwarded.
+*/
+   private final CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream;
+
+   /**
+* Primary stream for writing the checkpoint data. Failures from this 
stream are not forwarded until
+* {@link #closeAndGetSecondaryHandle()}.
+*/
+   private final CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream;
+
+   /**
+* Stores a potential exception that occurred while interacting with 
{@link #secondaryOutputStream}
+*/
+   private IOException secondaryStreamException;
+
+   public DuplicatingCheckpointOutputStream(
+   CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
+   CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream) throws IOException {
+   this(primaryOutputStream, secondaryOutputStream, 
DEFAULT_BUFFER_SIZER);
+   }
+
+   public DuplicatingCheckpointOutputStream(
+   CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
+   CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream,
+   int bufferSize) throws IOException {
+
+   this.primaryOutputStream = 
Preconditions.checkNotNull(primaryOutputStream);
+   this.secondaryOutputStream = 
Preconditions.checkNotNull(secondaryOutputStream);
+
+   this.buffer = new byte[bufferSize];
+   this.bufferIdx = 0;
+
+   this.secondaryStreamException = null;
+
+   checkForAlignedStreamPositions();
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+
+   if (buffer.length <= bufferIdx) {
+   flushInternalBuffer();
+   }
+
+   buffer[bufferIdx] = (byte) b;
+   ++bufferIdx;
+   }
+
+   @Override
+   public void write(byte[] b) throws IOException {
+
+   write(b, 0, b.length);
+   }
+
+   @Override
+   public void write(byte[] b, int off, int len) 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170263293
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
@@ -186,6 +210,34 @@ private void doRelease(Iterable 
toRelease) {
return localRecoveryMode;
}
 
+   private File[] allocationBaseDirectories(AllocationID allocationID) {
+   File[] allocationDirectories = new 
File[localStateRootDirectories.length];
+   for (int i = 0; i < localStateRootDirectories.length; ++i) {
+   allocationDirectories[i] = new 
File(localStateRootDirectories[i], allocationSubDirString(allocationID));
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170261966
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -270,6 +277,8 @@ public RocksDBKeyedStateBackend(
new IncrementalSnapshotStrategy() :
new FullSnapshotStrategy();
 
+   this.writeOptions = new WriteOptions().setDisableWAL(false);
--- End diff --

No, this was unintended...the double negation got me here.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170260996
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * This class encapsulates the completed configuration for local recovery, 
i.e. the root
+ * directories into which all file-based snapshots can be written and the 
general mode for the local recover feature.
+ */
+public class LocalRecoveryConfig {
+
+   /**
+* Enum over modes of local recovery:
+* 
+* DISABLED: disables local recovery.
+* ENABLE_FILE_BASED: enables local recovery in a variant that is 
based on local files.
+* 
+*/
+   public enum LocalRecoveryMode {
+   DISABLED,
+   ENABLE_FILE_BASED;
+
+   /**
+* Attempts to parses the given string to a {@link 
LocalRecoveryMode}.
+*
+* @param input string to parse
+* @return the {@link LocalRecoveryMode} that is represented by 
the input string.
+* @throws ParseException if the string cannot be parsed to a 
{@link LocalRecoveryMode}.
+*/
+   @Nonnull
+   public static LocalRecoveryMode fromString(String input) throws 
ParseException {
+   if (input != null) {
+   String trimInput = input.trim();
+   if 
(DISABLED.toString().equalsIgnoreCase(trimInput)) {
+   return DISABLED;
+   } else if 
(ENABLE_FILE_BASED.toString().equalsIgnoreCase(trimInput)) {
+   return ENABLE_FILE_BASED;
+   }
+   }
+   throw new ParseException("Cannot parse input to 
LocalRecoveryMode: " + input);
+   }
+
+   /**
+* Extracts the {@link LocalRecoveryMode} from the given 
configuration. Defaults to LocalRecoveryMode.DISABLED
+* if no configuration value is specified or parsing the value 
resulted in an exception.
+*
+* @param configuration the configuration that specifies the 
value for the local recovery mode.
+* @return the local recovery mode as found in the config, or 
LocalRecoveryMode.DISABLED if no mode was
+* configured or the specified mode could not be parsed.
+*/
+   @Nonnull
+   public static LocalRecoveryMode fromConfig(@Nonnull 
Configuration configuration) {
+   String localRecoveryConfString = 
configuration.getString(CheckpointingOptions.LOCAL_RECOVERY);
+   try {
+   return 
LocalRecoveryConfig.LocalRecoveryMode.fromString(localRecoveryConfString);
+   } catch (ParseException ex) {
+   
LoggerFactory.getLogger(LocalRecoveryConfig.class).warn(
+   "Exception while parsing configuration 
of local recovery mode. Local recovery will be disabled.",
+   ex);
+   return 
LocalRecoveryConfig.LocalRecoveryMode.DISABLED;
+   }
+   }
+   }
+
+   /** The local recovery mode. */
+   @Nonnull
+   private final LocalRecoveryMode localRecoveryMode;
--- End diff --

This is a bit tricky, I could see that sometimes the stream abstraction 
might not be enough for all (optimized) types of  local 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170253853
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
 ---
@@ -59,12 +59,18 @@ public void before() throws Exception {
this.subtaskIdx = 0;
this.rootDirs = new File[]{temporaryFolder.newFolder(), 
temporaryFolder.newFolder()};
 
+   LocalRecoveryDirectoryProviderImpl directoryProvider =
+   new LocalRecoveryDirectoryProviderImpl(rootDirs, jobID, 
allocationID, jobVertexID, subtaskIdx);
+
+   LocalRecoveryConfig localRecoveryConfig =
+   new 
LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, 
directoryProvider);
--- End diff --

So wouldn't that mean that with `LocalRecoveryConfig.disabled()`, the 
directory provider is again nullable? I think that 
`LocalRecoveryDirectoryProvider` is a very lightweight object, that also always 
has a useful default configuration.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170253246
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 ---
@@ -46,9 +48,15 @@ public void testCreationFromConfig() throws Exception {
 
final Configuration config = new Configuration();
 
-   final String rootDirString = 
"localStateRoot1,localStateRoot2,localStateRoot3";
+   String tmpDir = System.getProperty("java.io.tmpdir") + 
File.separator;
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170253088
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1629,9 +1630,9 @@ private void closeLocalRegistry() {
 
private boolean isWithLocalRecovery(
CheckpointOptions.CheckpointType checkpointType,
-   RocksDBStateBackend.LocalRecoveryMode recoveryMode) {
+   LocalRecoveryConfig.LocalRecoveryMode recoveryMode) {
// we use local recovery when it is activated and we 
are not taking a savepoint.
-   return 
RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED.equals(recoveryMode)
+   return 
LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.equals(recoveryMode)
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170251605
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -270,6 +277,8 @@ public RocksDBKeyedStateBackend(
new IncrementalSnapshotStrategy() :
new FullSnapshotStrategy();
 
+   this.writeOptions = new WriteOptions().setDisableWAL(false);
--- End diff --

We enable WAL in contrast to the previous state. This is intended, right?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170250183
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/**
+ * Main implementation of a {@link TaskLocalStateStore}.
+ */
+public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
+
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStoreImpl.class);
+
+   /** Maximum number of retained snapshots. */
+   @VisibleForTesting
+   static final int MAX_RETAINED_SNAPSHOTS = 5;
+
+   /** Dummy value to use instead of null to satisfy {@link 
ConcurrentHashMap}. */
+   private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
+
+   /** JobID from the owning subtask. */
+   @Nonnull
+   private final JobID jobID;
+
+   /** AllocationID of the owning slot. */
+   @Nonnull
+   private final AllocationID allocationID;
+
+   /** JobVertexID of the owning subtask. */
+   @Nonnull
+   private final JobVertexID jobVertexID;
+
+   /** Subtask index of the owning subtask. */
+   @Nonnegative
+   private final int subtaskIndex;
+
+   /** The configured mode for local recovery. */
+   @Nonnull
+   private final LocalRecoveryConfig localRecoveryConfig;
+
+   /** Executor that runs the discarding of released state objects. */
+   @Nonnull
+   private final Executor discardExecutor;
+
+   /** Lock for synchronisation on the storage map and the discarded 
status. */
+   @Nonnull
+   private final Object lock;
+
+   /** Status flag if this store was already discarded. */
+   @GuardedBy("lock")
+   private boolean discarded;
+
+   /** Maps checkpoint ids to local TaskStateSnapshots. */
+   @Nonnull
+   @GuardedBy("lock")
+   private final SortedMap 
storedTaskStateByCheckpointID;
+
+   public TaskLocalStateStoreImpl(
+   @Nonnull JobID jobID,
+   @Nonnull AllocationID allocationID,
+   @Nonnull JobVertexID jobVertexID,
+   @Nonnegative int subtaskIndex,
+   @Nonnull LocalRecoveryConfig localRecoveryConfig,
+   @Nonnull Executor discardExecutor) {
+
+   this.jobID = jobID;
+   this.allocationID = allocationID;
+   this.jobVertexID = jobVertexID;
+   this.subtaskIndex = subtaskIndex;
+   this.discardExecutor = discardExecutor;
+   this.lock = new Object();
+   this.storedTaskStateByCheckpointID = new TreeMap<>();
+   this.discarded = false;
+   this.localRecoveryConfig = localRecoveryConfig;
+   }
+
+   @Override
+   public void storeLocalState(

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170243957
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -141,7 +147,7 @@ public JobLeaderService getJobLeaderService() {
return jobLeaderService;
}
 
-   public TaskExecutorLocalStateStoresManager getTaskStateManager() {
+   public TaskExecutorLocalStateStoresManager getTaskManagerStateStore() {
return taskStateManager;
--- End diff --

nit: Since we have changed `getTaskStateManager` to 
`getTaskManagerStateStore` maybe we should also change `taskStateManager` to 
`taskManagerStateStore`... this will be more consistent.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170242003
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link CheckpointStateOutputStream} that writes into a specified file 
and
+ * returns a {@link FileStateHandle} upon closing.
+ *
+ * Unlike the {@link 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream},
+ * this stream does not have a threshold below which it returns a memory 
byte stream handle,
+ * and does not create random files, but writes to a specified file.
+ */
+public final class FileBasedStateOutputStream extends 
CheckpointStateOutputStream {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileBasedStateOutputStream.class);
+
+   // 

+
+   private final FSDataOutputStream out;
+
+   private final Path path;
+
+   private final FileSystem fileSystem;
+
+   private volatile boolean closed;
+
+
+   public FileBasedStateOutputStream(FileSystem fileSystem, Path path) 
throws IOException {
+   this.fileSystem = checkNotNull(fileSystem);
+   this.path = checkNotNull(path);
+
+   this.out = fileSystem.create(path, WriteMode.NO_OVERWRITE);
+   }
+
+   // 

+   //  I/O
+   // 

+
+   @Override
+   public final void write(int b) throws IOException {
+   out.write(b);
--- End diff --

I found this is inconsistency with `FsCheckpointStateOutputStream`, this 
means that invoking `DuplicatingCheckpointOutputStream.write(int b).` will 
broken the position align. Is this intended?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170249880
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
@@ -186,6 +210,34 @@ private void doRelease(Iterable 
toRelease) {
return localRecoveryMode;
}
 
+   private File[] allocationBaseDirectories(AllocationID allocationID) {
+   File[] allocationDirectories = new 
File[localStateRootDirectories.length];
+   for (int i = 0; i < localStateRootDirectories.length; ++i) {
+   allocationDirectories[i] = new 
File(localStateRootDirectories[i], allocationSubDirString(allocationID));
--- End diff --

`allocationSubDirString` could be pulled out of the loop.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170239200
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
 ---
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A CheckpointStateOutputStream that wraps a primary and a secondary 
CheckpointStateOutputStream and duplicates
+ * all writes into both streams. This stream applies buffering to reduce 
the amount of dual-method calling. Furthermore,
+ * exceptions that happen in interactions with the secondary stream are 
not exposed, until the user calls
+ * {@link #closeAndGetSecondaryHandle()}. In contrast to that, exceptions 
from interactions with the primary stream
+ * are immediately returned to the user. This class is used to write state 
for local recovery as a local (secondary)
+ * copy of the (primary) state snapshot that is written to a (slower but 
highly-available) remote filesystem.
+ */
+public class DuplicatingCheckpointOutputStream extends 
CheckpointStreamFactory.CheckpointStateOutputStream {
+
+   /** Flag if the positional alignment of both streams is checked after 
each operation. */
+   private static final boolean STRICT_ALIGNMENT_CHECKS = false;
+
+   /** Default buffer size of 8KB. */
+   private static final int DEFAULT_BUFFER_SIZER = 8 * 1024;
+
+   /** Write buffer. */
+   private final byte[] buffer;
+
+   /** Position in the write buffer. */
+   private int bufferIdx;
+
+   /**
+* Primary stream for writing the checkpoint data. Failures from this 
stream are forwarded.
+*/
+   private final CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream;
+
+   /**
+* Primary stream for writing the checkpoint data. Failures from this 
stream are not forwarded until
+* {@link #closeAndGetSecondaryHandle()}.
+*/
+   private final CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream;
+
+   /**
+* Stores a potential exception that occurred while interacting with 
{@link #secondaryOutputStream}
+*/
+   private IOException secondaryStreamException;
+
+   public DuplicatingCheckpointOutputStream(
+   CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
+   CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream) throws IOException {
+   this(primaryOutputStream, secondaryOutputStream, 
DEFAULT_BUFFER_SIZER);
+   }
+
+   public DuplicatingCheckpointOutputStream(
+   CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
+   CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream,
+   int bufferSize) throws IOException {
+
+   this.primaryOutputStream = 
Preconditions.checkNotNull(primaryOutputStream);
+   this.secondaryOutputStream = 
Preconditions.checkNotNull(secondaryOutputStream);
+
+   this.buffer = new byte[bufferSize];
+   this.bufferIdx = 0;
+
+   this.secondaryStreamException = null;
+
+   checkForAlignedStreamPositions();
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+
+   if (buffer.length <= bufferIdx) {
+   flushInternalBuffer();
+   }
+
+   buffer[bufferIdx] = (byte) b;
+   ++bufferIdx;
+   }
+
+   @Override
+   public void write(byte[] b) throws IOException {
+
+   write(b, 0, b.length);
+   }
+
+   @Override
+   public void write(byte[] b, int off, int len) 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170238910
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
 ---
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A CheckpointStateOutputStream that wraps a primary and a secondary 
CheckpointStateOutputStream and duplicates
+ * all writes into both streams. This stream applies buffering to reduce 
the amount of dual-method calling. Furthermore,
+ * exceptions that happen in interactions with the secondary stream are 
not exposed, until the user calls
+ * {@link #closeAndGetSecondaryHandle()}. In contrast to that, exceptions 
from interactions with the primary stream
+ * are immediately returned to the user. This class is used to write state 
for local recovery as a local (secondary)
+ * copy of the (primary) state snapshot that is written to a (slower but 
highly-available) remote filesystem.
+ */
+public class DuplicatingCheckpointOutputStream extends 
CheckpointStreamFactory.CheckpointStateOutputStream {
+
+   /** Flag if the positional alignment of both streams is checked after 
each operation. */
+   private static final boolean STRICT_ALIGNMENT_CHECKS = false;
+
+   /** Default buffer size of 8KB. */
+   private static final int DEFAULT_BUFFER_SIZER = 8 * 1024;
+
+   /** Write buffer. */
+   private final byte[] buffer;
+
+   /** Position in the write buffer. */
+   private int bufferIdx;
+
+   /**
+* Primary stream for writing the checkpoint data. Failures from this 
stream are forwarded.
+*/
+   private final CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream;
+
+   /**
+* Primary stream for writing the checkpoint data. Failures from this 
stream are not forwarded until
+* {@link #closeAndGetSecondaryHandle()}.
+*/
+   private final CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream;
+
+   /**
+* Stores a potential exception that occurred while interacting with 
{@link #secondaryOutputStream}
+*/
+   private IOException secondaryStreamException;
+
+   public DuplicatingCheckpointOutputStream(
+   CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
+   CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream) throws IOException {
+   this(primaryOutputStream, secondaryOutputStream, 
DEFAULT_BUFFER_SIZER);
+   }
+
+   public DuplicatingCheckpointOutputStream(
+   CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
+   CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream,
+   int bufferSize) throws IOException {
+
+   this.primaryOutputStream = 
Preconditions.checkNotNull(primaryOutputStream);
+   this.secondaryOutputStream = 
Preconditions.checkNotNull(secondaryOutputStream);
+
+   this.buffer = new byte[bufferSize];
+   this.bufferIdx = 0;
+
+   this.secondaryStreamException = null;
+
+   checkForAlignedStreamPositions();
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+
+   if (buffer.length <= bufferIdx) {
+   flushInternalBuffer();
+   }
+
+   buffer[bufferIdx] = (byte) b;
+   ++bufferIdx;
+   }
+
+   @Override
+   public void write(byte[] b) throws IOException {
+
+   write(b, 0, b.length);
+   }
+
+   @Override
+   public void write(byte[] b, int off, int len) 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170229683
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * This class encapsulates the completed configuration for local recovery, 
i.e. the root
+ * directories into which all file-based snapshots can be written and the 
general mode for the local recover feature.
+ */
+public class LocalRecoveryConfig {
+
+   /**
+* Enum over modes of local recovery:
+* 
+* DISABLED: disables local recovery.
+* ENABLE_FILE_BASED: enables local recovery in a variant that is 
based on local files.
+* 
+*/
+   public enum LocalRecoveryMode {
+   DISABLED,
+   ENABLE_FILE_BASED;
+
+   /**
+* Attempts to parses the given string to a {@link 
LocalRecoveryMode}.
+*
+* @param input string to parse
+* @return the {@link LocalRecoveryMode} that is represented by 
the input string.
+* @throws ParseException if the string cannot be parsed to a 
{@link LocalRecoveryMode}.
+*/
+   @Nonnull
+   public static LocalRecoveryMode fromString(String input) throws 
ParseException {
+   if (input != null) {
+   String trimInput = input.trim();
+   if 
(DISABLED.toString().equalsIgnoreCase(trimInput)) {
+   return DISABLED;
+   } else if 
(ENABLE_FILE_BASED.toString().equalsIgnoreCase(trimInput)) {
+   return ENABLE_FILE_BASED;
+   }
+   }
+   throw new ParseException("Cannot parse input to 
LocalRecoveryMode: " + input);
+   }
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170228635
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
@@ -89,6 +90,22 @@ public TaskExecutorLocalStateStoresManager(
}
}
}
+
+   // install a shutdown hook
+   this.shutdownHook = new 
Thread("TaskExecutorLocalStateStoresManager shutdown hook") {
--- End diff --

I made a separate commit that introduces `ShudownHookUtil`. Used it to 
deduplicated the same code in like 20 places...


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170039184
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalDirectoryProvider.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.io.File;
+
+/**
+ * Easy-to-construct implementation of {@link 
LocalRecoveryDirectoryProvider} to use in tests.
+ */
+public class TestLocalDirectoryProvider extends 
LocalRecoveryDirectoryProviderImpl {
+
+   public TestLocalDirectoryProvider() {
+   super(
+   new File(System.getProperty("java.io.tmpdir")),
--- End diff --

I wouldn't use the tmp dir, but instead pass in a pass. Then the user can 
instantiate `TemporaryFolder` to make sure that things are cleaned up.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170038982
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
 ---
@@ -59,12 +59,18 @@ public void before() throws Exception {
this.subtaskIdx = 0;
this.rootDirs = new File[]{temporaryFolder.newFolder(), 
temporaryFolder.newFolder()};
 
+   LocalRecoveryDirectoryProviderImpl directoryProvider =
+   new LocalRecoveryDirectoryProviderImpl(rootDirs, jobID, 
allocationID, jobVertexID, subtaskIdx);
+
+   LocalRecoveryConfig localRecoveryConfig =
+   new 
LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, 
directoryProvider);
--- End diff --

It's a bit strange that we have to pass in a `directoryProvider`, even 
though the `LocalRecoveryMode` is disabled. I think it would be better to have 
something like `LocalRecoveryConfig.disabled()` and 
`LocalRecoveryConfig.fileBased(directoryProvider)`.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170038223
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 ---
@@ -46,9 +48,15 @@ public void testCreationFromConfig() throws Exception {
 
final Configuration config = new Configuration();
 
-   final String rootDirString = 
"localStateRoot1,localStateRoot2,localStateRoot3";
+   String tmpDir = System.getProperty("java.io.tmpdir") + 
File.separator;
--- End diff --

I think it's better to use a `TemporaryFolder` if there are any files 
created to ensure a proper clean up at the end of the test.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170037177
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * This class encapsulates the completed configuration for local recovery, 
i.e. the root
+ * directories into which all file-based snapshots can be written and the 
general mode for the local recover feature.
+ */
+public class LocalRecoveryConfig {
+
+   /**
+* Enum over modes of local recovery:
+* 
+* DISABLED: disables local recovery.
+* ENABLE_FILE_BASED: enables local recovery in a variant that is 
based on local files.
+* 
+*/
+   public enum LocalRecoveryMode {
+   DISABLED,
+   ENABLE_FILE_BASED;
+
+   /**
+* Attempts to parses the given string to a {@link 
LocalRecoveryMode}.
+*
+* @param input string to parse
+* @return the {@link LocalRecoveryMode} that is represented by 
the input string.
+* @throws ParseException if the string cannot be parsed to a 
{@link LocalRecoveryMode}.
+*/
+   @Nonnull
+   public static LocalRecoveryMode fromString(String input) throws 
ParseException {
+   if (input != null) {
+   String trimInput = input.trim();
+   if 
(DISABLED.toString().equalsIgnoreCase(trimInput)) {
+   return DISABLED;
+   } else if 
(ENABLE_FILE_BASED.toString().equalsIgnoreCase(trimInput)) {
+   return ENABLE_FILE_BASED;
+   }
+   }
+   throw new ParseException("Cannot parse input to 
LocalRecoveryMode: " + input);
+   }
+
+   /**
+* Extracts the {@link LocalRecoveryMode} from the given 
configuration. Defaults to LocalRecoveryMode.DISABLED
+* if no configuration value is specified or parsing the value 
resulted in an exception.
+*
+* @param configuration the configuration that specifies the 
value for the local recovery mode.
+* @return the local recovery mode as found in the config, or 
LocalRecoveryMode.DISABLED if no mode was
+* configured or the specified mode could not be parsed.
+*/
+   @Nonnull
+   public static LocalRecoveryMode fromConfig(@Nonnull 
Configuration configuration) {
+   String localRecoveryConfString = 
configuration.getString(CheckpointingOptions.LOCAL_RECOVERY);
+   try {
+   return 
LocalRecoveryConfig.LocalRecoveryMode.fromString(localRecoveryConfString);
+   } catch (ParseException ex) {
+   
LoggerFactory.getLogger(LocalRecoveryConfig.class).warn(
+   "Exception while parsing configuration 
of local recovery mode. Local recovery will be disabled.",
+   ex);
+   return 
LocalRecoveryConfig.LocalRecoveryMode.DISABLED;
+   }
+   }
+   }
+
+   /** The local recovery mode. */
+   @Nonnull
+   private final LocalRecoveryMode localRecoveryMode;
--- End diff --

This is more of a general question than a review comment: How will it look 
in the future if we want to support a non file based local 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170035520
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * This class encapsulates the completed configuration for local recovery, 
i.e. the root
+ * directories into which all file-based snapshots can be written and the 
general mode for the local recover feature.
+ */
+public class LocalRecoveryConfig {
+
+   /**
+* Enum over modes of local recovery:
+* 
+* DISABLED: disables local recovery.
+* ENABLE_FILE_BASED: enables local recovery in a variant that is 
based on local files.
+* 
+*/
+   public enum LocalRecoveryMode {
+   DISABLED,
+   ENABLE_FILE_BASED;
+
+   /**
+* Attempts to parses the given string to a {@link 
LocalRecoveryMode}.
+*
+* @param input string to parse
+* @return the {@link LocalRecoveryMode} that is represented by 
the input string.
+* @throws ParseException if the string cannot be parsed to a 
{@link LocalRecoveryMode}.
+*/
+   @Nonnull
+   public static LocalRecoveryMode fromString(String input) throws 
ParseException {
+   if (input != null) {
+   String trimInput = input.trim();
+   if 
(DISABLED.toString().equalsIgnoreCase(trimInput)) {
+   return DISABLED;
+   } else if 
(ENABLE_FILE_BASED.toString().equalsIgnoreCase(trimInput)) {
+   return ENABLE_FILE_BASED;
+   }
+   }
+   throw new ParseException("Cannot parse input to 
LocalRecoveryMode: " + input);
+   }
--- End diff --

Why not simply using `LocalRecoveryMode.valueOf()`?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170034046
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1629,9 +1630,9 @@ private void closeLocalRegistry() {
 
private boolean isWithLocalRecovery(
CheckpointOptions.CheckpointType checkpointType,
-   RocksDBStateBackend.LocalRecoveryMode recoveryMode) {
+   LocalRecoveryConfig.LocalRecoveryMode recoveryMode) {
// we use local recovery when it is activated and we 
are not taking a savepoint.
-   return 
RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED.equals(recoveryMode)
+   return 
LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.equals(recoveryMode)
--- End diff --

I prefer for enum comparisons `==`, but we had this discussion before ;-)


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170033424
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 ---
@@ -193,7 +193,12 @@ public static TaskManagerServicesConfiguration 
fromConfiguration(
}
 
final String[] tmpDirs = 
ConfigurationUtils.parseTempDirectories(configuration);
-   final String[] localStateRootDir = 
ConfigurationUtils.parseLocalStateDirectories(configuration);
+   String[] localStateRootDir = 
ConfigurationUtils.parseLocalStateDirectories(configuration);
--- End diff --

Could also be `final`


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r170026181
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
@@ -89,6 +90,22 @@ public TaskExecutorLocalStateStoresManager(
}
}
}
+
+   // install a shutdown hook
+   this.shutdownHook = new 
Thread("TaskExecutorLocalStateStoresManager shutdown hook") {
--- End diff --

It might make sense to reuse `BlobUtils#addShutdownHook` or to refactor 
this and then reuse.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169909152
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some method that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
+
+   /** This reference tracks the lifecycle state of the snapshot 
directory. */
+   @Nonnull
+   private AtomicReference state;
+
+   public SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem 
fileSystem) {
+   this.directory = directory;
+   this.fileSystem = fileSystem;
+   this.state = new AtomicReference<>(State.ONGOING);
+   }
+
+   public SnapshotDirectory(@Nonnull Path directory) throws IOException {
+   this(directory, directory.getFileSystem());
+   }
+
+   @Nonnull
+   public Path getDirectory() {
+   return directory;
+   }
+
+   public boolean mkdirs() throws IOException {
+   return fileSystem.mkdirs(directory);
+   }
+
+   @Nonnull
+   public FileSystem getFileSystem() {
+   return fileSystem;
+   }
+
+   public boolean exists() throws IOException {
+   return fileSystem.exists(directory);
+   }
+
+   /**
+* List the statuses of the files/directories in the snapshot directory.
+*
+* @return the statuses of the files/directories in the given path.
+* @throws IOException if there is a problem creating the file statuses.
+*/
+   public FileStatus[] listStatus() throws IOException {
+   return fileSystem.listStatus(directory);
+   }
+
+   /**
+* Calling this method completes the snapshot into the snapshot 
directory and creates a corresponding
+* {@link DirectoryStateHandle} that points to the snapshot directory. 
Calling this method will also change the
+* lifecycle state from "ongoing" to "completed". If the state was 
already deleted, an {@link IOException} is
+* thrown.
+*
+* @return a directory state handle that points to the snapshot 
directory.
+* @throws IOException if the state of this snapshot directory object 
is different from "ongoing".
+*/
+   public DirectoryStateHandle completeSnapshotAndGetHandle() throws 
IOException {
+   if (state.compareAndSet(State.ONGOING, State.COMPLETED)) {
+   return new DirectoryStateHandle(directory, fileSystem);
+   } else {
+   throw new IOException("Expected state " + State.ONGOING 
+ " but found state " + 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169905650
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some method that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
+
+   /** This reference tracks the lifecycle state of the snapshot 
directory. */
+   @Nonnull
+   private AtomicReference state;
+
+   public SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem 
fileSystem) {
+   this.directory = directory;
+   this.fileSystem = fileSystem;
+   this.state = new AtomicReference<>(State.ONGOING);
+   }
+
+   public SnapshotDirectory(@Nonnull Path directory) throws IOException {
+   this(directory, directory.getFileSystem());
+   }
+
+   @Nonnull
+   public Path getDirectory() {
+   return directory;
+   }
+
+   public boolean mkdirs() throws IOException {
+   return fileSystem.mkdirs(directory);
+   }
+
+   @Nonnull
+   public FileSystem getFileSystem() {
+   return fileSystem;
+   }
+
+   public boolean exists() throws IOException {
+   return fileSystem.exists(directory);
+   }
+
+   /**
+* List the statuses of the files/directories in the snapshot directory.
+*
+* @return the statuses of the files/directories in the given path.
+* @throws IOException if there is a problem creating the file statuses.
+*/
+   public FileStatus[] listStatus() throws IOException {
+   return fileSystem.listStatus(directory);
+   }
+
+   /**
+* Calling this method completes the snapshot into the snapshot 
directory and creates a corresponding
+* {@link DirectoryStateHandle} that points to the snapshot directory. 
Calling this method will also change the
+* lifecycle state from "ongoing" to "completed". If the state was 
already deleted, an {@link IOException} is
+* thrown.
+*
+* @return a directory state handle that points to the snapshot 
directory.
+* @throws IOException if the state of this snapshot directory object 
is different from "ongoing".
+*/
+   public DirectoryStateHandle completeSnapshotAndGetHandle() throws 
IOException {
+   if (state.compareAndSet(State.ONGOING, State.COMPLETED)) {
+   return new DirectoryStateHandle(directory, fileSystem);
+   } else {
+   throw new IOException("Expected state " + State.ONGOING 
+ " but found state " + 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169899631
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some method that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
+
+   /** This reference tracks the lifecycle state of the snapshot 
directory. */
+   @Nonnull
+   private AtomicReference state;
+
+   public SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem 
fileSystem) {
+   this.directory = directory;
+   this.fileSystem = fileSystem;
+   this.state = new AtomicReference<>(State.ONGOING);
+   }
+
+   public SnapshotDirectory(@Nonnull Path directory) throws IOException {
+   this(directory, directory.getFileSystem());
+   }
+
+   @Nonnull
+   public Path getDirectory() {
+   return directory;
+   }
+
+   public boolean mkdirs() throws IOException {
+   return fileSystem.mkdirs(directory);
+   }
+
+   @Nonnull
+   public FileSystem getFileSystem() {
+   return fileSystem;
+   }
+
+   public boolean exists() throws IOException {
+   return fileSystem.exists(directory);
+   }
+
+   /**
+* List the statuses of the files/directories in the snapshot directory.
+*
+* @return the statuses of the files/directories in the given path.
+* @throws IOException if there is a problem creating the file statuses.
+*/
+   public FileStatus[] listStatus() throws IOException {
+   return fileSystem.listStatus(directory);
+   }
+
+   /**
+* Calling this method completes the snapshot into the snapshot 
directory and creates a corresponding
+* {@link DirectoryStateHandle} that points to the snapshot directory. 
Calling this method will also change the
+* lifecycle state from "ongoing" to "completed". If the state was 
already deleted, an {@link IOException} is
+* thrown.
+*
+* @return a directory state handle that points to the snapshot 
directory.
+* @throws IOException if the state of this snapshot directory object 
is different from "ongoing".
+*/
+   public DirectoryStateHandle completeSnapshotAndGetHandle() throws 
IOException {
+   if (state.compareAndSet(State.ONGOING, State.COMPLETED)) {
+   return new DirectoryStateHandle(directory, fileSystem);
+   } else {
+   throw new IOException("Expected state " + State.ONGOING 
+ " but found state " + 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169899133
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some method that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
+
+   /** This reference tracks the lifecycle state of the snapshot 
directory. */
+   @Nonnull
+   private AtomicReference state;
+
+   public SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem 
fileSystem) {
+   this.directory = directory;
+   this.fileSystem = fileSystem;
+   this.state = new AtomicReference<>(State.ONGOING);
+   }
+
+   public SnapshotDirectory(@Nonnull Path directory) throws IOException {
+   this(directory, directory.getFileSystem());
+   }
+
+   @Nonnull
+   public Path getDirectory() {
+   return directory;
+   }
+
+   public boolean mkdirs() throws IOException {
+   return fileSystem.mkdirs(directory);
+   }
+
+   @Nonnull
+   public FileSystem getFileSystem() {
+   return fileSystem;
+   }
+
+   public boolean exists() throws IOException {
+   return fileSystem.exists(directory);
+   }
+
+   /**
+* List the statuses of the files/directories in the snapshot directory.
+*
+* @return the statuses of the files/directories in the given path.
+* @throws IOException if there is a problem creating the file statuses.
+*/
+   public FileStatus[] listStatus() throws IOException {
+   return fileSystem.listStatus(directory);
+   }
+
+   /**
+* Calling this method completes the snapshot into the snapshot 
directory and creates a corresponding
+* {@link DirectoryStateHandle} that points to the snapshot directory. 
Calling this method will also change the
+* lifecycle state from "ongoing" to "completed". If the state was 
already deleted, an {@link IOException} is
+* thrown.
+*
+* @return a directory state handle that points to the snapshot 
directory.
+* @throws IOException if the state of this snapshot directory object 
is different from "ongoing".
+*/
+   public DirectoryStateHandle completeSnapshotAndGetHandle() throws 
IOException {
+   if (state.compareAndSet(State.ONGOING, State.COMPLETED)) {
+   return new DirectoryStateHandle(directory, fileSystem);
+   } else {
+   throw new IOException("Expected state " + State.ONGOING 
+ " but found state " + 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169898758
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * This state handle represents a directory. This class is, for example, 
used to represent the directory of RocksDB's
+ * native checkpoint directories for local recovery.
+ */
+public class DirectoryStateHandle implements StateObject {
+
+   /** Serial version. */
+   private static final long serialVersionUID = 1L;
+
+   /** The path that describes the directory. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the directory described by path. */
+   @Nonnull
+   private final FileSystem fileSystem;
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169898668
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some method that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
--- End diff --

In this class, I would keep it because it is used in several operations and 
obtaining the `FileSystem` from `Path` looks like it might not be a cheap 
operation. So this is more like a cache, or in case that you already have the 
`FileSystem` when constructing the object.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169897306
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * This state handle represents a directory. This class is, for example, 
used to represent the directory of RocksDB's
+ * native checkpoint directories for local recovery.
+ */
+public class DirectoryStateHandle implements StateObject {
+
+   /** Serial version. */
+   private static final long serialVersionUID = 1L;
+
+   /** The path that describes the directory. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the directory described by path. */
+   @Nonnull
+   private final FileSystem fileSystem;
--- End diff --

It is true, and just was no problem because this handle is never 
serialized, it just is forced to inherit this from `StateObject`. 👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169896224
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
 ---
@@ -172,4 +167,30 @@ public CheckpointStreamWithResultProvider create(
return new 
CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
}
+
+   /**
+* Helper method that takes a {@link SnapshotResult} 
and a {@link KeyGroupRangeOffsets} and
+* creates a {@link SnapshotResult} by combining 
the key groups offsets with all the
+* present stream state handles.
+*/
+   static SnapshotResult 
toKeyedStateHandleSnapshotResult(
+   @Nullable SnapshotResult snapshotResult,
--- End diff --

Already changed in my updated branch.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169896622
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * This class is a keyed state handle based on a directory. It combines a 
{@link DirectoryStateHandle} and a
+ * {@link KeyGroupRange}.
+ */
+public class DirectoryKeyedStateHandle implements KeyedStateHandle {
+
+   @Nonnull
+   private final DirectoryStateHandle directoryStateHandle;
+
+   @Nonnull
+   private final KeyGroupRange keyGroupRange;
+
+   public DirectoryKeyedStateHandle(
+   @Nonnull DirectoryStateHandle directoryStateHandle,
+   @Nonnull KeyGroupRange keyGroupRange) {
+
+   this.directoryStateHandle = directoryStateHandle;
+   this.keyGroupRange = keyGroupRange;
+   }
+
+   @Nonnull
+   public DirectoryStateHandle getDirectoryStateHandle() {
+   return directoryStateHandle;
+   }
+
+   @Nonnull
+   @Override
+   public KeyGroupRange getKeyGroupRange() {
+   return keyGroupRange;
+   }
+
+   @Override
+   public void discardState() throws Exception {
+   directoryStateHandle.discardState();
+   }
+
+   @Override
+   public long getStateSize() {
+   return directoryStateHandle.getStateSize();
+   }
+
+   @Override
+   public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+   return 
keyGroupRange.getIntersection(keyGroupRange).getNumberOfKeyGroups() > 0 ? this 
: null;
--- End diff --

 👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169895777
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -2123,9 +2162,21 @@ void takeSnapshot() throws Exception {
checkpointId,
sstFiles,
miscFiles,
-   metaStateHandle);
+   metaStateHandle.getJobManagerOwnedSnapshot());
+
+   DirectoryStateHandle directoryStateHandle = 
localBackupDirectory.completeSnapshotAndGetHandle();
+   StreamStateHandle taskLocalSnapshotMetaDataStateHandle 
= metaStateHandle.getTaskLocalSnapshot();
+   IncrementalLocalKeyedStateHandle 
directoryKeyedStateHandle =
+   directoryStateHandle != null && 
taskLocalSnapshotMetaDataStateHandle != null ?
+   new IncrementalLocalKeyedStateHandle(
+   stateBackend.backendUID,
+   checkpointId,
+   directoryStateHandle,
+   stateBackend.keyGroupRange,
+   
taskLocalSnapshotMetaDataStateHandle) :
+   null;
 
-   return new 
SnapshotResult<>(incrementalKeyedStateHandle, null);
+   return new 
SnapshotResult<>(incrementalKeyedStateHandle, directoryKeyedStateHandle);
--- End diff --

Exactly 👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169895663
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1552,10 +1556,33 @@ public IncrementalSnapshotStrategy() {
return DoneFuture.nullValue();
}
 
+   SnapshotDirectory snapshotDirectory;
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169728016
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some method that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
+
+   /** This reference tracks the lifecycle state of the snapshot 
directory. */
+   @Nonnull
+   private AtomicReference state;
+
+   public SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem 
fileSystem) {
+   this.directory = directory;
+   this.fileSystem = fileSystem;
+   this.state = new AtomicReference<>(State.ONGOING);
+   }
+
+   public SnapshotDirectory(@Nonnull Path directory) throws IOException {
+   this(directory, directory.getFileSystem());
+   }
+
+   @Nonnull
+   public Path getDirectory() {
+   return directory;
+   }
+
+   public boolean mkdirs() throws IOException {
+   return fileSystem.mkdirs(directory);
+   }
+
+   @Nonnull
+   public FileSystem getFileSystem() {
+   return fileSystem;
+   }
+
+   public boolean exists() throws IOException {
+   return fileSystem.exists(directory);
+   }
+
+   /**
+* List the statuses of the files/directories in the snapshot directory.
+*
+* @return the statuses of the files/directories in the given path.
+* @throws IOException if there is a problem creating the file statuses.
+*/
+   public FileStatus[] listStatus() throws IOException {
+   return fileSystem.listStatus(directory);
+   }
+
+   /**
+* Calling this method completes the snapshot into the snapshot 
directory and creates a corresponding
+* {@link DirectoryStateHandle} that points to the snapshot directory. 
Calling this method will also change the
+* lifecycle state from "ongoing" to "completed". If the state was 
already deleted, an {@link IOException} is
+* thrown.
+*
+* @return a directory state handle that points to the snapshot 
directory.
+* @throws IOException if the state of this snapshot directory object 
is different from "ongoing".
+*/
+   public DirectoryStateHandle completeSnapshotAndGetHandle() throws 
IOException {
+   if (state.compareAndSet(State.ONGOING, State.COMPLETED)) {
+   return new DirectoryStateHandle(directory, fileSystem);
+   } else {
+   throw new IOException("Expected state " + State.ONGOING 
+ " but found state " + 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169726507
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * This state handle represents a directory. This class is, for example, 
used to represent the directory of RocksDB's
+ * native checkpoint directories for local recovery.
+ */
+public class DirectoryStateHandle implements StateObject {
+
+   /** Serial version. */
+   private static final long serialVersionUID = 1L;
+
+   /** The path that describes the directory. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the directory described by path. */
+   @Nonnull
+   private final FileSystem fileSystem;
--- End diff --

`FileSystem` is not serializable and should be removed from the 
`DirectoryStateHandle`. `directory` should give you all you need.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169727620
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some method that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
+
+   /** This reference tracks the lifecycle state of the snapshot 
directory. */
+   @Nonnull
+   private AtomicReference state;
+
+   public SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem 
fileSystem) {
+   this.directory = directory;
+   this.fileSystem = fileSystem;
+   this.state = new AtomicReference<>(State.ONGOING);
+   }
+
+   public SnapshotDirectory(@Nonnull Path directory) throws IOException {
+   this(directory, directory.getFileSystem());
+   }
+
+   @Nonnull
+   public Path getDirectory() {
+   return directory;
+   }
+
+   public boolean mkdirs() throws IOException {
+   return fileSystem.mkdirs(directory);
+   }
+
+   @Nonnull
+   public FileSystem getFileSystem() {
+   return fileSystem;
+   }
+
+   public boolean exists() throws IOException {
+   return fileSystem.exists(directory);
+   }
+
+   /**
+* List the statuses of the files/directories in the snapshot directory.
+*
+* @return the statuses of the files/directories in the given path.
+* @throws IOException if there is a problem creating the file statuses.
+*/
+   public FileStatus[] listStatus() throws IOException {
+   return fileSystem.listStatus(directory);
+   }
+
+   /**
+* Calling this method completes the snapshot into the snapshot 
directory and creates a corresponding
+* {@link DirectoryStateHandle} that points to the snapshot directory. 
Calling this method will also change the
+* lifecycle state from "ongoing" to "completed". If the state was 
already deleted, an {@link IOException} is
+* thrown.
+*
+* @return a directory state handle that points to the snapshot 
directory.
+* @throws IOException if the state of this snapshot directory object 
is different from "ongoing".
+*/
+   public DirectoryStateHandle completeSnapshotAndGetHandle() throws 
IOException {
+   if (state.compareAndSet(State.ONGOING, State.COMPLETED)) {
+   return new DirectoryStateHandle(directory, fileSystem);
+   } else {
+   throw new IOException("Expected state " + State.ONGOING 
+ " but found state " + 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169723987
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1552,10 +1556,33 @@ public IncrementalSnapshotStrategy() {
return DoneFuture.nullValue();
}
 
+   SnapshotDirectory snapshotDirectory;
--- End diff --

Could be made `final`


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169725430
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
 ---
@@ -172,4 +167,30 @@ public CheckpointStreamWithResultProvider create(
return new 
CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
}
+
+   /**
+* Helper method that takes a {@link SnapshotResult} 
and a {@link KeyGroupRangeOffsets} and
+* creates a {@link SnapshotResult} by combining 
the key groups offsets with all the
+* present stream state handles.
+*/
+   static SnapshotResult 
toKeyedStateHandleSnapshotResult(
+   @Nullable SnapshotResult snapshotResult,
--- End diff --

I think this parameter should not be `Nullable`. If it is null, then we 
don't have to call this method.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169727315
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some method that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
--- End diff --

Do we need `FileSystem` if we have `directory` which is of type `Path`?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169724637
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -2123,9 +2162,21 @@ void takeSnapshot() throws Exception {
checkpointId,
sstFiles,
miscFiles,
-   metaStateHandle);
+   metaStateHandle.getJobManagerOwnedSnapshot());
+
+   DirectoryStateHandle directoryStateHandle = 
localBackupDirectory.completeSnapshotAndGetHandle();
+   StreamStateHandle taskLocalSnapshotMetaDataStateHandle 
= metaStateHandle.getTaskLocalSnapshot();
+   IncrementalLocalKeyedStateHandle 
directoryKeyedStateHandle =
+   directoryStateHandle != null && 
taskLocalSnapshotMetaDataStateHandle != null ?
+   new IncrementalLocalKeyedStateHandle(
+   stateBackend.backendUID,
+   checkpointId,
+   directoryStateHandle,
+   stateBackend.keyGroupRange,
+   
taskLocalSnapshotMetaDataStateHandle) :
+   null;
 
-   return new 
SnapshotResult<>(incrementalKeyedStateHandle, null);
+   return new 
SnapshotResult<>(incrementalKeyedStateHandle, directoryKeyedStateHandle);
--- End diff --

I guess you changed the creation of the `SnapshotResult` in a fixup commit 
in your local branch, right? Otherwise we might refactor this in order to get 
rid of the many `nulls` in the lines above.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169726050
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * This class is a keyed state handle based on a directory. It combines a 
{@link DirectoryStateHandle} and a
+ * {@link KeyGroupRange}.
+ */
+public class DirectoryKeyedStateHandle implements KeyedStateHandle {
+
+   @Nonnull
+   private final DirectoryStateHandle directoryStateHandle;
+
+   @Nonnull
+   private final KeyGroupRange keyGroupRange;
+
+   public DirectoryKeyedStateHandle(
+   @Nonnull DirectoryStateHandle directoryStateHandle,
+   @Nonnull KeyGroupRange keyGroupRange) {
+
+   this.directoryStateHandle = directoryStateHandle;
+   this.keyGroupRange = keyGroupRange;
+   }
+
+   @Nonnull
+   public DirectoryStateHandle getDirectoryStateHandle() {
+   return directoryStateHandle;
+   }
+
+   @Nonnull
+   @Override
+   public KeyGroupRange getKeyGroupRange() {
+   return keyGroupRange;
+   }
+
+   @Override
+   public void discardState() throws Exception {
+   directoryStateHandle.discardState();
+   }
+
+   @Override
+   public long getStateSize() {
+   return directoryStateHandle.getStateSize();
+   }
+
+   @Override
+   public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+   return 
keyGroupRange.getIntersection(keyGroupRange).getNumberOfKeyGroups() > 0 ? this 
: null;
--- End diff --

This looks like a bug. `keyGroupRange.getIntersection(keyGroupRange)` 
should be `keyGroupRange`. I think in general it is a good idea to not shadow 
local fields by function parameters.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169651017
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169649287
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169638326
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-19 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169062887
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169062508
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-19 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169061774
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169047804
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169047628
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169021546
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-17 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168923652
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-17 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168923464
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-17 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168923188
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168891314
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
+independent from their primary copy. For example, it is possible to retain 
a history of the 3 latest checkpoints as primary copies and only keep
+the task-local state of the latest checkpoint.
+
+- For recovery, Flink will always *attempt to restore from task-local 
state first*, if a matching secondary copy is available. If any problem occurs 
during
+the recovery from the secondary copy, Flink will *transparently retry to 
recovery the task from the primary copy*. Recovery only fails, if primary
+and the (optional) secondary copy failed. In this case, depending on the 
configuration Flink could still fall back to an older checkpoint.
+
+- It is possible that the task-local copy contains only parts of the full 
task state (e.g. exception 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168826365
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,11 +305,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
+   Iterable iterable = () -> new 
RocksIteratorToJavaIteratorAdapter<>(iterator, state, keySerializer, 
keyGroupPrefixBytes);
Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
--- End diff --

After tracing the invoked path, I found this seem to be a new function. And 
there is a  block bug I found that relate to it ... 
[5513](https://github.com/apache/flink/pull/5513)


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168779711
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * This class is a wrapper over multiple alternative {@link 
OperatorSubtaskState} that are (partial) substitutes for
+ * each other and imposes a priority ordering over all alternatives for 
the different states which define an order in
+ * which the operator should attempt to restore the state from them. One 
OperatorSubtaskState is considered as the
+ * "ground truth" about which state should be represented. Alternatives 
may be complete or partial substitutes for
+ * the "ground truth" with a higher priority (if they had a lower 
alternative, they would not really be alternatives).
+ * Substitution is determined on a per-sub-state basis.
+ */
+public class PrioritizedOperatorSubtaskState {
+
+   /** Singleton instance for an empty, non-restored operator state. */
+   private static final PrioritizedOperatorSubtaskState 
EMPTY_NON_RESTORED_INSTANCE =
+   new PrioritizedOperatorSubtaskState(new OperatorSubtaskState(), 
Collections.emptyList(), false);
+
+   /** List of prioritized snapshot alternatives for managed operator 
state. */
+   private final List 
prioritizedManagedOperatorState;
+
+   /** List of prioritized snapshot alternatives for raw operator state. */
+   private final List 
prioritizedRawOperatorState;
+
+   /** List of prioritized snapshot alternatives for managed keyed state. 
*/
+   private final List 
prioritizedManagedKeyedState;
+
+   /** List of prioritized snapshot alternatives for raw keyed state. */
+   private final List 
prioritizedRawKeyedState;
+
+   /** Signal flag if this represents state for a restored operator. */
+   private final boolean restored;
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority) {
+   this(jobManagerState, alternativesByPriority, true);
+   }
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority,
+   boolean restored) {
+
+   Preconditions.checkNotNull(jobManagerState, "Job manager state 
is null.");
+   int size = Preconditions.checkNotNull(alternativesByPriority, 
"Alternative states are null.").size();
+
+   this.restored = restored;
+
+   List 
managedOperatorAlternatives = new ArrayList<>(size);
+   List 
managedKeyedAlternatives = new ArrayList<>(size);
+   List 
rawOperatorAlternatives = new ArrayList<>(size);
+   List 
rawKeyedAlternatives = new ArrayList<>(size);
+
+   for (OperatorSubtaskState subtaskState : 
alternativesByPriority) {
+
+   if (subtaskState != null) {
+   
managedKeyedAlternatives.add(subtaskState.getManagedKeyedState());
+   
rawKeyedAlternatives.add(subtaskState.getRawKeyedState());
+   

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168779365
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * This class is a wrapper over multiple alternative {@link 
OperatorSubtaskState} that are (partial) substitutes for
+ * each other and imposes a priority ordering over all alternatives for 
the different states which define an order in
+ * which the operator should attempt to restore the state from them. One 
OperatorSubtaskState is considered as the
+ * "ground truth" about which state should be represented. Alternatives 
may be complete or partial substitutes for
+ * the "ground truth" with a higher priority (if they had a lower 
alternative, they would not really be alternatives).
+ * Substitution is determined on a per-sub-state basis.
+ */
+public class PrioritizedOperatorSubtaskState {
+
+   /** Singleton instance for an empty, non-restored operator state. */
+   private static final PrioritizedOperatorSubtaskState 
EMPTY_NON_RESTORED_INSTANCE =
+   new PrioritizedOperatorSubtaskState(new OperatorSubtaskState(), 
Collections.emptyList(), false);
+
+   /** List of prioritized snapshot alternatives for managed operator 
state. */
+   private final List 
prioritizedManagedOperatorState;
+
+   /** List of prioritized snapshot alternatives for raw operator state. */
+   private final List 
prioritizedRawOperatorState;
+
+   /** List of prioritized snapshot alternatives for managed keyed state. 
*/
+   private final List 
prioritizedManagedKeyedState;
+
+   /** List of prioritized snapshot alternatives for raw keyed state. */
+   private final List 
prioritizedRawKeyedState;
+
+   /** Signal flag if this represents state for a restored operator. */
+   private final boolean restored;
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority) {
+   this(jobManagerState, alternativesByPriority, true);
+   }
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority,
+   boolean restored) {
+
+   Preconditions.checkNotNull(jobManagerState, "Job manager state 
is null.");
+   int size = Preconditions.checkNotNull(alternativesByPriority, 
"Alternative states are null.").size();
+
+   this.restored = restored;
+
+   List 
managedOperatorAlternatives = new ArrayList<>(size);
+   List 
managedKeyedAlternatives = new ArrayList<>(size);
+   List 
rawOperatorAlternatives = new ArrayList<>(size);
+   List 
rawKeyedAlternatives = new ArrayList<>(size);
+
+   for (OperatorSubtaskState subtaskState : 
alternativesByPriority) {
+
+   if (subtaskState != null) {
+   
managedKeyedAlternatives.add(subtaskState.getManagedKeyedState());
+   
rawKeyedAlternatives.add(subtaskState.getRawKeyedState());
+  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168778447
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * This class is a wrapper over multiple alternative {@link 
OperatorSubtaskState} that are (partial) substitutes for
+ * each other and imposes a priority ordering over all alternatives for 
the different states which define an order in
+ * which the operator should attempt to restore the state from them. One 
OperatorSubtaskState is considered as the
+ * "ground truth" about which state should be represented. Alternatives 
may be complete or partial substitutes for
+ * the "ground truth" with a higher priority (if they had a lower 
alternative, they would not really be alternatives).
+ * Substitution is determined on a per-sub-state basis.
+ */
+public class PrioritizedOperatorSubtaskState {
+
+   /** Singleton instance for an empty, non-restored operator state. */
+   private static final PrioritizedOperatorSubtaskState 
EMPTY_NON_RESTORED_INSTANCE =
+   new PrioritizedOperatorSubtaskState(new OperatorSubtaskState(), 
Collections.emptyList(), false);
+
+   /** List of prioritized snapshot alternatives for managed operator 
state. */
+   private final List 
prioritizedManagedOperatorState;
+
+   /** List of prioritized snapshot alternatives for raw operator state. */
+   private final List 
prioritizedRawOperatorState;
+
+   /** List of prioritized snapshot alternatives for managed keyed state. 
*/
+   private final List 
prioritizedManagedKeyedState;
+
+   /** List of prioritized snapshot alternatives for raw keyed state. */
+   private final List 
prioritizedRawKeyedState;
+
+   /** Signal flag if this represents state for a restored operator. */
+   private final boolean restored;
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority) {
+   this(jobManagerState, alternativesByPriority, true);
+   }
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority,
+   boolean restored) {
+
+   Preconditions.checkNotNull(jobManagerState, "Job manager state 
is null.");
+   int size = Preconditions.checkNotNull(alternativesByPriority, 
"Alternative states are null.").size();
+
+   this.restored = restored;
+
+   List 
managedOperatorAlternatives = new ArrayList<>(size);
+   List 
managedKeyedAlternatives = new ArrayList<>(size);
+   List 
rawOperatorAlternatives = new ArrayList<>(size);
+   List 
rawKeyedAlternatives = new ArrayList<>(size);
+
+   for (OperatorSubtaskState subtaskState : 
alternativesByPriority) {
+
+   if (subtaskState != null) {
+   
managedKeyedAlternatives.add(subtaskState.getManagedKeyedState());
+   
rawKeyedAlternatives.add(subtaskState.getRawKeyedState());
+   

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168778376
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,11 +305,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
+   Iterable iterable = () -> new 
RocksIteratorToJavaIteratorAdapter<>(iterator, state, keySerializer, 
keyGroupPrefixBytes);
Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
--- End diff --

Well, maybe, except if this is also only used with void-namespace.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168778178
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,11 +305,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
+   Iterable iterable = () -> new 
RocksIteratorToJavaIteratorAdapter<>(iterator, state, keySerializer, 
keyGroupPrefixBytes);
Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r16849
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * This class is a wrapper over multiple alternative {@link 
OperatorSubtaskState} that are (partial) substitutes for
+ * each other and imposes a priority ordering over all alternatives for 
the different states which define an order in
+ * which the operator should attempt to restore the state from them. One 
OperatorSubtaskState is considered as the
+ * "ground truth" about which state should be represented. Alternatives 
may be complete or partial substitutes for
+ * the "ground truth" with a higher priority (if they had a lower 
alternative, they would not really be alternatives).
+ * Substitution is determined on a per-sub-state basis.
+ */
+public class PrioritizedOperatorSubtaskState {
+
+   /** Singleton instance for an empty, non-restored operator state. */
+   private static final PrioritizedOperatorSubtaskState 
EMPTY_NON_RESTORED_INSTANCE =
+   new PrioritizedOperatorSubtaskState(new OperatorSubtaskState(), 
Collections.emptyList(), false);
+
+   /** List of prioritized snapshot alternatives for managed operator 
state. */
+   private final List 
prioritizedManagedOperatorState;
+
+   /** List of prioritized snapshot alternatives for raw operator state. */
+   private final List 
prioritizedRawOperatorState;
+
+   /** List of prioritized snapshot alternatives for managed keyed state. 
*/
+   private final List 
prioritizedManagedKeyedState;
+
+   /** List of prioritized snapshot alternatives for raw keyed state. */
+   private final List 
prioritizedRawKeyedState;
+
+   /** Signal flag if this represents state for a restored operator. */
+   private final boolean restored;
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority) {
+   this(jobManagerState, alternativesByPriority, true);
+   }
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority,
+   boolean restored) {
+
+   Preconditions.checkNotNull(jobManagerState, "Job manager state 
is null.");
+   int size = Preconditions.checkNotNull(alternativesByPriority, 
"Alternative states are null.").size();
+
+   this.restored = restored;
+
+   List 
managedOperatorAlternatives = new ArrayList<>(size);
+   List 
managedKeyedAlternatives = new ArrayList<>(size);
+   List 
rawOperatorAlternatives = new ArrayList<>(size);
+   List 
rawKeyedAlternatives = new ArrayList<>(size);
+
+   for (OperatorSubtaskState subtaskState : 
alternativesByPriority) {
+
+   if (subtaskState != null) {
+   
managedKeyedAlternatives.add(subtaskState.getManagedKeyedState());
+   
rawKeyedAlternatives.add(subtaskState.getRawKeyedState());
+  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r16837
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,11 +305,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
+   Iterable iterable = () -> new 
RocksIteratorToJavaIteratorAdapter<>(iterator, state, keySerializer, 
keyGroupPrefixBytes);
Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
--- End diff --

Checked. In `HeapKeyedStateBackend` the implementation is 
```
@Override
public  Stream getKeys(String state, N namespace) {
if (!stateTables.containsKey(state)) {
return Stream.empty();
}
StateTable table = (StateTable) 
stateTables.get(state);
return table.getKeys(namespace);
}
```
So I think `namespace` is useful ...


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168777497
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * This class is a wrapper over multiple alternative {@link 
OperatorSubtaskState} that are (partial) substitutes for
+ * each other and imposes a priority ordering over all alternatives for 
the different states which define an order in
+ * which the operator should attempt to restore the state from them. One 
OperatorSubtaskState is considered as the
+ * "ground truth" about which state should be represented. Alternatives 
may be complete or partial substitutes for
+ * the "ground truth" with a higher priority (if they had a lower 
alternative, they would not really be alternatives).
+ * Substitution is determined on a per-sub-state basis.
+ */
+public class PrioritizedOperatorSubtaskState {
+
+   /** Singleton instance for an empty, non-restored operator state. */
+   private static final PrioritizedOperatorSubtaskState 
EMPTY_NON_RESTORED_INSTANCE =
+   new PrioritizedOperatorSubtaskState(new OperatorSubtaskState(), 
Collections.emptyList(), false);
+
+   /** List of prioritized snapshot alternatives for managed operator 
state. */
+   private final List 
prioritizedManagedOperatorState;
+
+   /** List of prioritized snapshot alternatives for raw operator state. */
+   private final List 
prioritizedRawOperatorState;
+
+   /** List of prioritized snapshot alternatives for managed keyed state. 
*/
+   private final List 
prioritizedManagedKeyedState;
+
+   /** List of prioritized snapshot alternatives for raw keyed state. */
+   private final List 
prioritizedRawKeyedState;
+
+   /** Signal flag if this represents state for a restored operator. */
+   private final boolean restored;
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority) {
+   this(jobManagerState, alternativesByPriority, true);
+   }
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority,
+   boolean restored) {
+
+   Preconditions.checkNotNull(jobManagerState, "Job manager state 
is null.");
+   int size = Preconditions.checkNotNull(alternativesByPriority, 
"Alternative states are null.").size();
+
+   this.restored = restored;
+
+   List 
managedOperatorAlternatives = new ArrayList<>(size);
+   List 
managedKeyedAlternatives = new ArrayList<>(size);
+   List 
rawOperatorAlternatives = new ArrayList<>(size);
+   List 
rawKeyedAlternatives = new ArrayList<>(size);
+
+   for (OperatorSubtaskState subtaskState : 
alternativesByPriority) {
+
+   if (subtaskState != null) {
+   
managedKeyedAlternatives.add(subtaskState.getManagedKeyedState());
+   
rawKeyedAlternatives.add(subtaskState.getRawKeyedState());
+  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168776176
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,11 +305,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
+   Iterable iterable = () -> new 
RocksIteratorToJavaIteratorAdapter<>(iterator, state, keySerializer, 
keyGroupPrefixBytes);
Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
--- End diff --

Ok, I will check whether the namespace parameter is actually required by 
check the other state backend (HeapKeyedStateBackend), I think their 
implementation should be consistent.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168775173
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * This class is a wrapper over multiple alternative {@link 
OperatorSubtaskState} that are (partial) substitutes for
+ * each other and imposes a priority ordering over all alternatives for 
the different states which define an order in
+ * which the operator should attempt to restore the state from them. One 
OperatorSubtaskState is considered as the
+ * "ground truth" about which state should be represented. Alternatives 
may be complete or partial substitutes for
+ * the "ground truth" with a higher priority (if they had a lower 
alternative, they would not really be alternatives).
+ * Substitution is determined on a per-sub-state basis.
+ */
+public class PrioritizedOperatorSubtaskState {
+
+   /** Singleton instance for an empty, non-restored operator state. */
+   private static final PrioritizedOperatorSubtaskState 
EMPTY_NON_RESTORED_INSTANCE =
+   new PrioritizedOperatorSubtaskState(new OperatorSubtaskState(), 
Collections.emptyList(), false);
+
+   /** List of prioritized snapshot alternatives for managed operator 
state. */
+   private final List 
prioritizedManagedOperatorState;
+
+   /** List of prioritized snapshot alternatives for raw operator state. */
+   private final List 
prioritizedRawOperatorState;
+
+   /** List of prioritized snapshot alternatives for managed keyed state. 
*/
+   private final List 
prioritizedManagedKeyedState;
+
+   /** List of prioritized snapshot alternatives for raw keyed state. */
+   private final List 
prioritizedRawKeyedState;
+
+   /** Signal flag if this represents state for a restored operator. */
+   private final boolean restored;
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority) {
+   this(jobManagerState, alternativesByPriority, true);
+   }
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority,
+   boolean restored) {
+
+   Preconditions.checkNotNull(jobManagerState, "Job manager state 
is null.");
+   int size = Preconditions.checkNotNull(alternativesByPriority, 
"Alternative states are null.").size();
+
+   this.restored = restored;
+
+   List 
managedOperatorAlternatives = new ArrayList<>(size);
+   List 
managedKeyedAlternatives = new ArrayList<>(size);
+   List 
rawOperatorAlternatives = new ArrayList<>(size);
+   List 
rawKeyedAlternatives = new ArrayList<>(size);
+
+   for (OperatorSubtaskState subtaskState : 
alternativesByPriority) {
+
+   if (subtaskState != null) {
+   
managedKeyedAlternatives.add(subtaskState.getManagedKeyedState());
+   
rawKeyedAlternatives.add(subtaskState.getRawKeyedState());
+   

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168774627
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * This class is a wrapper over multiple alternative {@link 
OperatorSubtaskState} that are (partial) substitutes for
+ * each other and imposes a priority ordering over all alternatives for 
the different states which define an order in
+ * which the operator should attempt to restore the state from them. One 
OperatorSubtaskState is considered as the
+ * "ground truth" about which state should be represented. Alternatives 
may be complete or partial substitutes for
+ * the "ground truth" with a higher priority (if they had a lower 
alternative, they would not really be alternatives).
+ * Substitution is determined on a per-sub-state basis.
+ */
+public class PrioritizedOperatorSubtaskState {
+
+   /** Singleton instance for an empty, non-restored operator state. */
+   private static final PrioritizedOperatorSubtaskState 
EMPTY_NON_RESTORED_INSTANCE =
+   new PrioritizedOperatorSubtaskState(new OperatorSubtaskState(), 
Collections.emptyList(), false);
+
+   /** List of prioritized snapshot alternatives for managed operator 
state. */
+   private final List 
prioritizedManagedOperatorState;
+
+   /** List of prioritized snapshot alternatives for raw operator state. */
+   private final List 
prioritizedRawOperatorState;
+
+   /** List of prioritized snapshot alternatives for managed keyed state. 
*/
+   private final List 
prioritizedManagedKeyedState;
+
+   /** List of prioritized snapshot alternatives for raw keyed state. */
+   private final List 
prioritizedRawKeyedState;
+
+   /** Signal flag if this represents state for a restored operator. */
+   private final boolean restored;
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority) {
+   this(jobManagerState, alternativesByPriority, true);
+   }
+
+   public PrioritizedOperatorSubtaskState(
+   @Nonnull OperatorSubtaskState jobManagerState,
+   @Nonnull List alternativesByPriority,
+   boolean restored) {
+
+   Preconditions.checkNotNull(jobManagerState, "Job manager state 
is null.");
+   int size = Preconditions.checkNotNull(alternativesByPriority, 
"Alternative states are null.").size();
+
+   this.restored = restored;
+
+   List 
managedOperatorAlternatives = new ArrayList<>(size);
+   List 
managedKeyedAlternatives = new ArrayList<>(size);
+   List 
rawOperatorAlternatives = new ArrayList<>(size);
+   List 
rawKeyedAlternatives = new ArrayList<>(size);
+
+   for (OperatorSubtaskState subtaskState : 
alternativesByPriority) {
+
+   if (subtaskState != null) {
+   
managedKeyedAlternatives.add(subtaskState.getManagedKeyedState());
+   
rawKeyedAlternatives.add(subtaskState.getRawKeyedState());
+   

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168773814
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,11 +305,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
+   Iterable iterable = () -> new 
RocksIteratorToJavaIteratorAdapter<>(iterator, state, keySerializer, 
keyGroupPrefixBytes);
Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
--- End diff --

Fine with me, but since this seems to have not caused any trouble so far, 
you might first want to check if the namespace parameter is actually required. 
Maybe it can just be removed as parameter because the using code wants to 
iterate all keys in all namespace.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168773128
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,11 +305,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
+   Iterable iterable = () -> new 
RocksIteratorToJavaIteratorAdapter<>(iterator, state, keySerializer, 
keyGroupPrefixBytes);
Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
--- End diff --

Aha, I would like to open a issue with a PR for this if you don't mind?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168762457
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,11 +305,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
+   Iterable iterable = () -> new 
RocksIteratorToJavaIteratorAdapter<>(iterator, state, keySerializer, 
keyGroupPrefixBytes);
Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
--- End diff --

Yes, that seems a little odd, but I suggest to make this a separate issue 
because this PR has not introduced or changed this, except for that the class 
was renamed.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168760098
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,11 +305,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
+   Iterable iterable = () -> new 
RocksIteratorToJavaIteratorAdapter<>(iterator, state, keySerializer, 
keyGroupPrefixBytes);
Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
--- End diff --

This method seems to be problematic, it doesn't use the `namespace` to 
filter data ...


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168753623
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
+independent from their primary copy. For example, it is possible to retain 
a history of the 3 latest checkpoints as primary copies and only keep
+the task-local state of the latest checkpoint.
+
+- For recovery, Flink will always *attempt to restore from task-local 
state first*, if a matching secondary copy is available. If any problem occurs 
during
+the recovery from the secondary copy, Flink will *transparently retry to 
recovery the task from the primary copy*. Recovery only fails, if primary
+and the (optional) secondary copy failed. In this case, depending on the 
configuration Flink could still fall back to an older checkpoint.
+
+- It is possible that the task-local copy contains only parts of the full 
task state (e.g. exception 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168753503
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
+independent from their primary copy. For example, it is possible to retain 
a history of the 3 latest checkpoints as primary copies and only keep
+the task-local state of the latest checkpoint.
+
+- For recovery, Flink will always *attempt to restore from task-local 
state first*, if a matching secondary copy is available. If any problem occurs 
during
+the recovery from the secondary copy, Flink will *transparently retry to 
recovery the task from the primary copy*. Recovery only fails, if primary
+and the (optional) secondary copy failed. In this case, depending on the 
configuration Flink could still fall back to an older checkpoint.
+
+- It is possible that the task-local copy contains only parts of the full 
task state (e.g. exception 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168753488
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168753362
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
+independent from their primary copy. For example, it is possible to retain 
a history of the 3 latest checkpoints as primary copies and only keep
+the task-local state of the latest checkpoint.
+
+- For recovery, Flink will always *attempt to restore from task-local 
state first*, if a matching secondary copy is available. If any problem occurs 
during
+the recovery from the secondary copy, Flink will *transparently retry to 
recovery the task from the primary copy*. Recovery only fails, if primary
+and the (optional) secondary copy failed. In this case, depending on the 
configuration Flink could still fall back to an older checkpoint.
+
+- It is possible that the task-local copy contains only parts of the full 
task state (e.g. exception 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168753314
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168753015
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
+independent from their primary copy. For example, it is possible to retain 
a history of the 3 latest checkpoints as primary copies and only keep
+the task-local state of the latest checkpoint.
+
+- For recovery, Flink will always *attempt to restore from task-local 
state first*, if a matching secondary copy is available. If any problem occurs 
during
+the recovery from the secondary copy, Flink will *transparently retry to 
recovery the task from the primary copy*. Recovery only fails, if primary
+and the (optional) secondary copy failed. In this case, depending on the 
configuration Flink could still fall back to an older checkpoint.
+
+- It is possible that the task-local copy contains only parts of the full 
task state (e.g. exception 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168752606
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
+independent from their primary copy. For example, it is possible to retain 
a history of the 3 latest checkpoints as primary copies and only keep
+the task-local state of the latest checkpoint.
+
+- For recovery, Flink will always *attempt to restore from task-local 
state first*, if a matching secondary copy is available. If any problem occurs 
during
+the recovery from the secondary copy, Flink will *transparently retry to 
recovery the task from the primary copy*. Recovery only fails, if primary
+and the (optional) secondary copy failed. In this case, depending on the 
configuration Flink could still fall back to an older checkpoint.
+
+- It is possible that the task-local copy contains only parts of the full 
task state (e.g. exception 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168750923
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168751139
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168752498
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
@@ -18,81 +18,267 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.FileUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
- * This class holds the all {@link TaskLocalStateStore} objects for a task 
executor (manager).
- *
- * TODO: this still still work in progress and partially still acts as a 
placeholder.
+ * This class holds the all {@link TaskLocalStateStoreImpl} objects for a 
task executor (manager).
  */
 public class TaskExecutorLocalStateStoresManager {
 
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
+
/**
 * This map holds all local state stores for tasks running on the task 
manager / executor that own the instance of
-* this.
+* this. Maps from allocation id to all the subtask's local state 
stores.
 */
-   private final Map> 
taskStateManagers;
+   @GuardedBy("lock")
+   private final Map> taskStateStoresByAllocationID;
+
+   /** The configured mode for local recovery on this task manager. */
+   private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+
+   /** This is the root directory for all local state of this task manager 
/ executor. */
+   private final File[] localStateRootDirectories;
+
+   /** Executor that runs the discarding of released state objects. */
+   private final Executor discardExecutor;
+
+   /** Guarding lock for taskStateStoresByAllocationID and closed-flag. */
+   private final Object lock;
+
+   private final Thread shutdownHook;
 
-   public TaskExecutorLocalStateStoresManager() {
-   this.taskStateManagers = new HashMap<>();
+   @GuardedBy("lock")
+   private boolean closed;
+
+   public TaskExecutorLocalStateStoresManager(
+   @Nonnull LocalRecoveryConfig.LocalRecoveryMode 
localRecoveryMode,
+   @Nonnull File[] localStateRootDirectories,
+   @Nonnull Executor discardExecutor) {
+
+   this.taskStateStoresByAllocationID = new HashMap<>();
+   this.localRecoveryMode = localRecoveryMode;
+   this.localStateRootDirectories = localStateRootDirectories;
+   this.discardExecutor = discardExecutor;
+   this.lock = new Object();
+   this.closed = false;
+
+   for (File localStateRecoveryRootDir : 
localStateRootDirectories) {
+   if (!localStateRecoveryRootDir.exists()) {
+
+   if (!localStateRecoveryRootDir.mkdirs()) {
+   throw new IllegalStateException("Could 
not create root directory for local recovery: " +
+   localStateRecoveryRootDir);
+   }
+   }
+   }
+
+   // install a shutdown hook
+   this.shutdownHook = new 
Thread("TaskExecutorLocalStateStoresManager shutdown hook") {
+   @Override
+   public void run() {
+   shutdown();
+   }
+   };
+   try {
+   Runtime.getRuntime().addShutdownHook(this.shutdownHook);
+   } catch (IllegalStateException e) {
+   // race, JVM is in shutdown already, we can safely 
ignore this
+   LOG.debug("Unable to add shutdown hook for 
TaskExecutorLocalStateStoresManager, shutdown already in progress", e);
+   } catch (Throwable t) {
+   LOG.warn("Error while adding shutdown hook for 
TaskExecutorLocalStateStoresManager", t);
+   }
}
 
-   public TaskLocalStateStore localStateStoreForTask(
-   JobID jobId,
-  

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168752518
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
+independent from their primary copy. For example, it is possible to retain 
a history of the 3 latest checkpoints as primary copies and only keep
+the task-local state of the latest checkpoint.
+
+- For recovery, Flink will always *attempt to restore from task-local 
state first*, if a matching secondary copy is available. If any problem occurs 
during
+the recovery from the secondary copy, Flink will *transparently retry to 
recovery the task from the primary copy*. Recovery only fails, if primary
+and the (optional) secondary copy failed. In this case, depending on the 
configuration Flink could still fall back to an older checkpoint.
+
+- It is possible that the task-local copy contains only parts of the full 
task state (e.g. exception 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168752445
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
+independent from their primary copy. For example, it is possible to retain 
a history of the 3 latest checkpoints as primary copies and only keep
+the task-local state of the latest checkpoint.
+
+- For recovery, Flink will always *attempt to restore from task-local 
state first*, if a matching secondary copy is available. If any problem occurs 
during
+the recovery from the secondary copy, Flink will *transparently retry to 
recovery the task from the primary copy*. Recovery only fails, if primary
+and the (optional) secondary copy failed. In this case, depending on the 
configuration Flink could still fall back to an older checkpoint.
+
+- It is possible that the task-local copy contains only parts of the full 
task state (e.g. exception 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168752196
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168752319
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
+independent from their primary copy. For example, it is possible to retain 
a history of the 3 latest checkpoints as primary copies and only keep
+the task-local state of the latest checkpoint.
+
+- For recovery, Flink will always *attempt to restore from task-local 
state first*, if a matching secondary copy is available. If any problem occurs 
during
+the recovery from the secondary copy, Flink will *transparently retry to 
recovery the task from the primary copy*. Recovery only fails, if primary
+and the (optional) secondary copy failed. In this case, depending on the 
configuration Flink could still fall back to an older checkpoint.
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168752264
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
+independent from their primary copy. For example, it is possible to retain 
a history of the 3 latest checkpoints as primary copies and only keep
+the task-local state of the latest checkpoint.
+
+- For recovery, Flink will always *attempt to restore from task-local 
state first*, if a matching secondary copy is available. If any problem occurs 
during
+the recovery from the secondary copy, Flink will *transparently retry to 
recovery the task from the primary copy*. Recovery only fails, if primary
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168752042
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168752064
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures abd also does not provide access for 
other nodes to redistribute state, this functionality still requires the
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168751784
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168747439
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/**
+ * Main implementation of a {@link TaskLocalStateStore}.
+ */
+public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
+
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStoreImpl.class);
+
+   /** Maximum number of retained snapshots. */
+   @VisibleForTesting
+   static final int MAX_RETAINED_SNAPSHOTS = 5;
+
+   /** Dummy value to use instead of null to satisfy {@link 
ConcurrentHashMap}. */
+   private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
+
+   /** JobID from the owning subtask. */
+   @Nonnull
+   private final JobID jobID;
+
+   /** AllocationID of the owning slot. */
+   @Nonnull
+   private final AllocationID allocationID;
+
+   /** JobVertexID of the owning subtask. */
+   @Nonnull
+   private final JobVertexID jobVertexID;
+
+   /** Subtask index of the owning subtask. */
+   @Nonnegative
+   private final int subtaskIndex;
+
+   /** The configured mode for local recovery. */
+   @Nonnull
+   private final LocalRecoveryConfig localRecoveryConfig;
+
+   /** Executor that runs the discarding of released state objects. */
+   @Nonnull
+   private final Executor discardExecutor;
+
+   /** Lock for synchronisation on the storage map and the discarded 
status. */
+   @Nonnull
+   private final Object lock;
+
+   /** Status flag if this store was already discarded. */
+   @GuardedBy("lock")
+   private boolean discarded;
+
+   /** Maps checkpoint ids to local TaskStateSnapshots. */
+   @Nonnull
+   @GuardedBy("lock")
+   private final SortedMap 
storedTaskStateByCheckpointID;
+
+   public TaskLocalStateStoreImpl(
+   @Nonnull JobID jobID,
+   @Nonnull AllocationID allocationID,
+   @Nonnull JobVertexID jobVertexID,
+   @Nonnegative int subtaskIndex,
+   @Nonnull LocalRecoveryConfig localRecoveryConfig,
+   @Nonnull Executor discardExecutor) {
+
+   this.jobID = jobID;
+   this.allocationID = allocationID;
+   this.jobVertexID = jobVertexID;
+   this.subtaskIndex = subtaskIndex;
+   this.discardExecutor = discardExecutor;
+   this.lock = new Object();
+   this.storedTaskStateByCheckpointID = new TreeMap<>();
+   this.discarded = false;
+   this.localRecoveryConfig = localRecoveryConfig;
+   }
+
+   @Override
+   public void 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168747007
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/**
+ * Main implementation of a {@link TaskLocalStateStore}.
+ */
+public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
+
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStoreImpl.class);
+
+   /** Maximum number of retained snapshots. */
+   @VisibleForTesting
+   static final int MAX_RETAINED_SNAPSHOTS = 5;
+
+   /** Dummy value to use instead of null to satisfy {@link 
ConcurrentHashMap}. */
+   private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
+
+   /** JobID from the owning subtask. */
+   @Nonnull
+   private final JobID jobID;
+
+   /** AllocationID of the owning slot. */
+   @Nonnull
+   private final AllocationID allocationID;
+
+   /** JobVertexID of the owning subtask. */
+   @Nonnull
+   private final JobVertexID jobVertexID;
+
+   /** Subtask index of the owning subtask. */
+   @Nonnegative
+   private final int subtaskIndex;
+
+   /** The configured mode for local recovery. */
+   @Nonnull
+   private final LocalRecoveryConfig localRecoveryConfig;
+
+   /** Executor that runs the discarding of released state objects. */
+   @Nonnull
+   private final Executor discardExecutor;
+
+   /** Lock for synchronisation on the storage map and the discarded 
status. */
+   @Nonnull
+   private final Object lock;
+
+   /** Status flag if this store was already discarded. */
+   @GuardedBy("lock")
+   private boolean discarded;
+
+   /** Maps checkpoint ids to local TaskStateSnapshots. */
+   @Nonnull
+   @GuardedBy("lock")
+   private final SortedMap 
storedTaskStateByCheckpointID;
+
+   public TaskLocalStateStoreImpl(
+   @Nonnull JobID jobID,
+   @Nonnull AllocationID allocationID,
+   @Nonnull JobVertexID jobVertexID,
+   @Nonnegative int subtaskIndex,
+   @Nonnull LocalRecoveryConfig localRecoveryConfig,
+   @Nonnull Executor discardExecutor) {
+
+   this.jobID = jobID;
+   this.allocationID = allocationID;
+   this.jobVertexID = jobVertexID;
+   this.subtaskIndex = subtaskIndex;
+   this.discardExecutor = discardExecutor;
+   this.lock = new Object();
+   this.storedTaskStateByCheckpointID = new TreeMap<>();
+   this.discarded = false;
+   this.localRecoveryConfig = localRecoveryConfig;
+   }
+
+   @Override
+   public void storeLocalState(
   

  1   2   3   4   >