[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-18 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5248


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160761745
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // 

+   //  Configuration shortcut names
+   // 

+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // 

+   //  Loading the state backend from a configuration 
+   // 

+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the StateBackendFactory when creating / 
configuring the state
+* backend in the factory
+* @throws IOException
+* May be thrown by the StateBackendFactory when 
instantiating the state backend
+*/
+   pub
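
The lookup rule described in the Javadoc above (try the shortcut names first, otherwise treat the value as the class name of a factory with a zero-argument constructor) can be sketched roughly as follows. This is only an illustration under stated assumptions: `BackendFactorySketch`, `CustomFactorySketch` and `BackendLoaderSketch` are hypothetical names, the backends are represented by plain strings, the case-insensitive match is an assumption, and the real loader reports failures via `DynamicCodeLoadingException` rather than the `IllegalStateException` used here.

```java
import java.util.Locale;

/** Hypothetical stand-in for a state backend factory; not the Flink interface. */
interface BackendFactorySketch {
    String createFromConfig();
}

/** A user-defined factory that would be referenced by its class name. */
final class CustomFactorySketch implements BackendFactorySketch {
    public CustomFactorySketch() {}

    @Override
    public String createFromConfig() {
        return "custom backend";
    }
}

final class BackendLoaderSketch {

    /** Resolves a configured backend name: shortcut names first, then a factory class name. */
    static String load(String backendName, ClassLoader classLoader) {
        // Assumption: shortcut names are matched case-insensitively.
        switch (backendName.toLowerCase(Locale.ROOT)) {
            case "jobmanager":
                return "memory backend";
            case "filesystem":
                return "filesystem backend";
            case "rocksdb":
                return "rocksdb backend";
            default:
                try {
                    // Not a shortcut: load the class and call its zero-argument constructor.
                    Class<? extends BackendFactorySketch> factoryClass =
                        Class.forName(backendName, false, classLoader)
                            .asSubclass(BackendFactorySketch.class);
                    return factoryClass.getDeclaredConstructor().newInstance().createFromConfig();
                } catch (ReflectiveOperationException | ClassCastException e) {
                    throw new IllegalStateException(
                        "Cannot load state backend factory: " + backendName, e);
                }
        }
    }
}
```

Passing the class loader explicitly mirrors the `classLoader` parameter in the Javadoc: a user-defined factory may only be visible to the user-code class loader.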

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160761001
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ---
@@ -0,0 +1,85 @@
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An implementation of durable checkpoint storage to file systems.
+ */
+public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
+
+   private final FileSystem fileSystem;
+
+   private final Path checkpointsDirectory;
+
+   private final Path sharedStateDirectory;
+
+   private final Path taskOwnedStateDirectory;
--- End diff --

We may be able to remove this in the future. But because we need to pull 
task-owned state out of the "exclusive state" directory soon, and the rework to 
make this use shared state handles is probably more delicate, I would keep this 
for now. It should be a trivial rework to remove it once we no longer need it.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160761075
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 ---
@@ -18,92 +18,272 @@
 
 package org.apache.flink.runtime.state.memory;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
- * A {@link AbstractStateBackend} that stores all its data and checkpoints 
in memory and has no
- * capabilities to spill to disk. Checkpoints are serialized and the 
serialized data is
- * transferred
+ * This state backend holds the working state in the memory (JVM heap) of 
the TaskManagers.
+ * The state backend checkpoints state directly to the JobManager's memory 
(hence the backend's name),
+ * but the checkpoints will be persisted to a file system for 
high-availability setups and savepoints.
+ * The MemoryStateBackend is consequently a FileSystem-based backend that 
can work without a
+ * file system dependency in simple setups.
+ *
+ * This state backend should be used only for experimentation, quick 
local setups,
+ * or for streaming applications that have very small state: Because it 
requires checkpoints to
+ * go through the JobManager's memory, larger state will occupy larger 
portions of the JobManager's  
+ * main memory, reducing operational stability.
+ * For any other setup, the {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend}
+ * should be used. The {@code FsStateBackend} holds the working state on 
the TaskManagers in the same way, but
+ * checkpoints state directly to files rather than to the JobManager's 
memory, thus supporting
+ * large state sizes. 
+ *
+ * State Size Considerations
+ *
+ * State checkpointing with this state backend is subject to the following 
conditions:
+ * 
+ * Each individual state must not exceed the configured maximum 
state size
+ * (see {@link #getMaxStateSize()}).
+ *
+ * All state from one task (i.e., the sum of all operator states 
and keyed states from all
+ * chained operators of the task) must not exceed what the RPC 
system supports, which is
+ * by default < 10 MB. That limit can be configured up, but that 
is typically not advised.
+ *
+ * The sum of all states in the application times all retained 
checkpoints must comfortably
+ * fit into the JobManager's JVM heap space.
+ * 
+ *
+ * Persistence Guarantees
+ *
+ * For the use cases where the state sizes can be handled by this backend, 
the backend does guarantee
+ * persistence for savepoints, externalized checkpoints (if configured), 
and checkpoints
+ * (when high-availability is configured).
+ *
+ * Configuration
+ *
+ * As for all state backends, this backend can either be configured within 
the application (by creating
+ * the backend with the respective constructor parameters and setting it 
on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * If the state backend was specified in the application, it may pick 
up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is 
configured in the application
+ * without a default savepoint directory, it will pick up a default 
savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is 
implemented via the
+ * {@link #configure(Configuration)} method.
  */
-public class MemoryStateBackend extends AbstractStateBackend {
   

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160760708
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
 ---
@@ -0,0 +1,224 @@
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all state backends that store their metadata (and 
data) in files.
+ * Examples that inherit from this are the {@link FsStateBackend}, the
+ * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend 
MemoryStateBackend}, or the
+ * {@code RocksDBStateBackend}.
+ *
+ * This class takes the base checkpoint- and savepoint directory paths, 
but also accepts null
+ * for both of them, in which case creating an externalized checkpoint is not 
possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to 
enable implementations
+ * that only optionally support default savepoints and externalized 
checkpoints.
+ *
+ * Checkpoint Layout
+ *
+ * The state backend is configured with a base directory and persists 
the checkpoint data of specific
+ * checkpoints in specific subdirectories. For example, if the base 
directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will 
create a subdirectory with
+ * the job's ID that will contain the actual checkpoints:
+ * ({@code 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * Each checkpoint individually will store all its files in a 
subdirectory that includes the
+ * checkpoint number, such as {@code 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * Savepoint Layout
+ *
+ * A savepoint that is set to be stored in path {@code 
hdfs://namenode:port/flink-savepoints/}, will create
+ * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it 
stores all savepoint data.
+ * The random digits are added as "entropy" to avoid directory collisions.
+ *
+ * Metadata and Success Files
+ *
+ * A completed checkpoint writes its metadata into a file '{@value 
AbstractFsCheckpointStorage#METADATA_FILE_NAME}'.
+ * After that is complete (i.e., the file is complete), it writes an 
additional file
+ * '{@value AbstractFsCheckpointStorage#SUCCESS_FILE_NAME}'.
+ *
+ * Ideally that would not be necessary, and one would write the 
metadata file to a temporary file and
+ * then issue an atomic (or at least constant time) rename. But some of the 
file systems (like S3) do
+ * not support that: A rename is a copy process which, when failing, 
leaves corrupt half written
+ * files/objects. The success file is hence needed as a signal that the
+ * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}' file is 
complete.
+ */
+@PublicEvolving
+public abstract class AbstractFileStateBackend extends 
AbstractStateBackend {
+
+   private static final long serialVersionUID = 1L;
+
+   // 

+   //  State Backend Properties
+   // 

+
+   /** The path where checkpoints will be stored, or null, if none has 
been configured. */
+   @Nullable
+   private final Path baseCheckpoin
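
The checkpoint and savepoint layout described in the Javadoc above can be illustrated with a small path-building sketch. The class and method names below are hypothetical and plain strings stand in for Flink's `Path`. Only the directory patterns (`<base>/<jobId>/chk-<n>/` and `savepoint-<first six characters of the job ID>-<random digits>`) are taken from the text.

```java
/** Hypothetical helper that mirrors the directory layout described in the Javadoc. */
final class CheckpointLayoutSketch {

    /** Base directory plus the job ID, e.g. ".../flink-checkpoints/<jobId>". */
    static String jobDirectory(String baseDirectory, String jobId) {
        String base = baseDirectory.endsWith("/") ? baseDirectory : baseDirectory + "/";
        return base + jobId;
    }

    /** Directory of one checkpoint, e.g. ".../<jobId>/chk-17/". */
    static String checkpointDirectory(String baseDirectory, String jobId, long checkpointId) {
        return jobDirectory(baseDirectory, jobId) + "/chk-" + checkpointId + "/";
    }

    /** Savepoint subdirectory name; the random digits avoid directory collisions. */
    static String savepointDirectoryName(String jobId, String randomDigits) {
        return "savepoint-" + jobId.substring(0, 6) + "-" + randomDigits;
    }
}
```

With the base directory and job ID from the Javadoc example, `checkpointDirectory(..., 17)` yields the `chk-17/` path quoted there.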

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160759604
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // 

+   //  Configuration shortcut names
+   // 

+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // 

+   //  Loading the state backend from a configuration 
+   // 

+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the StateBackendFactory when creating / 
configuring the state
+* backend in the factory
+* @throws IOException
+* May be thrown by the StateBackendFactory when 
instantiating the state backend
+*/
+   pub

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160758820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -84,13 +84,44 @@
 public interface StateBackend extends java.io.Serializable {
 
// 

-   //  Persistent Bytes Storage
+   //  Checkpoint storage - the durable persistence of checkpoint data
+   // 

+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
--- End diff --

See above; I don't think we can get around using a String.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160758717
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -84,13 +84,44 @@
 public interface StateBackend extends java.io.Serializable {
 
// 

-   //  Persistent Bytes Storage
+   //  Checkpoint storage - the durable persistence of checkpoint data
--- End diff --

Agreed. In the end we have:
  - `CheckpointStorage`: Persisting bytes and metadata. The stream 
factories should go there in the next part of the rework.
  - `KeyedStateBackend`: Holding / snapshotting keyed state
  - `OperatorStateBackend`: Holding / snapshotting operator state

These should be three completely independent interfaces, and this PR moves 
towards that by introducing the `CheckpointStorage`, even though it does not 
yet move the stream factories there.

Once the rework is done, the `StateBackend` just bundles a combination of 
the three components above (it should have only three methods, plus maybe a 
convenience overload).
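
As a rough illustration of that target structure, the sketch below shows three independent component interfaces and a backend that only bundles them. All type and method names ending in `Sketch` are hypothetical simplifications. They are not the interfaces from this PR, and the in-memory implementation exists only to make the example runnable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Persists bytes and metadata; returns a pointer to what was stored. */
interface CheckpointStorageSketch {
    String persist(long checkpointId, byte[] bytes);
}

/** Holds and snapshots keyed state. */
interface KeyedStateSketch {
    void put(String key, String value);
    Map<String, String> snapshot();
}

/** Holds and snapshots operator state. */
interface OperatorStateSketch {
    void add(String element);
    List<String> snapshot();
}

/** The backend only bundles the three components: one method per component. */
interface StateBackendSketch {
    CheckpointStorageSketch createCheckpointStorage();
    KeyedStateSketch createKeyedStateBackend();
    OperatorStateSketch createOperatorStateBackend();
}

/** Toy implementation that keeps everything on the heap. */
final class InMemoryBackendSketch implements StateBackendSketch {

    @Override
    public CheckpointStorageSketch createCheckpointStorage() {
        Map<String, byte[]> store = new HashMap<>();
        return (checkpointId, bytes) -> {
            String pointer = "mem://chk-" + checkpointId;
            store.put(pointer, bytes.clone());
            return pointer;
        };
    }

    @Override
    public KeyedStateSketch createKeyedStateBackend() {
        Map<String, String> state = new HashMap<>();
        return new KeyedStateSketch() {
            @Override public void put(String key, String value) { state.put(key, value); }
            @Override public Map<String, String> snapshot() { return new HashMap<>(state); }
        };
    }

    @Override
    public OperatorStateSketch createOperatorStateBackend() {
        List<String> state = new ArrayList<>();
        return new OperatorStateSketch() {
            @Override public void add(String element) { state.add(element); }
            @Override public List<String> snapshot() { return new ArrayList<>(state); }
        };
    }
}
```

Because the three interfaces do not reference each other, any storage implementation could in principle be combined with any keyed or operator state implementation.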


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160757943
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
 ---
@@ -0,0 +1,65 @@
+
+package org.apache.flink.runtime.state;
+
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import java.io.IOException;
+
+/**
+ * A storage location for one particular checkpoint. This location is 
typically
+ * created and initialized via {@link 
CheckpointStorage#initializeCheckpoint(long)} or
+ * {@link CheckpointStorage#initializeSavepoint(long, String)}.
+ */
+public interface CheckpointStorageLocation {
+
+   /**
+* Creates the output stream to persist the checkpoint metadata to.
+*
+* @return The output stream to persist the checkpoint metadata to.
+* @throws IOException Thrown, if the stream cannot be opened due to an 
I/O error.
+*/
+   CheckpointStateOutputStream createMetadataOutputStream() throws 
IOException;
+
+   /**
+* Finalizes the checkpoint, marking the location as a finished 
checkpoint.
+* This method returns the external checkpoint pointer that can be used 
to resolve
+* the checkpoint upon recovery.
+*
+* @return The external pointer to the checkpoint at this location.
+* @throws IOException Thrown, if finalizing / marking as finished 
fails due to an I/O error.
+*/
+   String markCheckpointAsFinished() throws IOException;
--- End diff --

I have gone back and forth on this. I felt like keeping it, because we do 
need to get to some form of external pointer (a String) in the end (for ZK and 
for external resume). This couples the finalization and obtaining that pointer, 
which makes sense to me (the pointer may not be producible before 
finalization). Pushing that into the metadata's OutputStream needs another 
interface that creates a "handle and pointer" on closing. I had that in a 
previous version, and it felt much clumsier. This variant seems nicer to me.
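
To make the coupling concrete, here is a minimal sketch of the call sequence: write the metadata, close the stream, then finalize and receive the external pointer. `LocationSketch` is a hypothetical, simplified analogue of `CheckpointStorageLocation` that uses a plain `OutputStream`. The in-memory implementation and its `mem://` pointer format are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified analogue of a checkpoint storage location. */
interface LocationSketch {
    OutputStream createMetadataOutputStream() throws IOException;

    /** Finalizes the checkpoint and returns the external pointer to it. */
    String markCheckpointAsFinished() throws IOException;
}

final class InMemoryLocationSketch implements LocationSketch {
    private final long checkpointId;
    private final ByteArrayOutputStream metadata = new ByteArrayOutputStream();
    private boolean finished;

    InMemoryLocationSketch(long checkpointId) {
        this.checkpointId = checkpointId;
    }

    @Override
    public OutputStream createMetadataOutputStream() throws IOException {
        if (finished) {
            throw new IOException("Checkpoint " + checkpointId + " is already finished");
        }
        return metadata;
    }

    @Override
    public String markCheckpointAsFinished() throws IOException {
        if (metadata.size() == 0) {
            throw new IOException("No metadata written for checkpoint " + checkpointId);
        }
        finished = true;
        // The pointer depends on the final contents, so it only exists after finalization.
        return "mem://chk-" + checkpointId + "?bytes=" + metadata.size();
    }
}

final class FinalizeSketch {
    /** Writes the metadata, closes the stream, then finalizes and returns the pointer. */
    static String writeAndFinalize(LocationSketch location, String metadata) {
        try {
            try (OutputStream out = location.createMetadataOutputStream()) {
                out.write(metadata.getBytes(StandardCharsets.UTF_8));
            }
            return location.markCheckpointAsFinished();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The caller needs no extra "handle and pointer" type from the stream; the only value that leaves the location is the String returned by finalization.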


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160756970
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
+
+   /**
+* Initializes a storage location for a new checkpoint with the given ID.
+*
+* The returned storage location can be used to write the checkpoint 
data and metadata
+* to and to obtain the pointers for the location(s) where the actual 
checkpoint data should be
+* stored.
+*
+* @param checkpointId The ID (logical timestamp) of the checkpoint 
that should be persisted.
+* @return A storage location for the data and metadata of the given 
checkpoint.
+*
+* @throws IOException Thrown if the storage location cannot be 
initialized due to an I/O exception.
+*/
+   CheckpointStorageLocation initializeCheckpoint(long checkpointId) 
throws IOException;
--- End diff --

How about `initializeLocationForCheckpoint`?


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160756860
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
--- End diff --

I was wondering about that as well. My thinking was that, in the end, all we 
get is Strings (from the command line, from ZooKeeper, from the REST 
interface), and there is no point before that method that could parse/convert 
the string into something state-backend-specific. So we have to deal with 
Strings anyway. Wrapping them in another class did not feel too useful to me here.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160756347
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
--- End diff --

Yes, this is part of future work. The high-level description says that:

> To make sure there is no extra persisting of the checkpoint metadata by 
the HA store (it simply references the regular persisted checkpoint metadata) 
we need some changes to the ZooKeeperCompletedCheckpointStore.

The method helps as a sanity check: it allows the checkpoint coordinator to 
validate that if the `CompletedCheckpointStore` requires durable persistence, 
then the `CheckpointStorage` provides it. It is used to catch a weird 
corner case where the ZkCompletedCheckpointStore is used, but the StateStore 
(from the MemoryStateBackend) does not actually support persisting any data 
durably. We could remove it, because currently this can never happen (the 
MemoryStateBackend automatically fills in durable persistence in HA setups).
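
A minimal sketch of the sanity check described above. The two interfaces are simplified, hypothetical stand-ins (they are not the actual Flink types), and `HaStorageSanityCheck` is an illustrative name only:

```java
// Hypothetical stand-in for the store of completed checkpoints (assumption).
interface CompletedCheckpointStoreSketch {
    boolean requiresExternalizedCheckpoints();
}

// Hypothetical stand-in for the checkpoint storage under review (assumption).
interface CheckpointStorageSketch {
    boolean supportsHighlyAvailableStorage();
}

final class HaStorageSanityCheck {

    private HaStorageSanityCheck() {}

    /**
     * Fails fast if the completed-checkpoint store needs durable
     * persistence but the storage cannot provide it.
     */
    static void validate(CompletedCheckpointStoreSketch store, CheckpointStorageSketch storage) {
        if (store.requiresExternalizedCheckpoints() && !storage.supportsHighlyAvailableStorage()) {
            throw new IllegalStateException(
                "The completed checkpoint store requires durable persistence, "
                    + "but the checkpoint storage does not support highly available storage.");
        }
    }
}
```

With a check like this, the corner case surfaces when the coordinator is set up rather than at recovery time.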


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160755282
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
--- End diff --

The storage will create the stream factories later, that is the plan. I 
simply did not want to push that into this pull request as well. Think of 
this pull request as the "JobManager" side of things; the "TaskManager" side of 
things is to come next.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160754932
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
 ---
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class with the methods to write/load/dispose the checkpoint 
and savepoint metadata.
+ *
+ * Stored checkpoint metadata files have the following format:
+ * [MagicNumber (int) | Format Version (int) | Checkpoint Metadata 
(variable)]
+ *
+ * The actual savepoint serialization is version-specific via the 
{@link SavepointSerializer}.
+ */
+public class Checkpoints {
--- End diff --

This class somewhat mimics the way this was done previously, scattered over 
the `SavepointLoader` and `SavepointStore` classes. I changed it to `Checkpoints` 
because the code is relevant not only to savepoints, but to checkpoints in 
general. It was mainly a bit of name-fixing work as part of the adjustment.

Making this pluggable is an orthogonal effort in my opinion, and 
probably a more involved one.
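
For reference, the metadata layout from the class Javadoc, `[MagicNumber (int) | Format Version (int) | Checkpoint Metadata (variable)]`, can be sketched roughly as follows. This is an illustration only: the magic number value is a placeholder, the class name is hypothetical, and it uses a `ByteBuffer` rather than the data streams the real class works with.

```java
import java.nio.ByteBuffer;

final class MetadataHeaderSketch {

    /** Placeholder value for illustration; not the constant used by Flink. */
    static final int MAGIC_NUMBER = 0x12345678;

    /** Size of the two leading int fields (magic number and format version). */
    static final int HEADER_BYTES = 2 * Integer.BYTES;

    private MetadataHeaderSketch() {}

    /** Writes [magic | version | payload] in big-endian order. */
    static byte[] write(int formatVersion, byte[] payload) {
        return ByteBuffer.allocate(HEADER_BYTES + payload.length)
            .putInt(MAGIC_NUMBER)
            .putInt(formatVersion)
            .put(payload)
            .array();
    }

    /** Validates the magic number and returns the format version. */
    static int readVersion(byte[] stored) {
        if (stored.length < HEADER_BYTES) {
            throw new IllegalArgumentException("Metadata is too short to contain a header.");
        }
        ByteBuffer in = ByteBuffer.wrap(stored);
        int magic = in.getInt();
        if (magic != MAGIC_NUMBER) {
            throw new IllegalArgumentException(
                "Unexpected magic number: 0x" + Integer.toHexString(magic));
        }
        // The version decides which version-specific serializer reads the rest.
        return in.getInt();
    }
}
```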


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160752496
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 ---
@@ -251,77 +225,66 @@ public String toString() {
false,
false);
 
-   private static final CheckpointProperties STANDARD_CHECKPOINT = new 
CheckpointProperties(
-   false,
+   private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = 
new CheckpointProperties(
false,
false,
true,
-   true,
-   true,
-   true,
-   true);
+   true,  // Delete on success
+   true,  // Delete on cancellation
+   true,  // Delete on failure
+   true); // Delete on suspension
 
-   private static final CheckpointProperties 
EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties(
+   private static final CheckpointProperties 
CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties(
false,
-   true,
false,
true,
-   true,
-   false, // Retain on cancellation
-   false,
-   false); // Retain on suspension
+   true,  // Delete on success
+   true,  // Delete on cancellation
+   false, // Retain on failure
+   true); // Delete on suspension
 
-   private static final CheckpointProperties 
EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties(
+   private static final CheckpointProperties 
CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties(
false,
-   true,
false,
true,
-   true,
-   true, // Delete on cancellation
-   false,
-   true); // Delete on suspension
+   true,   // Delete on success
+   false,  // Retain on cancellation
+   false,  // Retain on failure
+   false); // Retain on suspension
+
 
/**
 * Creates the checkpoint properties for a (manually triggered) 
savepoint.
 *
-* Savepoints are forced and persisted externally. They have to be
+* Savepoints are not queued due to time trigger limits. They have 
to be
 * garbage collected manually.
 *
 * @return Checkpoint properties for a (manually triggered) savepoint.
 */
-   public static CheckpointProperties forStandardSavepoint() {
-   return STANDARD_SAVEPOINT;
-   }
-
-   /**
-* Creates the checkpoint properties for a regular checkpoint.
-*
-* Regular checkpoints are not forced and not persisted externally. 
They
-* are garbage collected automatically.
-*
-* @return Checkpoint properties for a regular checkpoint.
-*/
-   public static CheckpointProperties forStandardCheckpoint() {
-   return STANDARD_CHECKPOINT;
+   public static CheckpointProperties forSavepoint() {
+   return SAVEPOINT;
}
 
/**
-* Creates the checkpoint properties for an external checkpoint.
+* Creates the checkpoint properties for a checkpoint.
 *
-* External checkpoints are not forced, but persisted externally. 
They
-* are garbage collected automatically, except when the owning job
+* Checkpoints may be queued in case too many other checkpoints are 
currently happening.
+* They are garbage collected automatically, except when the owning job
 * terminates in state {@link JobStatus#FAILED}. The user is required to
 * configure the clean up behaviour on job cancellation.
 *
-* @param deleteOnCancellation Flag indicating whether to discard on 
cancellation.
-*
 * @return Checkpoint properties for an external checkpoint.
 */
-   public static CheckpointProperties forExternalizedCheckpoint(boolean 
deleteOnCancellation) {
-   if (deleteOnCancellation) {
-   return EXTERNALIZED_CHECKPOINT_DELETED;
-   } else {
-   return EXTERNALIZED_CHECKPOINT_RETAINED;
+   public static CheckpointProperties 
forCheckpoint(CheckpointRetentionPolicy policy) {
--- End diff --

There is actually a `forSavepoint(...)` method, that's why I think keeping 
the name makes sense. We could change it to `forCheckpointWithPolic

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160752558
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1185,6 +1153,10 @@ public int 
getNumberOfRetainedSuccessfulCheckpoints() {
}
}
 
+   public CheckpointStorage getCheckpointStorage() {
--- End diff --

What names would you suggest to use?


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160751897
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // 

+   //  general checkpoint and state backend options
+   // 

+
+   public static final ConfigOption STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain.*/
+   public static final ConfigOption MAX_RETAINED_CHECKPOINTS = 
ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // 

+   //  Options specific to the file-system-based state backends
+   // 

+
+   /** The default directory for savepoints. Used by the state backends 
that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption SAVEPOINT_DIRECTORY = 
ConfigOptions
+   .key("state.savepoints.dir")
+   .noDefaultValue()
+   .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state 
backends that write
+* checkpoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption CHECKPOINTS_DIRECTORY = 
ConfigOptions
+   .key("state.checkpoints.dir")
+   .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use 
an asynchronous
+* snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = 
ConfigOptions
+   .key("state.backend.heap.async")
--- End diff --

Makes sense, let's scope them more generally.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160718388
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
--- End diff --

Furthermore, I suggest introducing a separate class like 
`(Checkpoint)StoragePointer` instead of a plain `String`. Internally, the class 
can really be just a string, but I find it very helpful for reading the code if 
there is a meaningful class name attached to a concept instead of raw String 
usage. It also helps to avoid autocomplete mistakes for methods that take a 
pointer plus other strings.
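
A rough sketch of what such a wrapper could look like. The class name and its methods are only a suggestion for illustration, not existing code:

```java
import java.util.Objects;

/** Hypothetical value type wrapping the raw pointer string. */
final class CheckpointStoragePointer {

    private final String pointer;

    CheckpointStoragePointer(String pointer) {
        this.pointer = Objects.requireNonNull(pointer, "pointer");
    }

    /** Returns the raw string, e.g. for handing it to a state backend. */
    String asString() {
        return pointer;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof CheckpointStoragePointer
            && pointer.equals(((CheckpointStoragePointer) o).pointer);
    }

    @Override
    public int hashCode() {
        return pointer.hashCode();
    }

    @Override
    public String toString() {
        return "CheckpointStoragePointer(" + pointer + ")";
    }
}
```

A signature such as `resolveCheckpoint(CheckpointStoragePointer pointer)` then cannot be called with an unrelated `String` by accident.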


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160686745
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // 

+   //  Configuration shortcut names
+   // 

+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // 

+   //  Loading the state backend from a configuration 
+   // 

+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the StateBackendFactory when creating / 
configuring the state
+* backend in the factory
+* @throws IOException
+* May be thrown by the StateBackendFactory when 
instantiating the state backend
+*/
+   

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160690634
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // 

+   //  Configuration shortcut names
+   // 

+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // 

+   //  Loading the state backend from a configuration 
+   // 

+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the StateBackendFactory when creating / 
configuring the state
+* backend in the factory
+* @throws IOException
+* May be thrown by the StateBackendFactory when 
instantiating the state backend
+*/
+   

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160718229
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
--- End diff --

I think `resolveStoragePointerForCheckpoint(...)` or similar might be a 
better name.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160687956
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // 

+   //  Configuration shortcut names
+   // 

+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // 

+   //  Loading the state backend from a configuration 
+   // 

+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the StateBackendFactory when creating / 
configuring the state
+* backend in the factory
+* @throws IOException
+* May be thrown by the StateBackendFactory when 
instantiating the state backend
+*/
+   

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160698030
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
 ---
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class with the methods to write/load/dispose the checkpoint 
and savepoint metadata.
+ *
+ * Stored checkpoint metadata files have the following format:
+ * [MagicNumber (int) | Format Version (int) | Checkpoint Metadata 
(variable)]
+ *
+ * The actual savepoint serialization is version-specific via the 
{@link SavepointSerializer}.
+ */
+public class Checkpoints {
--- End diff --

This class contains a lot of static methods that look like they belong to 
a checkpoint/savepoint store object. To keep those parts replaceable in unit 
tests, I wonder if we should eliminate the static nature of this code?
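
A minimal sketch of what a non-static variant could look like. All names here (`CheckpointMetadataStore`, `InMemoryCheckpointMetadataStore`, `RestoreLogic`) are hypothetical and not taken from the PR; the point is only that a component which receives the store through its constructor can be tested against an in-memory fake.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical interface replacing the static helpers; not part of the PR. */
interface CheckpointMetadataStore {
    void store(String pointer, byte[] metadata) throws IOException;
    byte[] load(String pointer) throws IOException;
    void dispose(String pointer) throws IOException;
}

/** In-memory fake that a unit test could inject instead of a file-based store. */
final class InMemoryCheckpointMetadataStore implements CheckpointMetadataStore {
    private final Map<String, byte[]> entries = new HashMap<>();

    @Override
    public void store(String pointer, byte[] metadata) {
        // Defensive copy so later changes by the caller do not leak into the store.
        entries.put(pointer, metadata.clone());
    }

    @Override
    public byte[] load(String pointer) throws IOException {
        byte[] data = entries.get(pointer);
        if (data == null) {
            throw new IOException("No checkpoint metadata at " + pointer);
        }
        return data.clone();
    }

    @Override
    public void dispose(String pointer) {
        entries.remove(pointer);
    }
}

/** Stand-in for any component that currently calls the static methods. */
final class RestoreLogic {
    private final CheckpointMetadataStore store;

    RestoreLogic(CheckpointMetadataStore store) {
        this.store = store;
    }

    /** Returns the size in bytes of the metadata behind the pointer. */
    int metadataSize(String pointer) throws IOException {
        return store.load(pointer).length;
    }
}
```

A test would construct `new RestoreLogic(new InMemoryCheckpointMetadataStore())` and never touch a file system.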


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160690515
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160713950
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 ---
@@ -251,77 +225,66 @@ public String toString() {
false,
false);
 
-   private static final CheckpointProperties STANDARD_CHECKPOINT = new 
CheckpointProperties(
-   false,
+   private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = 
new CheckpointProperties(
false,
false,
true,
-   true,
-   true,
-   true,
-   true);
+   true,  // Delete on success
+   true,  // Delete on cancellation
+   true,  // Delete on failure
+   true); // Delete on suspension
 
-   private static final CheckpointProperties 
EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties(
+   private static final CheckpointProperties 
CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties(
false,
-   true,
false,
true,
-   true,
-   false, // Retain on cancellation
-   false,
-   false); // Retain on suspension
+   true,  // Delete on success
+   true,  // Delete on cancellation
+   false, // Retain on failure
+   true); // Delete on suspension
 
-   private static final CheckpointProperties 
EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties(
+   private static final CheckpointProperties 
CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties(
false,
-   true,
false,
true,
-   true,
-   true, // Delete on cancellation
-   false,
-   true); // Delete on suspension
+   true,   // Delete on success
+   false,  // Retain on cancellation
+   false,  // Retain on failure
+   false); // Retain on suspension
+
 
/**
 * Creates the checkpoint properties for a (manually triggered) 
savepoint.
 *
-* Savepoints are forced and persisted externally. They have to be
+* Savepoints are not queued due to time trigger limits. They have 
to be
 * garbage collected manually.
 *
 * @return Checkpoint properties for a (manually triggered) savepoint.
 */
-   public static CheckpointProperties forStandardSavepoint() {
-   return STANDARD_SAVEPOINT;
-   }
-
-   /**
-* Creates the checkpoint properties for a regular checkpoint.
-*
-* Regular checkpoints are not forced and not persisted externally. 
They
-* are garbage collected automatically.
-*
-* @return Checkpoint properties for a regular checkpoint.
-*/
-   public static CheckpointProperties forStandardCheckpoint() {
-   return STANDARD_CHECKPOINT;
+   public static CheckpointProperties forSavepoint() {
+   return SAVEPOINT;
}
 
/**
-* Creates the checkpoint properties for an external checkpoint.
+* Creates the checkpoint properties for a checkpoint.
 *
-* External checkpoints are not forced, but persisted externally. 
They
-* are garbage collected automatically, except when the owning job
+* Checkpoints may be queued in case too many other checkpoints are 
currently happening.
+* They are garbage collected automatically, except when the owning job
 * terminates in state {@link JobStatus#FAILED}. The user is required to
 * configure the clean up behaviour on job cancellation.
 *
-* @param deleteOnCancellation Flag indicating whether to discard on 
cancellation.
-*
 * @return Checkpoint properties for an external checkpoint.
 */
-   public static CheckpointProperties forExternalizedCheckpoint(boolean 
deleteOnCancellation) {
-   if (deleteOnCancellation) {
-   return EXTERNALIZED_CHECKPOINT_DELETED;
-   } else {
-   return EXTERNALIZED_CHECKPOINT_RETAINED;
+   public static CheckpointProperties 
forCheckpoint(CheckpointRetentionPolicy policy) {
--- End diff --

I suggest changing the method name to `forPolicy(...)` or 
`forRetentionPolicy(...)`.
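
For illustration, a sketch of how the renamed factory method might dispatch on the policy. It uses simplified stand-ins: `RetentionPolicy` and its constant names are assumptions, and `DiscardFlags` reduces `CheckpointProperties` to the four "delete on ..." flags, with values copied from the comments in the diff above.

```java
/** Hypothetical stand-in for CheckpointRetentionPolicy; constant names are assumed. */
enum RetentionPolicy { NEVER_RETAIN, RETAIN_ON_FAILURE, RETAIN_ON_CANCELLATION }

/** Simplified stand-in for CheckpointProperties, reduced to the discard flags. */
final class DiscardFlags {
    final boolean onSuccess;
    final boolean onCancellation;
    final boolean onFailure;
    final boolean onSuspension;

    private DiscardFlags(
            boolean onSuccess, boolean onCancellation, boolean onFailure, boolean onSuspension) {
        this.onSuccess = onSuccess;
        this.onCancellation = onCancellation;
        this.onFailure = onFailure;
        this.onSuspension = onSuspension;
    }

    // Flag values follow the "Delete on ..." / "Retain on ..." comments in the diff.
    private static final DiscardFlags NEVER_RETAINED =
            new DiscardFlags(true, true, true, true);
    private static final DiscardFlags RETAINED_ON_FAILURE =
            new DiscardFlags(true, true, false, true);
    private static final DiscardFlags RETAINED_ON_CANCELLATION =
            new DiscardFlags(true, false, false, false);

    /** The method name states what the argument selects: a retention policy. */
    static DiscardFlags forRetentionPolicy(RetentionPolicy policy) {
        switch (policy) {
            case NEVER_RETAIN:
                return NEVER_RETAINED;
            case RETAIN_ON_FAILURE:
                return RETAINED_ON_FAILURE;
            case RETAIN_ON_CANCELLATION:
                return RETAINED_ON_CANCELLATION;
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
    }
}
```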


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160630316
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // 

+   //  general checkpoint and state backend options
+   // 

+
+   public static final ConfigOption STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain.*/
+   public static final ConfigOption MAX_RETAINED_CHECKPOINTS = 
ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // 

+   //  Options specific to the file-system-based state backends
+   // 

+
+   /** The default directory for savepoints. Used by the state backends 
that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption SAVEPOINT_DIRECTORY = 
ConfigOptions
+   .key("state.savepoints.dir")
+   .noDefaultValue()
+   .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state 
backends that write
+* checkpoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption CHECKPOINTS_DIRECTORY = 
ConfigOptions
+   .key("state.checkpoints.dir")
+   .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use 
an asynchronous
+* snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = 
ConfigOptions
+   .key("state.backend.heap.async")
--- End diff --

I wonder if it would make sense not to scope options like async or 
incremental per backend, and instead have them as general switches that backends 
may or may not support. We could log when a backend chooses to ignore a setting 
because it does not support it.
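
A rough sketch of that idea, under stated assumptions: `SnapshotFeature` and `FeatureResolver` are hypothetical names, and the log is modeled as a plain list of messages rather than an SLF4J logger so the example stays self-contained.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical backend-independent switches; not actual Flink options. */
enum SnapshotFeature { ASYNC, INCREMENTAL }

final class FeatureResolver {

    /**
     * Returns the requested features that the backend supports. Every
     * requested but unsupported feature is ignored and reported in the log.
     */
    static Set<SnapshotFeature> resolve(
            String backendName,
            Set<SnapshotFeature> requested,
            Set<SnapshotFeature> supported,
            List<String> log) {

        Set<SnapshotFeature> effective = EnumSet.noneOf(SnapshotFeature.class);
        for (SnapshotFeature feature : requested) {
            if (supported.contains(feature)) {
                effective.add(feature);
            } else {
                log.add("State backend '" + backendName + "' does not support "
                        + feature + "; ignoring the setting.");
            }
        }
        return effective;
    }
}
```

Each backend would declare its supported set once, and the configuration keys would no longer need a per-backend prefix.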


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160638882
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 ---
@@ -18,92 +18,272 @@
 
 package org.apache.flink.runtime.state.memory;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
- * A {@link AbstractStateBackend} that stores all its data and checkpoints 
in memory and has no
- * capabilities to spill to disk. Checkpoints are serialized and the 
serialized data is
- * transferred
+ * This state backend holds the working state in the memory (JVM heap) of 
the TaskManagers.
+ * The state backend checkpoints state directly to the JobManager's memory 
(hence the backend's name),
+ * but the checkpoints will be persisted to a file system for 
high-availability setups and savepoints.
+ * The MemoryStateBackend is consequently a FileSystem-based backend that 
can work without a
+ * file system dependency in simple setups.
+ *
+ * This state backend should be used only for experimentation, quick 
local setups,
+ * or for streaming applications that have very small state: Because it 
requires checkpoints to
+ * go through the JobManager's memory, larger state will occupy larger 
portions of the JobManager's  
+ * main memory, reducing operational stability.
+ * For any other setup, the {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend}
+ * should be used. The {@code FsStateBackend} holds the working state on 
the TaskManagers in the same way, but
+ * checkpoints state directly to files rather than to the JobManager's 
memory, thus supporting
+ * large state sizes. 
+ *
+ * State Size Considerations
+ *
+ * State checkpointing with this state backend is subject to the following 
conditions:
+ * 
+ * Each individual state must not exceed the configured maximum 
state size
+ * (see {@link #getMaxStateSize()}).
+ *
+ * All state from one task (i.e., the sum of all operator states 
and keyed states from all
+ * chained operators of the task) must not exceed what the RPC 
system supports, which is
+ * by default < 10 MB. That limit can be configured up, but that 
is typically not advised.
+ *
+ * The sum of all states in the application times all retained 
checkpoints must comfortably
+ * fit into the JobManager's JVM heap space.
+ * 
+ *
+ * Persistence Guarantees
+ *
+ * For the use cases where the state sizes can be handled by this backend, 
the backend does guarantee
+ * persistence for savepoints, externalized checkpoints (if configured), 
and checkpoints
+ * (when high-availability is configured).
+ *
+ * Configuration
+ *
+ * As for all state backends, this backend can either be configured within 
the application (by creating
+ * the backend with the respective constructor parameters and setting it 
on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * If the state backend was specified in the application, it may pick 
up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is 
configured in the application
+ * without a default savepoint directory, it will pick up a default 
savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is 
implemented via the
+ * {@link #configure(Configuration)} method.
  */
-public class MemoryStateBackend extends AbstractStateBackend {

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160685735
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160646300
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all state backends that store their metadata (and 
data) in files.
+ * Examples that inherit from this are the {@link FsStateBackend}, the
+ * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend 
MemoryStateBackend}, or the
+ * {@code RocksDBStateBackend}.
+ *
+ * This class takes the base checkpoint- and savepoint directory paths, 
but also accepts null
+ * for both of them, in which case creating externalized checkpoints is not 
possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to 
enable implementations
+ * that only optionally support default savepoints and externalized 
checkpoints.
+ *
+ * Checkpoint Layout
+ *
+ * The state backend is configured with a base directory and persists 
the checkpoint data of specific
+ * checkpoints in specific subdirectories. For example, if the base 
directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will 
create a subdirectory with
+ * the job's ID that will contain the actual checkpoints:
+ * ({@code 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * Each checkpoint individually will store all its files in a 
subdirectory that includes the
+ * checkpoint number, such as {@code 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * Savepoint Layout
+ *
+ * A savepoint that is set to be stored in path {@code 
hdfs://namenode:port/flink-savepoints/}, will create
+ * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it 
stores all savepoint data.
+ * The random digits are added as "entropy" to avoid directory collisions.
+ *
+ * Metadata and Success Files
+ *
+ * A completed checkpoint writes its metadata into a file '{@value 
AbstractFsCheckpointStorage#METADATA_FILE_NAME}'.
+ * After that is complete (i.e., the file is complete), it writes an 
additional file
+ * '{@value AbstractFsCheckpointStorage#SUCCESS_FILE_NAME}'.
+ *
+ * Ideally that would not be necessary, and one would write the 
metadata file to a temporary file and
+ * then issue an atomic (or at least constant time) rename. But some of the 
file systems (like S3) do
+ * not support that: A rename is a copy process which, when failing, 
leaves corrupt half written
+ * files/objects. The success file is hence needed as a signal that the
+ * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}' file is 
complete.
+ */
+@PublicEvolving
+public abstract class AbstractFileStateBackend extends 
AbstractStateBackend {
+
+   private static final long serialVersionUID = 1L;
+
+   // 

+   //  State Backend Properties
+   // 

+
+   /** The path where checkpoints will be stored, or null, if none has 
been configured. */
+   @Nullable
+   private final Path baseCheckp

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160725400
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
--- End diff --

Another point about naming that I found confusing: Backends have methods 
to create `CheckpointStorage` and `CheckpointStreamFactory`. From the names, it 
sounds like the storage could also do the job of the stream factories. In fact, 
the storage deals more with the metadata aspect on the checkpoint 
coordinator (which we call `Checkpoint`), while the factories give us 
streams that are also used in checkpointing (so far, so good), but to write the 
snapshots on the task managers.
Maybe the problem is rooted in the fact that "checkpoint" is both a metadata 
class for the JM and a concept, and in the current method names both get 
mixed up.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160716512
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1185,6 +1153,10 @@ public int 
getNumberOfRetainedSuccessfulCheckpoints() {
}
}
 
+   public CheckpointStorage getCheckpointStorage() {
--- End diff --

Here I noticed how similar the names `CheckpointStorage` and 
`CompletedCheckpointStore` are. Do you think there are names that make their 
difference more obvious when reading the code? For example, the second could 
also go well under `CompletedCheckpoints`.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160721205
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
+
+   /**
+* Initializes a storage location for new checkpoint with the given ID.
+*
+* The returned storage location can be used to write the checkpoint 
data and metadata
+* to and to obtain the pointers for the location(s) where the actual 
checkpoint data should be
+* stored.
+*
+* @param checkpointId The ID (logical timestamp) of the checkpoint 
that should be persisted.
+* @return A storage location for the data and metadata of the given 
checkpoint.
+*
+* @throws IOException Thrown if the storage location cannot be 
initialized due to an I/O exception.
+*/
+   CheckpointStorageLocation initializeCheckpoint(long checkpointId) 
throws IOException;
--- End diff --

I would consider renaming this to something like 
`initializeStorageLocationForCheckpoint(...)`


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160727075
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -84,13 +84,44 @@
 public interface StateBackend extends java.io.Serializable {
 
// 

-   //  Persistent Bytes Storage
+   //  Checkpoint storage - the durable persistence of checkpoint data
+   // 

+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
--- End diff --

Similar to the `String` vs `StoragePointer` case, this could also be a class 
that is essentially a `StreamStateHandle` but gives a name to the concept, 
e.g. `CheckpointMetaDataHandle`. But it is not as bad as the other case, 
because this is never passed down very deep (yet).
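
A minimal sketch of what "giving a name to the concept" could look like. All 
names here (`CheckpointPointer`, `ByteHandle`, `CheckpointMetadataHandleSketch`) 
are hypothetical stand-ins for illustration and are not classes from this PR:

```java
import java.util.Objects;

/** Hypothetical value type that names the "pointer" concept instead of passing a raw String. */
final class CheckpointPointer {
    private final String value;

    CheckpointPointer(String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Checkpoint pointer must not be empty");
        }
        this.value = value;
    }

    /** Returns the raw pointer string, e.g. for logging or serialization. */
    String asString() {
        return value;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof CheckpointPointer && ((CheckpointPointer) o).value.equals(value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(value);
    }

    @Override
    public String toString() {
        return "CheckpointPointer(" + value + ")";
    }
}

/** Hypothetical stand-in for a generic handle to some bytes. */
interface ByteHandle {
    byte[] bytes();
}

/** Named handle for checkpoint metadata; it adds no behavior and only delegates. */
final class CheckpointMetadataHandleSketch implements ByteHandle {
    private final ByteHandle delegate;

    CheckpointMetadataHandleSketch(ByteHandle delegate) {
        this.delegate = Objects.requireNonNull(delegate);
    }

    @Override
    public byte[] bytes() {
        return delegate.bytes();
    }
}
```

The wrapper types carry no extra logic. Their only purpose is that a method 
signature then states which kind of string or handle it expects.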


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160727468
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
+
+   /**
+* Initializes a storage location for new checkpoint with the given ID.
+*
+* The returned storage location can be used to write the checkpoint 
data and metadata
+* to and to obtain the pointers for the location(s) where the actual 
checkpoint data should be
+* stored.
+*
+* @param checkpointId The ID (logical timestamp) of the checkpoint 
that should be persisted.
+* @return A storage location for the data and metadata of the given 
checkpoint.
+*
+* @throws IOException Thrown if the storage location cannot be 
initialized due to an I/O exception.
+*/
+   CheckpointStorageLocation initializeCheckpoint(long checkpointId) 
throws IOException;
+
+   /**
+* Initializes a storage location for new savepoint with the given ID.
+*
+* If an external location pointer is passed, the savepoint storage 
location
+* will be initialized at the location of that pointer. If the external 
location pointer is null,
+* the default savepoint location will be used. If no default savepoint 
location is configured,
+* this will throw an exception. Whether a default savepoint location 
is configured can be
+* checked via {@link #hasDefaultSavepointLocation()}.
+*
+* @param checkpointId The ID (logical timestamp) of the savepoint's 
checkpoint.
+* @param externalLocationPointer Optionally, a pointer to the location 
where the savepoint should
+*be stored. May be null.
+*
+* @return A storage location for the data and metadata of the 
savepoint.
+*
+* @throws IOException Thrown if the storage location cannot be 
initialized due to an I/O exception.
+*/
+   CheckpointStorageLocation initializeSavepoint(
--- End diff --

Same here about the name.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160644877
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -84,13 +84,44 @@
 public interface StateBackend extends java.io.Serializable {
 
// 

-   //  Persistent Bytes Storage
+   //  Checkpoint storage - the durable persistence of checkpoint data
--- End diff --

The interface `StateBackend` is getting more crowded as more functionality 
is added, and all methods are visible to everybody from `CheckpointCoordinator` 
down to the `XYZKeyedStateBackend`. You have already divided the interface into 
3 segments and I suggest actually splitting it into 3 parent interfaces, which 
are all implemented by `StateBackend`. In many places, we can just pass the 
interface of the sub-functionality that is relevant, e.g. 
`XYZKeyedStateBackend` probably only needs to see the part about creating 
streams and should never call things that create a new backend. This would be 
a big step for separation of concerns, moving away from "global visibility". 
Our IDE should be able to automatically figure out all the places where the 
sub-interfaces are sufficient, but as it might be a bigger diff, we could do a 
separate PR for this.
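
A rough sketch of the suggested split, assuming three parent interfaces. The 
interface and class names below are hypothetical and only illustrate the idea; 
they are not the names used in Flink:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Part 1 (hypothetical): checkpoint storage access, used on the coordinator side. */
interface CheckpointStorageAccess {
    String resolveCheckpoint(String pointer) throws IOException;
}

/** Part 2 (hypothetical): creating the streams that tasks write their snapshots to. */
interface SnapshotStreamProvider {
    OutputStream createSnapshotStream(long checkpointId) throws IOException;
}

/** Part 3 (hypothetical): creating the structures that hold keyed and operator state. */
interface StateStructureProvider {
    String createKeyedStateStructure(String operatorId);
}

/** The full backend still offers everything by extending the three parts. */
interface FullStateBackend
        extends CheckpointStorageAccess, SnapshotStreamProvider, StateStructureProvider {}

/** A keyed-state component that can only see the stream-creating part. */
final class KeyedStateSnapshotter {
    private final SnapshotStreamProvider streams;

    KeyedStateSnapshotter(SnapshotStreamProvider streams) {
        this.streams = streams;
    }

    /** Writes the given state bytes to a snapshot stream and returns the number of bytes written. */
    int snapshot(long checkpointId, byte[] state) {
        try (OutputStream out = streams.createSnapshotStream(checkpointId)) {
            out.write(state);
            return state.length;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

/** Toy implementation of the full backend, holding everything in memory. */
final class InMemoryBackend implements FullStateBackend {
    @Override
    public String resolveCheckpoint(String pointer) {
        return "metadata@" + pointer;
    }

    @Override
    public OutputStream createSnapshotStream(long checkpointId) {
        return new ByteArrayOutputStream();
    }

    @Override
    public String createKeyedStateStructure(String operatorId) {
        return "keyed-state-for-" + operatorId;
    }
}
```

Passing `new InMemoryBackend()` to `KeyedStateSnapshotter` compiles because the 
backend implements all three parts, but the snapshotter itself cannot call 
`resolveCheckpoint` or `createKeyedStateStructure`.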


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160731923
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
--- End diff --

The purpose of this did not become fully clear to me from the comments, and 
the method is also never used. If this is part of some future work, you could 
consider adding the method in a future PR.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160722748
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -475,13 +474,12 @@ public void enableCheckpointing(
checkpointTimeout,
minPauseBetweenCheckpoints,
maxConcurrentCheckpoints,
-   externalizeSettings,
+   retentionPolicy,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
checkpointIDCounter,
checkpointStore,
--- End diff --

Here is one example where the proximity between the classnames 
`CompletedCheckpointStore` and `CheckpointStorage` becomes obvious.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160682007
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+/**
+ *An interface for state backends that pick up additional parameters from 
a configuration.
--- End diff --

Whitespace missing at beginning of comment line.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160722027
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
--- End diff --

I wonder if we can find a name that makes the role of this class w.r.t 
`CheckpointStorageLocation` clearer, in particular that this is what assigns 
the storage locations to the individual checkpoints?


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160719709
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import java.io.IOException;
+
+/**
+ * A storage location for one particular checkpoint. This location is 
typically
+ * created and initialized via {@link 
CheckpointStorage#initializeCheckpoint(long)} or
+ * {@link CheckpointStorage#initializeSavepoint(long, String)}.
+ */
+public interface CheckpointStorageLocation {
+
+   /**
+* Creates the output stream to persist the checkpoint metadata to.
+*
+* @return The output stream to persist the checkpoint metadata to.
+* @throws IOException Thrown, if the stream cannot be opened due to an 
I/O error.
+*/
+   CheckpointStateOutputStream createMetadataOutputStream() throws 
IOException;
+
+   /**
+* Finalizes the checkpoint, marking the location as a finished 
checkpoint.
+* This method returns the external checkpoint pointer that can be used 
to resolve
+* the checkpoint upon recovery.
+*
+* @return The external pointer to the checkpoint at this location.
+* @throws IOException Thrown, if finalizing / marking as finished 
fails due to an I/O error.
+*/
+   String markCheckpointAsFinished() throws IOException;
--- End diff --

As discussed, you might reconsider if this method is really needed.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160730627
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An implementation of durable checkpoint storage to file systems.
+ */
+public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
+
+   private final FileSystem fileSystem;
+
+   private final Path checkpointsDirectory;
+
+   private final Path sharedStateDirectory;
+
+   private final Path taskOwnedStateDirectory;
--- End diff --

As discussed, we might not need this if we map task-owned state to shared 
state. Idea: as long as the task still requires the state, it will resend the 
handle to this file (a placeholder to be precise) and keep the shared 
registry's reference count above zero.
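
The reference-counting idea can be sketched as follows. This is a simplified, 
hypothetical registry written for illustration, not the registry class used in 
Flink, and the state keys are plain strings:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified reference-counting registry for shared state. */
final class RefCountingRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();

    /**
     * Called each time a checkpoint references the state, for example when a task
     * resends the handle (or a placeholder for it). Returns the new count.
     */
    int register(String stateKey) {
        return refCounts.merge(stateKey, 1, Integer::sum);
    }

    /**
     * Called when a checkpoint that referenced the state is discarded.
     * Returns true if the count dropped to zero and the state can be deleted.
     */
    boolean unregister(String stateKey) {
        Integer count = refCounts.get(stateKey);
        if (count == null) {
            throw new IllegalStateException("Unknown state: " + stateKey);
        }
        if (count == 1) {
            refCounts.remove(stateKey);
            return true;
        }
        refCounts.put(stateKey, count - 1);
        return false;
    }
}
```

As long as a task keeps resending the handle with every checkpoint, each 
discarded checkpoint is offset by a newer registration, so the count stays 
above zero and the file is not deleted.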


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-05 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5248

[FLINK-5823] [checkpoints] State backends now also handle the checkpoint 
metadata

## What is the purpose of the change

This pull request puts the State Backends in charge of persisting the 
checkpoint metadata.
This has the following advantages:

  - Checkpoints become conceptually independent of File Systems. We can in 
the future implement state backends purely backed by non-file systems like 
databases or message queues.
  
  - We can simplify or drop the extra code paths implemented for 
externalized checkpoints and checkpoint metadata persisting in HA cases.

  - The checkpoint and savepoint configurations go purely through the state 
backends, making testing much simpler.
  
  - Because the configuration goes through the state backends only, it is 
simple to let the state backends pick up a mix of configuration from the 
application code (in code config) and configuration from the cluster setup. For 
example, a programmer can pick the state backend, and the cluster can have 
default limits or checkpoint directories configured. To support that, state 
backends may implement an additional interface which lets them pick up 
configuration values from the cluster configuration.

  - As a followup, this will allow us to implement more efficient ways of 
dropping checkpoint state (recursive directory delete) as well as logic to 
scavenge left-over checkpoint data.
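
To illustrate the configuration mixing described above, here is a minimal 
sketch in which values set in application code win and unset values are filled 
in from the cluster configuration. The class name, the `configure` signature, 
and the use of a plain `Map` are assumptions made for this example; the key 
`state.checkpoints.dir` is used only as an illustrative key:

```java
import java.util.Map;

/** Hypothetical backend that picks up unset values from the cluster configuration. */
final class ConfigurableBackendSketch {
    /** Illustrative configuration key for the checkpoint directory. */
    static final String CHECKPOINT_DIR_KEY = "state.checkpoints.dir";

    /** Checkpoint directory set in application code, or null if none was set. */
    private final String checkpointDir;

    ConfigurableBackendSketch(String checkpointDir) {
        this.checkpointDir = checkpointDir;
    }

    /**
     * Returns a backend in which every value that was not set in application code
     * is taken from the cluster configuration. Values set in code take precedence.
     */
    ConfigurableBackendSketch configure(Map<String, String> clusterConfig) {
        if (checkpointDir != null) {
            return this;
        }
        return new ConfigurableBackendSketch(clusterConfig.get(CHECKPOINT_DIR_KEY));
    }

    String getCheckpointDir() {
        return checkpointDir;
    }
}
```

The method returns a new instance rather than mutating the existing one, so 
the backend object created in application code stays unchanged.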

## Altered user-facing Behavior

  - All checkpoints are always "externalized", meaning that the metadata is 
always persisted. The notion of externalized checkpoints is dropped.
  
  - Checkpoints have no "externalization setting", but a **retention 
policy**, like
 - `RETAIN_ON_CANCELLATION`: Keep checkpoints when the user manually 
cancels the job, similar to the corresponding setting for externalized checkpoints
 - `RETAIN_ON_FAILURE`: Retain when the job reaches a terminal failure. 
For compatibility, this is automatically picked when the user calls the now 
deprecated method to activate externalized checkpoints.
 - `NEVER_RETAIN_AFTER_TERMINATION`: Conceptually similar to the 
behavior when no externalized checkpoints were configured.

  - The `MemoryStateBackend` is viewed as a FileSystem-based State Backend 
that does not create separate files for state, but just holds state inline with 
the checkpoint metadata. In the Metadata and Savepoint handling, there is no 
distinction between the `MemoryStateBackend` and the `FsStateBackend`.
 - As a special case, the MemoryStateBackend may choose to not durably 
persist the metadata (when no storage location is configured, by default), in 
which case it will not be able to support an HA mode (there is an eager check 
for that). That is merely there to support no-config getting started 
experiences and simpler in-IDE development setups.
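
The three retention policies could be modeled roughly as below. The enum names 
with the `Sketch` suffix and the `retainOn` method are hypothetical. The sketch 
also assumes that `RETAIN_ON_CANCELLATION` retains on failure as well and that 
nothing is retained when a job finishes normally; neither is stated explicitly 
in this description:

```java
/** Hypothetical terminal job states used by the sketch. */
enum TerminalStateSketch {
    FINISHED,
    CANCELED,
    FAILED
}

/** Hypothetical model of the retention policies listed above. */
enum RetentionPolicySketch {
    // Assumption: retaining on cancellation implies retaining on failure too.
    RETAIN_ON_CANCELLATION(true, true),
    RETAIN_ON_FAILURE(false, true),
    NEVER_RETAIN_AFTER_TERMINATION(false, false);

    private final boolean retainOnCancellation;
    private final boolean retainOnFailure;

    RetentionPolicySketch(boolean retainOnCancellation, boolean retainOnFailure) {
        this.retainOnCancellation = retainOnCancellation;
        this.retainOnFailure = retainOnFailure;
    }

    /** Decides whether the latest checkpoint is kept once the job reaches the given state. */
    boolean retainOn(TerminalStateSketch state) {
        switch (state) {
            case CANCELED:
                return retainOnCancellation;
            case FAILED:
                return retainOnFailure;
            default:
                // Assumption: checkpoints of a normally finished job are dropped.
                return false;
        }
    }
}
```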

## Followup work

To make sure there is no extra persisting of the checkpoint metadata by the 
HA store (it simply references the regular persisted checkpoint metadata) we 
need some changes to the `ZooKeeperCompletedCheckpointStore`.

Once we start storing shared checkpoint state (incremental checkpoints) and 
task-owned state (write-ahead sinks) in different locations, we can start 
optimizing checkpoint directory cleanup, and can start implementing scavengers 
for left-over state.

## Brief change log

  - The state backends introduce the concept of a `CheckpointStorage` 
(storage of bytes) and `CheckpointStorageLocation` (specific location for the 
bytes of a checkpoint/savepoint). That makes the separation of concerns in the 
state backend clear: `KeyedStateBackend` and `OperatorStateBackend` define how 
to hold and checkpoint the state, while `CheckpointStorage` defines how to 
persist bytes (data and metadata).
  
  - The `CheckpointStorage` is responsible for storing the checkpoint 
metadata. There is no implicit assumption that the checkpoint metadata is 
stored in a file system any more.
  
  - All checkpoint directory / savepoint directory specific config settings 
are now part of the state backends. The Checkpoint Coordinator simply calls the 
relevant methods on the state backends to store metadata.

  - All checkpoints are addressable via a "pointer", which is interpreted 
by the state backend to find the checkpoint. For file-system-based 
state backends (all state backends in Flink currently), this pointer is the file 
path.
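
The interplay between storage, storage location, and pointer can be sketched 
with a toy in-memory storage. The method names follow the interfaces quoted in 
the review diffs (`initializeCheckpoint`, `createMetadataOutputStream`, 
`markCheckpointAsFinished`, `resolveCheckpoint`), but the class itself, the 
`mem://` pointer format, and the simplified signatures are assumptions made for 
illustration:

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory storage; the pointer format "mem://<id>" is made up. */
final class InMemoryCheckpointStorageSketch {
    /** Metadata of finished checkpoints, keyed by their external pointer. */
    private final Map<String, byte[]> finished = new HashMap<>();

    /** Creates the location that the metadata of one checkpoint is written to. */
    Location initializeCheckpoint(long checkpointId) {
        return new Location(checkpointId);
    }

    /**
     * Interprets the pointer and returns the stored metadata. The quoted interface
     * declares IOException; an unchecked exception keeps this sketch short.
     */
    byte[] resolveCheckpoint(String pointer) {
        byte[] metadata = finished.get(pointer);
        if (metadata == null) {
            throw new IllegalArgumentException("Cannot resolve pointer: " + pointer);
        }
        return metadata;
    }

    /** Storage location for exactly one checkpoint. */
    final class Location {
        private final long checkpointId;
        private final ByteArrayOutputStream metadata = new ByteArrayOutputStream();

        Location(long checkpointId) {
            this.checkpointId = checkpointId;
        }

        ByteArrayOutputStream createMetadataOutputStream() {
            return metadata;
        }

        /** Publishes the metadata and returns the pointer under which it can be resolved. */
        String markCheckpointAsFinished() {
            String pointer = "mem://" + checkpointId;
            finished.put(pointer, metadata.toByteArray());
            return pointer;
        }
    }
}
```

Only the storage knows how to turn the pointer back into metadata, which is 
what allows a backend to use something other than a file path.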
  
## Verifying this change

This change adds new tests and adjusts many existing tests to verify the behavior.

Manual verification can happen by just starting a regular Flink cluster, 
enabling checkpoints, and seeing that metadata files get persisted always.

## Does this pull request potentially affect one of the following parts:

  -