[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2018-01-18 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/4907


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-26 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147121427
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
@@ -243,11 +245,19 @@ else if (directory.exists()) {
 * @throws IOException if the delete operation fails
 */
public static boolean deletePathIfEmpty(FileSystem fileSystem, Path 
path) throws IOException {
-   FileStatus[] fileStatuses = null;
+   final FileStatus[] fileStatuses;
 
try {
fileStatuses = fileSystem.listStatus(path);
-   } catch (Exception ignored) {}
+   }
+   catch (FileNotFoundException e) {
+   // path already deleted
+   return true;
+   }
+   catch (Exception e) {
+   // could not access directory, cannot delete
--- End diff --

Makes sense. Thanks for the explanation!


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-26 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147093502
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // 

+   //  general checkpoint and state backend options
+   // 

+
+   public static final ConfigOption STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain.*/
+   public static final ConfigOption MAX_RETAINED_CHECKPOINTS = 
ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // 

+   //  Options specific to the file-system-based state backends
+   // 

+
+   /** The default directory for savepoints. Used by the state backends 
that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption SAVEPOINT_DIRECTORY = 
ConfigOptions
+   .key("state.savepoints.dir")
+   .noDefaultValue()
+   .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state 
backends that write
+* checkpoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption CHECKPOINTS_DIRECTORY = 
ConfigOptions
+   .key("state.checkpoints.dir")
+   .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use 
an asynchronous
+* snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = 
ConfigOptions
+   .key("state.backend.heap.async")
--- End diff --

Yes, I know. I would like to do that as a follow-up once you're done, time 
permitting.


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147092074
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
@@ -243,11 +245,19 @@ else if (directory.exists()) {
 * @throws IOException if the delete operation fails
 */
public static boolean deletePathIfEmpty(FileSystem fileSystem, Path 
path) throws IOException {
-   FileStatus[] fileStatuses = null;
+   final FileStatus[] fileStatuses;
 
try {
fileStatuses = fileSystem.listStatus(path);
-   } catch (Exception ignored) {}
+   }
+   catch (FileNotFoundException e) {
+   // path already deleted
+   return true;
+   }
+   catch (Exception e) {
+   // could not access directory, cannot delete
--- End diff --

I have often heard that logging in utility functions is usually not 
advised, because the utility cannot decide whether something is actually a 
problem or not. Logging in utilities often leads to log pollution.

Logging should happen in the caller of the utility, which can interpret the 
context and decide whether it is actually a problem. If the caller calls a 
utility that does not report problems properly (a silent utility function), 
then it's the wrong utility for the caller.
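
A minimal sketch of that split, assuming plain `java.nio.file` instead of 
Flink's `FileSystem` abstraction. The class and method names 
(`QuietFileUtil`, `CheckpointCleanupCaller`, `deleteDirectoryIfEmpty`, 
`cleanUp`) are hypothetical and not part of the pull request. The utility 
only reports the outcome through its return value or an exception; the caller 
decides what is worth logging.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.logging.Logger;

/** Hypothetical utility: reports outcomes, never logs. */
final class QuietFileUtil {

    private QuietFileUtil() {}

    /**
     * Deletes the directory if it is empty.
     *
     * @return true if the directory is gone afterwards, false if it was not empty
     * @throws IOException if the directory cannot be listed or deleted
     */
    static boolean deleteDirectoryIfEmpty(Path dir) throws IOException {
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            if (entries.iterator().hasNext()) {
                // not empty, so nothing is deleted
                return false;
            }
        } catch (NoSuchFileException e) {
            // path already deleted, which is the state the caller asked for
            return true;
        }
        Files.deleteIfExists(dir);
        return true;
    }
}

/** Hypothetical caller: knows the context, so it owns the logging. */
final class CheckpointCleanupCaller {

    private static final Logger LOG = Logger.getLogger("CheckpointCleanupCaller");

    private CheckpointCleanupCaller() {}

    static boolean cleanUp(Path dir) {
        try {
            boolean removed = QuietFileUtil.deleteDirectoryIfEmpty(dir);
            if (!removed) {
                // for this caller a non-empty directory is expected, so INFO is enough
                LOG.info("Directory still in use, keeping it: " + dir);
            }
            return removed;
        } catch (IOException e) {
            // only the caller can judge that this deserves a warning
            LOG.warning("Could not clean up " + dir + ": " + e);
            return false;
        }
    }
}
```

A different caller could treat the same `IOException` as fatal or ignore it 
entirely, which is the reason for keeping the log statements out of the 
utility.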


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147091056
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
 ---
@@ -18,45 +18,41 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateBackendFactory;
 
-import java.io.IOException;
+import java.net.URI;
 
 /**
- * A factory that creates an {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend}
- * from a configuration.
+ * A factory that creates an {@link FsStateBackend} from a configuration.
  */
+@PublicEvolving
 public class FsStateBackendFactory implements 
StateBackendFactory {
-   
-   /** The key under which the config stores the directory where 
checkpoints should be stored */
-   public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = 
"state.backend.fs.checkpointdir";
-
-   /** The key under which the config stores the threshold for state to be 
store in memory,
-* rather than in files */
-   public static final String MEMORY_THRESHOLD_CONF_KEY = 
"state.backend.fs.memory-threshold";
-
 
@Override
public FsStateBackend createFromConfig(Configuration config) throws 
IllegalConfigurationException {
--- End diff --

One could for those that we currently have, agreed. I would like to keep 
the factories, though, for future backends (or user-defined backends) that do 
not implement reconfiguration.


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147090655
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -18,54 +18,92 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URI;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The file state backend is a state backend that stores the state of 
streaming jobs in a file system.
+ * This state backend holds the working state in the memory (JVM heap) of 
the TaskManagers.
+ * The state backend checkpoints state as files to a file system (hence 
the backend's name).
+ *
+ * Each checkpoint individually will store all its files in a 
subdirectory that includes the
+ * checkpoint number, such as {@code 
hdfs://namenode:port/flink-checkpoints/chk-17/}.
+ *
+ * State Size Considerations
+ *
+ * Working state is kept on the TaskManager heap. If a TaskManager 
executes multiple
+ * tasks concurrently (if the TaskManager has multiple slots, or if 
slot-sharing is used)
+ * then the aggregate state of all tasks needs to fit into that 
TaskManager's memory.
+ *
+ * This state backend stores small state chunks directly with the 
metadata, to avoid creating
+ * many small files. The threshold for that is configurable. When 
increasing this threshold, the
+ * size of the checkpoint metadata increases. The checkpoint metadata of 
all retained completed
+ * checkpoints needs to fit into the JobManager's heap memory. This is 
typically not a problem,
+ * unless the threshold {@link #getMinFileSizeThreshold()} is increased 
significantly.
+ *
+ * Persistence Guarantees
  *
- * The state backend has one core directory into which it puts all 
checkpoint data. Inside that
- * directory, it creates a directory per job, inside which each checkpoint 
gets a directory, with
- * files for each state, for example:
+ * Checkpoints from this state backend are as persistent and available as 
filesystem that is written to.
+ * If the file system is a persistent distributed file system, this state 
backend supports
+ * highly available setups. The backend additionally supports savepoints 
and externalized checkpoints.
  *
- * {@code 
hdfs://namenode:port/flink-checkpoints//chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8
 }
+ * Configuration
+ *
+ * As for all state backends, this backend can either be configured within 
the application (by creating
+ * the backend with the respective constructor parameters and setting it 
on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * If the state backend was specified in the application, it may pick 
up additional configuration
+ * parameters from the Flink configuration. For example, if the backend if 
configured in the application
+ * without a default savepoint directory, it will pick up a default 
savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is 
implemented via the
+ * {@link #configure(Configuration)} method.
  */
-public class FsStateBackend extends AbstractStateBackend {
+@PublicEvolving
+public class FsStateBackend extends AbstractFileStateBackend implements 
ConfigurableStateBackend {
--- End diff --

Could be, but the method would be empty in `AbstractFileStateBackend`, 
because the reconfiguration is implemented via a special constructor, and 
constructors don't inherit like 

[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147090207
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+
+/**
+ * A base class for all state backends that store their metadata (and 
data) in files.
+ * Examples that inherit from this are the {@link FsStateBackend}, the
+ * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend 
MemoryStateBackend}, or the
--- End diff --

The main difference between `FsStateBackend` and `MemoryStateBackend` is 
the following (already now as well, BTW):
  - `FsStateBackend` writes individual files directly for individual state 
chunks and always writes out metadata to a file.
  - `MemoryStateBackend` aggregates all state (data and metadata) in a 
single file. It only writes that file if a checkpoint directory is configured, 
or if HA is activated; otherwise it just keeps it in the JobManager's memory. 
That's so that one can start playing around without any checkpoint config.
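
The rule described in the two bullets can be modelled as a small decision 
function. This is a toy sketch only: `MetadataPlacementSketch`, its enum, and 
its methods are hypothetical names and do not exist in Flink; the real 
backends implement this inside their checkpoint storage code.

```java
import java.util.Optional;

/** Toy model of where each backend puts checkpoint data and metadata. */
final class MetadataPlacementSketch {

    enum Placement {
        /** One file per state chunk, plus a metadata file. */
        FILES_PER_CHUNK_AND_METADATA_FILE,
        /** Data and metadata aggregated into a single file. */
        SINGLE_AGGREGATED_FILE,
        /** Nothing written; kept on the JobManager heap. */
        JOB_MANAGER_MEMORY
    }

    private MetadataPlacementSketch() {}

    /** FsStateBackend: always writes files, as described in the comment above. */
    static Placement forFsBackend() {
        return Placement.FILES_PER_CHUNK_AND_METADATA_FILE;
    }

    /**
     * MemoryStateBackend: writes the aggregated file only if a checkpoint
     * directory is configured or HA is activated.
     */
    static Placement forMemoryBackend(Optional<String> checkpointDir, boolean haActivated) {
        if (checkpointDir.isPresent() || haActivated) {
            return Placement.SINGLE_AGGREGATED_FILE;
        }
        return Placement.JOB_MANAGER_MEMORY;
    }
}
```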


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147089435
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+/**
+ *An interface for state backends that pick up additional parameters from 
a configuration.
+ */
+public interface ConfigurableStateBackend {
+
+   /**
+* Creates a variant of the state backend that applies additional 
configuration parameters.
+*
+* Settings that were directly done on the original state backend 
object in the application
+* program typically have precedence over setting picked up from the 
configuration.
+*
+* If no configuration is applied, or if the method directly applies 
configuration values to
+* the (mutable) state backend object, this method may return the 
original state backend object.
+* Otherwise it typically returns a modified copy.
+*
+* @param config The configuration to pick the values from. 
+* @return A copy of th
--- End diff --

Will fix that...
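
For readers following the Javadoc quoted above, here is a rough, 
self-contained sketch of the "modified copy" contract. It assumes a plain 
`Map<String, String>` in place of Flink's `Configuration`, and the types 
`ConfigurableBackendSketch` and `FileBackendSketch` are hypothetical. Only the 
two keys `state.checkpoints.dir` and `state.savepoints.dir` are taken from the 
diff of `CheckpointingOptions`.

```java
import java.util.Map;

/** Hypothetical stand-in for the interface under review. */
interface ConfigurableBackendSketch {
    /** Returns a variant of this backend with values from the config applied. */
    ConfigurableBackendSketch configure(Map<String, String> config);
}

/** Hypothetical immutable backend whose unset fields are filled from the config. */
final class FileBackendSketch implements ConfigurableBackendSketch {

    private final String checkpointDir; // null means "not set in the application"
    private final String savepointDir;  // null means "not set in the application"

    FileBackendSketch(String checkpointDir, String savepointDir) {
        this.checkpointDir = checkpointDir;
        this.savepointDir = savepointDir;
    }

    /** Re-configuring constructor: application settings win, config fills the gaps. */
    private FileBackendSketch(FileBackendSketch original, Map<String, String> config) {
        this.checkpointDir = original.checkpointDir != null
                ? original.checkpointDir
                : config.get("state.checkpoints.dir");
        this.savepointDir = original.savepointDir != null
                ? original.savepointDir
                : config.get("state.savepoints.dir");
    }

    @Override
    public FileBackendSketch configure(Map<String, String> config) {
        // the original object stays untouched; a modified copy is returned
        return new FileBackendSketch(this, config);
    }

    String getCheckpointDir() {
        return checkpointDir;
    }

    String getSavepointDir() {
        return savepointDir;
    }
}
```

Because the merge logic lives in a constructor in this sketch, each concrete 
backend class would need its own re-configuring constructor; constructors are 
not inherited by subclasses in Java.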


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147089325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 ---
@@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph(
metrics);
 
// The default directory for externalized checkpoints
-   String externalizedCheckpointsDir = 
jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY);
+   String externalizedCheckpointsDir = 
jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
 
-   // load the state backend for checkpoint metadata.
-   // if specified in the application, use from there, 
otherwise load from configuration
-   final StateBackend metadataBackend;
+   // load the state backend from the application settings
+   final StateBackend applicationConfiguredBackend;
+   final SerializedValue 
serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
 
-   final SerializedValue 
applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
-   if (applicationConfiguredBackend != null) {
+   if (serializedAppConfigured == null) {
+   applicationConfiguredBackend = null;
+   }
+   else {
try {
-   metadataBackend = 
applicationConfiguredBackend.deserializeValue(classLoader);
+   applicationConfiguredBackend = 
serializedAppConfigured.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException 
e) {
-   throw new JobExecutionException(jobId, 
"Could not instantiate configured state backend.", e);
+   throw new JobExecutionException(jobId, 
+   "Could not deserialize 
application-defined state backend.", e);
}
+   }
 
-   log.info("Using application-defined state 
backend for checkpoint/savepoint metadata: {}.",
-   metadataBackend);
-   } else {
-   try {
-   metadataBackend = AbstractStateBackend
-   
.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
-   } catch (IllegalConfigurationException | 
IOException | DynamicCodeLoadingException e) {
-   throw new JobExecutionException(jobId, 
"Could not instantiate configured state backend", e);
-   }
+   final StateBackend rootBackend;
+   try {
+   rootBackend = 
StateBackendLoader.fromApplicationOrConfigOrDefault(
--- End diff --

True, the commits are not self-contained. I tried as much as I could, but 
since it all comes from a single original commit, it would have been crazy 
time-intensive to make every commit self-contained.


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147089150
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -116,6 +117,10 @@
 * accessing this don't block the job manager actor and run 
asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore;
 
+   /** The root checkpoint state backend, which is responsible for 
initializing the
+* checkpoint, storing the metadata, and cleaning up the checkpoint */
+   private final StateBackend checkpointStateBackend;
--- End diff --

No, not really. Wanted to pick a name here that allows us later to possibly 
have different "backends" for the data structure, meaning heap and RocksDB in 
different operators. One state backend needs to be the root that defines how 
metadata is checkpointed and recovered.


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147088900
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // 

+   //  general checkpoint and state backend options
+   // 

+
+   public static final ConfigOption STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain.*/
+   public static final ConfigOption MAX_RETAINED_CHECKPOINTS = 
ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // 

+   //  Options specific to the file-system-based state backends
+   // 

+
+   /** The default directory for savepoints. Used by the state backends 
that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption SAVEPOINT_DIRECTORY = 
ConfigOptions
+   .key("state.savepoints.dir")
+   .noDefaultValue()
+   .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state 
backends that write
+* checkpoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption CHECKPOINTS_DIRECTORY = 
ConfigOptions
+   .key("state.checkpoints.dir")
+   .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use 
an asynchronous
+* snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = 
ConfigOptions
+   .key("state.backend.heap.async")
--- End diff --

+1 to fix the names, separate effort though


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147088825
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // 

+   //  general checkpoint and state backend options
+   // 

+
+   public static final ConfigOption STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain.*/
+   public static final ConfigOption MAX_RETAINED_CHECKPOINTS = 
ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // 

+   //  Options specific to the file-system-based state backends
+   // 

+
+   /** The default directory for savepoints. Used by the state backends 
that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
--- End diff --

Yes, the `MemoryStateBackend` actually works for HA and Savepoints. The 
name is misleading...


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147031787
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
@@ -243,11 +245,19 @@ else if (directory.exists()) {
 * @throws IOException if the delete operation fails
 */
public static boolean deletePathIfEmpty(FileSystem fileSystem, Path 
path) throws IOException {
-   FileStatus[] fileStatuses = null;
+   final FileStatus[] fileStatuses;
 
try {
fileStatuses = fileSystem.listStatus(path);
-   } catch (Exception ignored) {}
+   }
+   catch (FileNotFoundException e) {
+   // path already deleted
+   return true;
+   }
+   catch (Exception e) {
+   // could not access directory, cannot delete
--- End diff --

Should we log a warning here?


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146993120
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
 ---
@@ -48,75 +53,249 @@
 
private final ClassLoader cl = getClass().getClassLoader();
 
-   private final String backendKey = CoreOptions.STATE_BACKEND.key();
+   private final String backendKey = 
CheckpointingOptions.STATE_BACKEND.key();
 
// 

+   //  defaults
+   // 

 
@Test
public void testNoStateBackendDefined() throws Exception {
-   assertNull(AbstractStateBackend.loadStateBackendFromConfig(new 
Configuration(), cl, null));
+   assertNull(StateBackendLoader.loadStateBackendFromConfig(new 
Configuration(), cl, null));
}
 
@Test
public void testInstantiateMemoryBackendByDefault() throws Exception {
-   StateBackend backend = AbstractStateBackend
-   .loadStateBackendFromConfigOrCreateDefault(new 
Configuration(), cl, null);
+   StateBackend backend =
+   
StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(), 
cl, null);
 
assertTrue(backend instanceof MemoryStateBackend);
}
 
@Test
-   public void testLoadMemoryStateBackend() throws Exception {
-   // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
-   // to guard against config-breaking changes of the name 
+   public void testApplicationDefinedHasPrecedence() throws Exception {
+   final StateBackend appBackend = 
Mockito.mock(StateBackend.class);
+
final Configuration config = new Configuration();
config.setString(backendKey, "jobmanager");
 
-   StateBackend backend = AbstractStateBackend
-   .loadStateBackendFromConfigOrCreateDefault(new 
Configuration(), cl, null);
+   StateBackend backend = 
StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config, cl, 
null);
+   assertEquals(appBackend, backend);
+   }
 
-   assertTrue(backend instanceof MemoryStateBackend);
+   // 

+   //  Memory State Backend
+   // 

+
+   /**
+* Validates loading a memory state backend from the cluster 
configuration.
+*/
+   @Test
+   public void testLoadMemoryStateBackendNoParameters() throws Exception {
+   // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
+   // to guard against config-breaking changes of the name
+
+   final Configuration config1 = new Configuration();
+   config1.setString(backendKey, "jobmanager");
+
+   final Configuration config2 = new Configuration();
+   config2.setString(backendKey, 
MemoryStateBackendFactory.class.getName());
+
+   StateBackend backend1 = 
StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+   StateBackend backend2 = 
StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
+
+   assertTrue(backend1 instanceof MemoryStateBackend);
+   assertTrue(backend2 instanceof MemoryStateBackend);
+   }
+
+   /**
+* Validates loading a memory state backend with additional parameters 
from the cluster configuration.
+*/
+   @Test
+   public void testLoadMemoryStateWithParameters() throws Exception {
+   final String checkpointDir = new 
Path(tmp.newFolder().toURI()).toString();
+   final String savepointDir = new 
Path(tmp.newFolder().toURI()).toString();
+   final Path expectedCheckpointPath = new Path(checkpointDir);
+   final Path expectedSavepointPath = new Path(savepointDir);
+
+   // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
+   // to guard against config-breaking changes of the name
+
+   final Configuration config1 = new Configuration();
+   config1.setString(backendKey, "jobmanager");
+   config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
+   config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir);
+
+   final Configuration config2 = new Configuration();
+   config2.setString(backendKey, 

[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146990927
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+
+/**
+ * A base class for all state backends that store their metadata (and 
data) in files.
+ * Examples that inherit from this are the {@link FsStateBackend}, the
+ * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend 
MemoryStateBackend}, or the
--- End diff --

Nit: is this true for `MemoryStateBackend`? At the very least it's weird. 
😅 


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146991686
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -18,54 +18,92 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URI;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The file state backend is a state backend that stores the state of 
streaming jobs in a file system.
+ * This state backend holds the working state in the memory (JVM heap) of 
the TaskManagers.
+ * The state backend checkpoints state as files to a file system (hence 
the backend's name).
+ *
+ * Each checkpoint individually will store all its files in a 
subdirectory that includes the
+ * checkpoint number, such as {@code 
hdfs://namenode:port/flink-checkpoints/chk-17/}.
+ *
+ * State Size Considerations
+ *
+ * Working state is kept on the TaskManager heap. If a TaskManager 
executes multiple
+ * tasks concurrently (if the TaskManager has multiple slots, or if 
slot-sharing is used)
+ * then the aggregate state of all tasks needs to fit into that 
TaskManager's memory.
+ *
+ * This state backend stores small state chunks directly with the 
metadata, to avoid creating
+ * many small files. The threshold for that is configurable. When 
increasing this threshold, the
+ * size of the checkpoint metadata increases. The checkpoint metadata of 
all retained completed
+ * checkpoints needs to fit into the JobManager's heap memory. This is 
typically not a problem,
+ * unless the threshold {@link #getMinFileSizeThreshold()} is increased 
significantly.
+ *
+ * Persistence Guarantees
  *
- * The state backend has one core directory into which it puts all 
checkpoint data. Inside that
- * directory, it creates a directory per job, inside which each checkpoint 
gets a directory, with
- * files for each state, for example:
+ * Checkpoints from this state backend are as persistent and available as 
the file system that is written to.
+ * If the file system is a persistent distributed file system, this state 
backend supports
+ * highly available setups. The backend additionally supports savepoints 
and externalized checkpoints.
  *
- * {@code 
hdfs://namenode:port/flink-checkpoints//chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8
 }
+ * Configuration
+ *
+ * As for all state backends, this backend can either be configured within 
the application (by creating
+ * the backend with the respective constructor parameters and setting it 
on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * If the state backend was specified in the application, it may pick 
up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is 
configured in the application
+ * without a default savepoint directory, it will pick up a default 
savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is 
implemented via the
+ * {@link #configure(Configuration)} method.
  */
-public class FsStateBackend extends AbstractStateBackend {
+@PublicEvolving
+public class FsStateBackend extends AbstractFileStateBackend implements 
ConfigurableStateBackend {
--- End diff --

Why isn't `AbstractFileStateBackend` configurable?


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146992430
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
 ---
@@ -18,45 +18,41 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateBackendFactory;
 
-import java.io.IOException;
+import java.net.URI;
 
 /**
- * A factory that creates an {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend}
- * from a configuration.
+ * A factory that creates an {@link FsStateBackend} from a configuration.
  */
+@PublicEvolving
 public class FsStateBackendFactory implements 
StateBackendFactory {
-   
-   /** The key under which the config stores the directory where 
checkpoints should be stored */
-   public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = 
"state.backend.fs.checkpointdir";
-
-   /** The key under which the config stores the threshold for state to be 
store in memory,
-* rather than in files */
-   public static final String MEMORY_THRESHOLD_CONF_KEY = 
"state.backend.fs.memory-threshold";
-
 
@Override
public FsStateBackend createFromConfig(Configuration config) throws 
IllegalConfigurationException {
--- End diff --

Wouldn't it make sense to do all of this in `configure()`? The factories 
are essentially useless now and would only instantiate the backend and then 
call `configure()` with the config.
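
A minimal sketch of the shape this suggestion implies, assuming hypothetical `Backend` and `BackendFactory` types and a plain `Map` in place of Flink's `Configuration` (none of this is the actual Flink code). The factory only instantiates the backend and hands the configuration to `configure()`; all parsing and validation lives in the backend.

```java
import java.util.HashMap;
import java.util.Map;

public class DelegatingFactorySketch {

    /** Hypothetical backend; the real FsStateBackend has more settings. */
    static final class Backend {
        final String checkpointDir; // null until configured

        Backend(String checkpointDir) {
            this.checkpointDir = checkpointDir;
        }

        /** Reads the checkpoint directory from the config and returns a configured copy. */
        Backend configure(Map<String, String> config) {
            String dir = config.get("state.checkpoints.dir");
            if (dir == null) {
                throw new IllegalArgumentException("Missing 'state.checkpoints.dir'");
            }
            return new Backend(dir);
        }
    }

    /** Hypothetical factory that holds no configuration logic of its own. */
    static final class BackendFactory {
        Backend createFromConfig(Map<String, String> config) {
            return new Backend(null).configure(config);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("state.checkpoints.dir", "hdfs:///flink/checkpoints");
        Backend backend = new BackendFactory().createFromConfig(config);
        System.out.println(backend.checkpointDir);
    }
}
```

With this layout, the validation rules exist in one place, whether the backend is created by the factory or set in the application.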


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146985691
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // 

+   //  general checkpoint and state backend options
+   // 

+
+   public static final ConfigOption STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain.*/
+   public static final ConfigOption MAX_RETAINED_CHECKPOINTS = 
ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // 

+   //  Options specific to the file-system-based state backends
+   // 

+
+   /** The default directory for savepoints. Used by the state backends 
that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption SAVEPOINT_DIRECTORY = 
ConfigOptions
+   .key("state.savepoints.dir")
+   .noDefaultValue()
+   .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state 
backends that write
+* checkpoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption CHECKPOINTS_DIRECTORY = 
ConfigOptions
+   .key("state.checkpoints.dir")
+   .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use 
an asynchronous
+* snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = 
ConfigOptions
+   .key("state.backend.heap.async")
--- End diff --

Off-topic: The fact that we have "FsStateBackend" and "MemoryStateBackend" 
is confusing every user. We should only have "HeapStateBackend" and 
"RocksDBStateBackend", which both checkpoint to a DFS. (And the current 
"MemoryStateBackend" behaviour could be a switch on "HeapStateBackend"). But 
I'm afraid it's too late for that. 😩 


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146987131
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 ---
@@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph(
metrics);
 
// The default directory for externalized checkpoints
-   String externalizedCheckpointsDir = 
jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY);
+   String externalizedCheckpointsDir = 
jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
 
-   // load the state backend for checkpoint metadata.
-   // if specified in the application, use from there, 
otherwise load from configuration
-   final StateBackend metadataBackend;
+   // load the state backend from the application settings
+   final StateBackend applicationConfiguredBackend;
+   final SerializedValue 
serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
 
-   final SerializedValue 
applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
-   if (applicationConfiguredBackend != null) {
+   if (serializedAppConfigured == null) {
+   applicationConfiguredBackend = null;
+   }
+   else {
try {
-   metadataBackend = 
applicationConfiguredBackend.deserializeValue(classLoader);
+   applicationConfiguredBackend = 
serializedAppConfigured.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException 
e) {
-   throw new JobExecutionException(jobId, 
"Could not instantiate configured state backend.", e);
+   throw new JobExecutionException(jobId, 
+   "Could not deserialize 
application-defined state backend.", e);
}
+   }
 
-   log.info("Using application-defined state 
backend for checkpoint/savepoint metadata: {}.",
-   metadataBackend);
-   } else {
-   try {
-   metadataBackend = AbstractStateBackend
-   
.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
-   } catch (IllegalConfigurationException | 
IOException | DynamicCodeLoadingException e) {
-   throw new JobExecutionException(jobId, 
"Could not instantiate configured state backend", e);
-   }
+   final StateBackend rootBackend;
+   try {
+   rootBackend = 
StateBackendLoader.fromApplicationOrConfigOrDefault(
--- End diff --

nit: This is only defined in the later commit "[FLINK-5823] [checkpoints] 
State backends define checkpoint and savepoint directories, improved 
configuration", so technically the commit that adds this code is broken.


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146984727
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // 

+   //  general checkpoint and state backend options
+   // 

+
+   public static final ConfigOption STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain.*/
+   public static final ConfigOption MAX_RETAINED_CHECKPOINTS = 
ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // 

+   //  Options specific to the file-system-based state backends
+   // 

+
+   /** The default directory for savepoints. Used by the state backends 
that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
--- End diff --

nit: is this true for `MemoryStateBackend`? Same for the options below.


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146987476
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -116,6 +117,10 @@
 * accessing this don't block the job manager actor and run 
asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore;
 
+   /** The root checkpoint state backend, which is responsible for 
initializing the
+* checkpoint, storing the metadata, and cleaning up the checkpoint */
+   private final StateBackend checkpointStateBackend;
--- End diff --

nit: Are there other backends than "checkpoint" backend?


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146988381
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+/**
+ * An interface for state backends that pick up additional parameters from 
a configuration.
+ */
+public interface ConfigurableStateBackend {
+
+   /**
+* Creates a variant of the state backend that applies additional 
configuration parameters.
+*
+* Settings that were directly done on the original state backend 
object in the application
+* program typically have precedence over settings picked up from the 
configuration.
+*
+* If no configuration is applied, or if the method directly applies 
configuration values to
+* the (mutable) state backend object, this method may return the 
original state backend object.
+* Otherwise it typically returns a modified copy.
+*
+* @param config The configuration to pick the values from. 
+* @return A copy of th
--- End diff --

Sentence is trailing off.


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146985767
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // 

+   //  general checkpoint and state backend options
+   // 

+
+   public static final ConfigOption STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain.*/
+   public static final ConfigOption MAX_RETAINED_CHECKPOINTS = 
ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // 

+   //  Options specific to the file-system-based state backends
+   // 

+
+   /** The default directory for savepoints. Used by the state backends 
that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption SAVEPOINT_DIRECTORY = 
ConfigOptions
+   .key("state.savepoints.dir")
+   .noDefaultValue()
+   .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state 
backends that write
+* checkpoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption CHECKPOINTS_DIRECTORY = 
ConfigOptions
+   .key("state.checkpoints.dir")
+   .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use 
an asynchronous
+* snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = 
ConfigOptions
+   .key("state.backend.heap.async")
--- End diff --

Mentioning it because we have no "heap" backend, technically.


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/4907

[FLINK-5823] [checkpoints] State Backends also handle Checkpoint Metadata 
(part 1)

This is an incremental (first part) rebuild of #3522 on the latest master.

For ease of review, broken down into small chunks.

## Part 1: Application-defined State Backends pick up additional values 
from the configuration

We need to keep supporting the scenario of setting a state backend in the 
user program, but configuring parameters like the checkpoint directory in the 
cluster config. To support that, state backends may implement an additional 
interface which lets them pick up configuration values from the cluster 
configuration.

This also makes testing the checkpoint / savepoint configuration much 
easier.
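
The behavior described in Part 1 can be sketched roughly as follows. This is an illustration under assumptions, not the code in this pull request: `Configurable` and `FileBackend` are hypothetical stand-ins, and a plain `Map` replaces Flink's `Configuration`. Only the key `state.checkpoints.dir` is taken from the `CheckpointingOptions` diff.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigurableBackendSketch {

    /** Hypothetical stand-in for the additional interface described above. */
    interface Configurable<T> {
        T configure(Map<String, String> clusterConfig);
    }

    /** Hypothetical backend whose checkpoint directory is optional in the application. */
    static final class FileBackend implements Configurable<FileBackend> {
        final String checkpointDir; // null means "not set in the application"

        FileBackend(String checkpointDir) {
            this.checkpointDir = checkpointDir;
        }

        @Override
        public FileBackend configure(Map<String, String> clusterConfig) {
            if (checkpointDir != null) {
                return this; // a value set in the application takes precedence
            }
            String fromConfig = clusterConfig.get("state.checkpoints.dir");
            // Return a modified copy only if the cluster config supplies a value.
            return fromConfig == null ? this : new FileBackend(fromConfig);
        }
    }

    public static void main(String[] args) {
        Map<String, String> clusterConfig = new HashMap<>();
        clusterConfig.put("state.checkpoints.dir", "hdfs:///flink/checkpoints");

        // Backend created in the user program without a directory.
        FileBackend filledIn = new FileBackend(null).configure(clusterConfig);
        System.out.println(filledIn.checkpointDir);

        // Backend created in the user program with an explicit directory.
        FileBackend explicit = new FileBackend("file:///tmp/chk").configure(clusterConfig);
        System.out.println(explicit.checkpointDir);
    }
}
```

Returning a copy rather than mutating the original keeps the object set in the user program unchanged, which matches the "modified copy" wording in the `ConfigurableStateBackend` Javadoc.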

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink backend

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4907.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4907


commit 8055cc9d84e206abd00cbc339c6cc390c53f2afe
Author: Stephan Ewen 
Date:   2017-10-24T17:46:07Z

[hotfix] [hdfs] Avoid reparsing URIs in Hadoop File Status conversion

commit 96068b93d4c332a9b8cfba06ecc9d42804a79f92
Author: Stephan Ewen 
Date:   2017-10-24T19:20:44Z

[hotfix] [streaming] Move SerializedCheckpointData to proper scope for 
MessageAcknowledgingSourceBase

commit e098cc6ac8d77a027aefb6c1bcee41f6214ed74c
Author: Stephan Ewen 
Date:   2017-10-25T09:56:59Z

[hotfix] [runtime] Minor optimization in CheckpointMetrics

commit 8a6b78a8dacadb5476c6bf452fbdab352d0ab908
Author: Stephan Ewen 
Date:   2017-10-25T11:30:51Z

[hotfix] [checkpoints] fix warnings and make minor improvements to 
CheckpointCoordinatorTest and SharedStateRegistry

commit aa09d82fd57d2636f54ec2a56e457f19d8ec9490
Author: Stephan Ewen 
Date:   2017-10-25T11:46:34Z

[hotfix] [core] Fix FileUtils.deletePathIfEmpty

commit 15be2602d4d44e29e756cd558130f2303d82fe8b
Author: Stephan Ewen 
Date:   2017-10-25T17:04:58Z

[hotfix] [checkpoints] Remove incorrect 'Serializable' from 
StateBackendFactory

commit 1438df297bd820c07787563ddcfd2fda8773387f
Author: Stephan Ewen 
Date:   2017-10-25T12:16:37Z

[FLINK-7924] [checkpoints] Fix incorrect names of checkpoint options

Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when 
actually,
the checkpoints may always be incremental and only savepoints have to be 
full
and self contained.

Initially, we planned to add options for multiple checkpoints, like 
checkpoints
that were forced to be full, and checkpoints that were incremental. That
is not necessary at this point.

commit 6b792bf72fdb0c17625b1fe97c52791d550a74e4
Author: Stephan Ewen 
Date:   2017-10-25T15:30:14Z

[FLINK-7925] [checkpoints] Add CheckpointingOptions

The CheckpointingOptions consolidate all checkpointing and state 
backend-related
settings that were previously split across different classes.

commit c4ce5522d8052ae8af8134bac4ea74bb5a929027
Author: Stephan Ewen 
Date:   2017-10-25T11:23:46Z

[FLINK-5823] [checkpoints] Pass state backend to checkpoint coordinator

commit 337e354bbe198ce6ab68d23cec93bc0c81bbbfaf
Author: Stephan Ewen 
Date:   2017-10-25T15:32:17Z

[hotfix] [core] Fix broken JavaDoc links in ConfigConstants

commit 0522ac34146aecfad2dbd9c806e6f7be182d00fd
Author: Stephan Ewen 
Date:   2017-10-25T17:04:10Z

[FLINK-5823] [checkpoints] State backends define checkpoint and savepoint 
directories, improved configuration




---