[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/4907 ---
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147121427 --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java --- @@ -243,11 +245,19 @@ else if (directory.exists()) { * @throws IOException if the delete operation fails */ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws IOException { - FileStatus[] fileStatuses = null; + final FileStatus[] fileStatuses; try { fileStatuses = fileSystem.listStatus(path); - } catch (Exception ignored) {} + } + catch (FileNotFoundException e) { + // path already deleted + return true; + } + catch (Exception e) { + // could not access directory, cannot delete --- End diff -- Makes sense. Thanks for the explanation! ---
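The semantics settled on in this diff (a missing path counts as already deleted; an inaccessible or non-empty directory is left in place) can be sketched with plain `java.nio.file` standing in for Flink's `FileSystem` abstraction. This is an illustration under that substitution, not the code in `FileUtils`: `DeleteIfEmptySketch` is a hypothetical name, and unlike the real method it reports every failure through the return value instead of throwing `IOException`.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

final class DeleteIfEmptySketch {

    /**
     * Deletes the directory only if it is empty. Returns true if the path is gone afterwards,
     * false if it still exists (not empty, not accessible, or the delete failed).
     */
    static boolean deletePathIfEmpty(Path path) {
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(path)) {
            if (entries.iterator().hasNext()) {
                return false; // directory has contents: leave it alone
            }
        } catch (NoSuchFileException e) {
            return true; // path already deleted
        } catch (IOException e) {
            return false; // could not access directory, cannot delete
        }

        try {
            Files.delete(path);
            return true;
        } catch (NoSuchFileException e) {
            return true; // someone else deleted it in the meantime
        } catch (IOException e) {
            return false; // e.g. a file was added concurrently
        }
    }
}
```

Returning `true` for an already-missing path makes the call idempotent, which matters when several cleanup paths may race to remove the same parent directory.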
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147093502 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). 
*/ + public static final ConfigOption SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- Yes, I know. I would like to do that as a follow-up once you're done, time permitting. ---
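The `withDeprecatedKeys("savepoints.state.backend.fs.dir")` call in this diff is the mechanism that keeps old configuration files working when a key is renamed, which is relevant to the renaming follow-up mentioned here. A minimal sketch of the lookup order, assuming the current key takes precedence over deprecated ones; the `Option` class and the map-based configuration are hypothetical stand-ins for Flink's `ConfigOption` and `Configuration`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class DeprecatedKeySketch {

    /** Hypothetical stand-in for an option with a current key and deprecated fallback keys. */
    static final class Option {
        final String key;
        final List<String> deprecatedKeys;

        Option(String key, String... deprecatedKeys) {
            this.key = key;
            this.deprecatedKeys = Arrays.asList(deprecatedKeys);
        }
    }

    static final Option SAVEPOINT_DIRECTORY =
            new Option("state.savepoints.dir", "savepoints.state.backend.fs.dir");

    /** The current key wins; deprecated keys are only consulted if it is absent. */
    static Optional<String> lookup(Map<String, String> config, Option option) {
        if (config.containsKey(option.key)) {
            return Optional.of(config.get(option.key));
        }
        for (String deprecated : option.deprecatedKeys) {
            if (config.containsKey(deprecated)) {
                return Optional.of(config.get(deprecated));
            }
        }
        return Optional.empty(); // corresponds to noDefaultValue()
    }
}
```

With this ordering, renaming `state.backend.heap.async` later would only require adding the old name as a deprecated key.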
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147092074 --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java --- @@ -243,11 +245,19 @@ else if (directory.exists()) { * @throws IOException if the delete operation fails */ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws IOException { - FileStatus[] fileStatuses = null; + final FileStatus[] fileStatuses; try { fileStatuses = fileSystem.listStatus(path); - } catch (Exception ignored) {} + } + catch (FileNotFoundException e) { + // path already deleted + return true; + } + catch (Exception e) { + // could not access directory, cannot delete --- End diff -- I have often heard that logging in utility functions is not advisable, because the utility cannot decide whether a failure is actually a problem or not. Logging in utilities often leads to log pollution that is hard to control. Logging should happen in the caller of the utility, which can interpret the context and decide whether it is actually a problem. If the caller calls a utility that does not report problems properly (a silent utility function), then it's the wrong utility for the caller. ---
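The argument for a utility that reports but does not log can be illustrated with a small sketch: the utility only returns success or failure, and each caller chooses a log level based on what a failure means in its own context. `Cleanup`, `cleanupSharedParent`, and `disposeSavepoint` are hypothetical names, and `java.util.logging` stands in for the SLF4J logger Flink uses.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class CallerLoggingSketch {

    private static final Logger LOG = Logger.getLogger(CallerLoggingSketch.class.getName());

    /** Stand-in for a utility such as deletePathIfEmpty: it reports failure, it does not log. */
    interface Cleanup {
        boolean run(String path);
    }

    /** Background cleanup of a shared parent directory: failing is normal, so log quietly. */
    static Level cleanupSharedParent(Cleanup util, String dir) {
        if (util.run(dir)) {
            return null; // nothing logged
        }
        // other checkpoints may still be writing here, so a non-empty directory is expected
        LOG.log(Level.FINE, "Shared directory {0} not removed (probably still in use)", dir);
        return Level.FINE;
    }

    /** Explicit disposal requested by the user: leftover files deserve a warning. */
    static Level disposeSavepoint(Cleanup util, String dir) {
        if (util.run(dir)) {
            return null; // nothing logged
        }
        LOG.log(Level.WARNING, "Could not dispose savepoint directory {0}", dir);
        return Level.WARNING;
    }
}
```

The same utility result leads to different log levels, which the utility itself could not have chosen correctly.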
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147091056 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java --- @@ -18,45 +18,41 @@ package org.apache.flink.runtime.state.filesystem; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateBackendFactory; -import java.io.IOException; +import java.net.URI; /** - * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend} - * from a configuration. + * A factory that creates an {@link FsStateBackend} from a configuration. */ +@PublicEvolving public class FsStateBackendFactory implements StateBackendFactory { - - /** The key under which the config stores the directory where checkpoints should be stored */ - public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir"; - - /** The key under which the config stores the threshold for state to be store in memory, -* rather than in files */ - public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold"; - @Override public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException { --- End diff -- One could do that for the ones we currently have, agreed. I would like to keep the factories, though, for future backends (or user-defined backends) that do not implement reconfiguration. ---
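As a sketch of why a factory remains useful: a backend with no reconfiguration hook can still be created entirely from the cluster configuration, as long as the configuration names a factory class. The interfaces below are simplified, hypothetical mirrors of `StateBackendFactory` that use a `Map` instead of Flink's `Configuration`; `my.backend.endpoint` is an invented key.

```java
import java.util.Map;

final class FactorySketch {

    interface StateBackend {}

    /** Simplified, hypothetical mirror of StateBackendFactory. */
    interface StateBackendFactory<T extends StateBackend> {
        T createFromConfig(Map<String, String> config);
    }

    /** A user-defined backend with no way to reconfigure itself after construction. */
    static final class MyBackend implements StateBackend {
        final String endpoint;

        MyBackend(String endpoint) {
            this.endpoint = endpoint;
        }
    }

    /** The factory is the only place that knows how to build MyBackend from configuration. */
    static final class MyBackendFactory implements StateBackendFactory<MyBackend> {
        @Override
        public MyBackend createFromConfig(Map<String, String> config) {
            String endpoint = config.get("my.backend.endpoint"); // hypothetical key
            if (endpoint == null) {
                throw new IllegalArgumentException("my.backend.endpoint is not set");
            }
            return new MyBackend(endpoint);
        }
    }

    /** Instantiates the factory named in the configuration and lets it build the backend. */
    static StateBackend load(String factoryClassName, Map<String, String> config) {
        try {
            Class<?> clazz = Class.forName(factoryClassName);
            StateBackendFactory<?> factory =
                    (StateBackendFactory<?>) clazz.getDeclaredConstructor().newInstance();
            return factory.createFromConfig(config);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + factoryClassName, e);
        }
    }
}
```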
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147090655 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java --- @@ -18,54 +18,92 @@ package org.apache.flink.runtime.state.filesystem; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.io.IOException; import java.net.URI; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The file state backend is a state backend that stores the state of streaming jobs in a file system. + * This state backend holds the working state in the memory (JVM heap) of the TaskManagers. + * The state backend checkpoints state as files to a file system (hence the backend's name). 
+ * + * Each checkpoint individually will store all its files in a subdirectory that includes the + * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/chk-17/}. + * + * State Size Considerations + * + * Working state is kept on the TaskManager heap. If a TaskManager executes multiple + * tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used) + * then the aggregate state of all tasks needs to fit into that TaskManager's memory. + * + * This state backend stores small state chunks directly with the metadata, to avoid creating + * many small files. The threshold for that is configurable. When increasing this threshold, the + * size of the checkpoint metadata increases. The checkpoint metadata of all retained completed + * checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem, + * unless the threshold {@link #getMinFileSizeThreshold()} is increased significantly. + * + * Persistence Guarantees * - * The state backend has one core directory into which it puts all checkpoint data. Inside that - * directory, it creates a directory per job, inside which each checkpoint gets a directory, with - * files for each state, for example: + * Checkpoints from this state backend are as persistent and available as the file system that is written to. + * If the file system is a persistent distributed file system, this state backend supports + * highly available setups. The backend additionally supports savepoints and externalized checkpoints. * - * {@code hdfs://namenode:port/flink-checkpoints//chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 } + * Configuration + * + * As for all state backends, this backend can either be configured within the application (by creating + * the backend with the respective constructor parameters and setting it on the execution environment) + * or by specifying it in the Flink configuration.
+ * + * If the state backend was specified in the application, it may pick up additional configuration + * parameters from the Flink configuration. For example, if the backend is configured in the application + * without a default savepoint directory, it will pick up a default savepoint directory specified in the + * Flink configuration of the running job/cluster. That behavior is implemented via the + * {@link #configure(Configuration)} method. */ -public class FsStateBackend extends AbstractStateBackend { +@PublicEvolving +public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend { --- End diff -- Could be, but the method would be empty in `AbstractFileStateBackend`, because the reconfiguration is implemented via a special constructor, and constructors don't inherit like
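The point about constructors can be sketched as follows: reconfiguration builds a new instance from the original object plus the configuration, and only the concrete subclass knows which constructor to call, so a `configure` method in the abstract base class would have nothing useful to do. The classes below are hypothetical, trimmed-down stand-ins (a `Map` replaces `Configuration`). They assume that settings made in the application take precedence over the configuration, as the `ConfigurableStateBackend` Javadoc in this PR describes.

```java
import java.util.Map;

final class ReconfigureSketch {

    /** Hypothetical mirror of ConfigurableStateBackend. */
    interface Configurable<T> {
        T configure(Map<String, String> config);
    }

    abstract static class AbstractFileBackend {
        /** null means "not set in the application". */
        final String checkpointDir;

        AbstractFileBackend(String checkpointDir) {
            this.checkpointDir = checkpointDir;
        }

        /** Merging constructor: an explicit application setting wins over the configuration. */
        AbstractFileBackend(AbstractFileBackend original, Map<String, String> config) {
            this.checkpointDir = original.checkpointDir != null
                    ? original.checkpointDir
                    : config.get("state.checkpoints.dir");
        }

        // No configure() here: this class cannot call a subclass constructor,
        // and subclasses do not inherit the merging constructor above.
    }

    static final class FsBackend extends AbstractFileBackend implements Configurable<FsBackend> {
        final int fileSizeThreshold;

        FsBackend(String checkpointDir, int fileSizeThreshold) {
            super(checkpointDir);
            this.fileSizeThreshold = fileSizeThreshold;
        }

        /** Each subclass must declare its own merging constructor and delegate to super. */
        private FsBackend(FsBackend original, Map<String, String> config) {
            super(original, config);
            this.fileSizeThreshold = original.fileSizeThreshold;
        }

        @Override
        public FsBackend configure(Map<String, String> config) {
            return new FsBackend(this, config);
        }
    }
}
```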
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147090207 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.AbstractStateBackend; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.net.URI; + +/** + * A base class for all state backends that store their metadata (and data) in files. 
+ * Examples that inherit from this are the {@link FsStateBackend}, the + * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend}, or the --- End diff -- The main difference between `FsStateBackend` and `MemoryStateBackend` is the following (this is already the case now, BTW): - `FsStateBackend` writes individual files for individual state chunks directly, and always writes the metadata out to a file. - `MemoryStateBackend` aggregates all state (data and metadata) into a single file. It only writes that file if a checkpoint directory is configured or if HA is activated; otherwise it just keeps it in the JobManager's memory. That's so that one can start playing around without any checkpoint config. ---
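The two persistence behaviors described here can be condensed into a small decision sketch. The enum and method names are hypothetical. The sketch encodes only what the comment states: `FsStateBackend` always writes to files, while `MemoryStateBackend` writes its single aggregated file only when a checkpoint directory is configured or HA is active.

```java
final class CheckpointPersistenceSketch {

    enum Target {
        JOBMANAGER_MEMORY,                 // nothing written; everything stays on the JobManager heap
        SINGLE_FILE,                       // data and metadata aggregated into one file
        FILES_PER_CHUNK_AND_METADATA_FILE  // one file per state chunk, plus a metadata file
    }

    static Target memoryStateBackend(String checkpointDir, boolean highAvailability) {
        if (checkpointDir != null || highAvailability) {
            return Target.SINGLE_FILE;
        }
        // lets users start experimenting without any checkpoint configuration
        return Target.JOBMANAGER_MEMORY;
    }

    static Target fsStateBackend(String checkpointDir) {
        // this backend always writes files, so it cannot work without a directory
        if (checkpointDir == null) {
            throw new IllegalArgumentException("a checkpoint directory is required");
        }
        return Target.FILES_PER_CHUNK_AND_METADATA_FILE;
    }
}
```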
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147089435 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +/** + * An interface for state backends that pick up additional parameters from a configuration. + */ +public interface ConfigurableStateBackend { + + /** +* Creates a variant of the state backend that applies additional configuration parameters. +* +* Settings that were directly done on the original state backend object in the application +* program typically have precedence over settings picked up from the configuration. +* +* If no configuration is applied, or if the method directly applies configuration values to +* the (mutable) state backend object, this method may return the original state backend object. +* Otherwise it typically returns a modified copy. +* +* @param config The configuration to pick the values from.
+* @return A copy of th --- End diff -- Will fix that... ---
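The Javadoc contract quoted here (application settings take precedence; the original object may be returned when nothing needs to change; otherwise a modified copy is returned) can be sketched with a single optional setting. `HeapBackend` is hypothetical, a `Map` stands in for `Configuration`, and the key `state.backend.heap.async` is taken from the `CheckpointingOptions` diff in this PR, where its naming was still under discussion.

```java
import java.util.Map;

final class ConfigureContractSketch {

    static final class HeapBackend {
        /** null means the application did not set this option. */
        final Boolean asyncSnapshots;

        HeapBackend(Boolean asyncSnapshots) {
            this.asyncSnapshots = asyncSnapshots;
        }

        /** Applies configuration values without overriding explicit application settings. */
        HeapBackend configure(Map<String, String> config) {
            if (asyncSnapshots != null) {
                return this; // set in the application: it takes precedence
            }
            String fromConfig = config.get("state.backend.heap.async");
            if (fromConfig == null) {
                return this; // nothing to apply, so the original object is returned
            }
            return new HeapBackend(Boolean.parseBoolean(fromConfig)); // modified copy
        }
    }
}
```

Returning a copy rather than mutating keeps the application-defined object unchanged, which matters if the same object is reused across job submissions.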
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147089325 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java --- @@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph( metrics); // The default directory for externalized checkpoints - String externalizedCheckpointsDir = jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY); + String externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY); - // load the state backend for checkpoint metadata. - // if specified in the application, use from there, otherwise load from configuration - final StateBackend metadataBackend; + // load the state backend from the application settings + final StateBackend applicationConfiguredBackend; + final SerializedValue serializedAppConfigured = snapshotSettings.getDefaultStateBackend(); - final SerializedValue applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend(); - if (applicationConfiguredBackend != null) { + if (serializedAppConfigured == null) { + applicationConfiguredBackend = null; + } + else { try { - metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader); + applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader); } catch (IOException | ClassNotFoundException e) { - throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e); + throw new JobExecutionException(jobId, + "Could not deserialize application-defined state backend.", e); } + } - log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.", - metadataBackend); - } else { - try { - metadataBackend = AbstractStateBackend - .loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log); - } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) { - throw new 
JobExecutionException(jobId, "Could not instantiate configured state backend", e); - } + final StateBackend rootBackend; + try { + rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault( --- End diff -- True, the commits are not self-contained. I tried as much as I could, but since it all comes from a single original commit, it would have been extremely time-intensive to make every commit self-contained. ---
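The resolution order behind `fromApplicationOrConfigOrDefault` (checked by `testApplicationDefinedHasPrecedence` and `testInstantiateMemoryBackendByDefault` in this PR) can be sketched without Flink types. The sketch assumes that an application-defined backend that is configurable also gets `configure` applied, which this excerpt cuts off before showing. `Backend`, `Configurable`, and `MemoryBackend` are hypothetical stand-ins, and `loadFromConfig` abstracts the lookup of `state.backend`.

```java
import java.util.Map;
import java.util.function.Function;

final class LoaderOrderSketch {

    interface Backend {}

    interface Configurable {
        Backend configure(Map<String, String> config);
    }

    static final class MemoryBackend implements Backend {}

    /** Application-defined backend first, then the cluster configuration, then a default. */
    static Backend fromApplicationOrConfigOrDefault(
            Backend fromApplication,
            Map<String, String> config,
            Function<Map<String, String>, Backend> loadFromConfig) {
        if (fromApplication != null) {
            // the application-defined backend may still pick up additional settings
            return fromApplication instanceof Configurable
                    ? ((Configurable) fromApplication).configure(config)
                    : fromApplication;
        }
        Backend fromConfig = loadFromConfig.apply(config); // null if no backend is configured
        return fromConfig != null ? fromConfig : new MemoryBackend();
    }
}
```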
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147089150 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -116,6 +117,10 @@ * accessing this don't block the job manager actor and run asynchronously. */ private final CompletedCheckpointStore completedCheckpointStore; + /** The root checkpoint state backend, which is responsible for initializing the +* checkpoint, storing the metadata, and cleaning up the checkpoint */ + private final StateBackend checkpointStateBackend; --- End diff -- No, not really. I wanted to pick a name here that would later allow us to have different "backends" for the data structures, meaning heap and RocksDB in different operators. One state backend needs to be the root that defines how metadata is checkpointed and recovered. ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147088900 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). 
*/ + public static final ConfigOption SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- +1 to fix the names, separate effort though ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147088825 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ --- End diff -- Yes, the `MemoryStateBackend` actually works for HA and Savepoints. The name is misleading... ---
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147031787 --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java --- @@ -243,11 +245,19 @@ else if (directory.exists()) { * @throws IOException if the delete operation fails */ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws IOException { - FileStatus[] fileStatuses = null; + final FileStatus[] fileStatuses; try { fileStatuses = fileSystem.listStatus(path); - } catch (Exception ignored) {} + } + catch (FileNotFoundException e) { + // path already deleted + return true; + } + catch (Exception e) { + // could not access directory, cannot delete --- End diff -- should we log a warning here? ---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146993120 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java --- @@ -48,75 +53,249 @@ private final ClassLoader cl = getClass().getClassLoader(); - private final String backendKey = CoreOptions.STATE_BACKEND.key(); + private final String backendKey = CheckpointingOptions.STATE_BACKEND.key(); // + // defaults + // @Test public void testNoStateBackendDefined() throws Exception { - assertNull(AbstractStateBackend.loadStateBackendFromConfig(new Configuration(), cl, null)); + assertNull(StateBackendLoader.loadStateBackendFromConfig(new Configuration(), cl, null)); } @Test public void testInstantiateMemoryBackendByDefault() throws Exception { - StateBackend backend = AbstractStateBackend - .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + StateBackend backend = + StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(), cl, null); assertTrue(backend instanceof MemoryStateBackend); } @Test - public void testLoadMemoryStateBackend() throws Exception { - // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) - // to guard against config-breaking changes of the name + public void testApplicationDefinedHasPrecedence() throws Exception { + final StateBackend appBackend = Mockito.mock(StateBackend.class); + final Configuration config = new Configuration(); config.setString(backendKey, "jobmanager"); - StateBackend backend = AbstractStateBackend - .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + StateBackend backend = StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config, cl, null); + assertEquals(appBackend, backend); + } - assertTrue(backend instanceof MemoryStateBackend); + // + // Memory State Backend + // + + /** +* Validates loading a memory state backend from the cluster 
configuration. +*/ + @Test + public void testLoadMemoryStateBackendNoParameters() throws Exception { + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + + final Configuration config1 = new Configuration(); + config1.setString(backendKey, "jobmanager"); + + final Configuration config2 = new Configuration(); + config2.setString(backendKey, MemoryStateBackendFactory.class.getName()); + + StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null); + StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null); + + assertTrue(backend1 instanceof MemoryStateBackend); + assertTrue(backend2 instanceof MemoryStateBackend); + } + + /** +* Validates loading a memory state backend with additional parameters from the cluster configuration. +*/ + @Test + public void testLoadMemoryStateWithParameters() throws Exception { + final String checkpointDir = new Path(tmp.newFolder().toURI()).toString(); + final String savepointDir = new Path(tmp.newFolder().toURI()).toString(); + final Path expectedCheckpointPath = new Path(checkpointDir); + final Path expectedSavepointPath = new Path(savepointDir); + + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + + final Configuration config1 = new Configuration(); + config1.setString(backendKey, "jobmanager"); + config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); + config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); + + final Configuration config2 = new Configuration(); + config2.setString(backendKey,
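The test configures the backend either by the shortcut `"jobmanager"` or by a factory class name. A hedged sketch of that resolution step follows: the shortcut table returns simple class names rather than Flink's fully qualified ones, and the `"filesystem"` entry is an assumption beyond what this excerpt shows. `BackendNameSketch` and `resolveFactory` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class BackendNameSketch {

    /** Shortcut names; only "jobmanager" is exercised by the test excerpt above. */
    private static final Map<String, String> SHORTCUTS = new HashMap<>();

    static {
        SHORTCUTS.put("jobmanager", "MemoryStateBackendFactory");
        SHORTCUTS.put("filesystem", "FsStateBackendFactory"); // assumed shortcut
    }

    /** Returns the factory to instantiate, or null if "state.backend" is not set. */
    static String resolveFactory(Map<String, String> config) {
        String value = config.get("state.backend");
        if (value == null) {
            return null; // no backend defined; the caller falls back to a default
        }
        String fromShortcut = SHORTCUTS.get(value);
        // anything that is not a shortcut is treated as a factory class name
        return fromShortcut != null ? fromShortcut : value;
    }
}
```

Testing with the literal string `"jobmanager"`, as the test comment says, guards the shortcut against accidental renames.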
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146990927 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.AbstractStateBackend; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.net.URI; + +/** + * A base class for all state backends that store their metadata (and data) in files. + * Examples that inherit from this are the {@link FsStateBackend}, the + * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend}, or the --- End diff -- Nit: is this true for `MemoryStateBackend`? At the very least it's weird. ---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146991686 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java --- @@ -18,54 +18,92 @@ package org.apache.flink.runtime.state.filesystem; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.io.IOException; import java.net.URI; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The file state backend is a state backend that stores the state of streaming jobs in a file system. + * This state backend holds the working state in the memory (JVM heap) of the TaskManagers. + * The state backend checkpoints state as files to a file system (hence the backend's name). 
+ * + * Each checkpoint individually will store all its files in a subdirectory that includes the + * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/chk-17/}. + * + * State Size Considerations + * + * Working state is kept on the TaskManager heap. If a TaskManager executes multiple + * tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used) + * then the aggregate state of all tasks needs to fit into that TaskManager's memory. + * + * This state backend stores small state chunks directly with the metadata, to avoid creating + * many small files. The threshold for that is configurable. When increasing this threshold, the + * size of the checkpoint metadata increases. The checkpoint metadata of all retained completed + * checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem, + * unless the threshold {@link #getMinFileSizeThreshold()} is increased significantly. + * + * Persistence Guarantees * - * The state backend has one core directory into which it puts all checkpoint data. Inside that - * directory, it creates a directory per job, inside which each checkpoint gets a directory, with - * files for each state, for example: + * Checkpoints from this state backend are as persistent and available as the file system that is written to. + * If the file system is a persistent distributed file system, this state backend supports + * highly available setups. The backend additionally supports savepoints and externalized checkpoints. * - * {@code hdfs://namenode:port/flink-checkpoints//chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 } + * Configuration + * + * As for all state backends, this backend can either be configured within the application (by creating + * the backend with the respective constructor parameters and setting it on the execution environment) + * or by specifying it in the Flink configuration.
+ * + * If the state backend was specified in the application, it may pick up additional configuration + * parameters from the Flink configuration. For example, if the backend is configured in the application + * without a default savepoint directory, it will pick up a default savepoint directory specified in the + * Flink configuration of the running job/cluster. That behavior is implemented via the + * {@link #configure(Configuration)} method. */ -public class FsStateBackend extends AbstractStateBackend { +@PublicEvolving +public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend { --- End diff -- Why isn't `AbstractFileStateBackend` configurable? ---
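The Javadoc above says small state chunks are stored directly with the checkpoint metadata, while larger ones become files in a per-checkpoint `chk-<n>` subdirectory. Below is a minimal sketch of that decision, written without any Flink dependency. `StateHandleSketch`, `InlineHandle`, `FileHandle` and `ThresholdSketch.chooseHandle` are hypothetical names. The rule that only chunks strictly smaller than the threshold are inlined is an assumption of the sketch, not something the diff states.

```java
import java.util.Arrays;

// Hypothetical sketch, not Flink's API: decides whether a state chunk is
// embedded in the checkpoint metadata or referenced as a separate file.
interface StateHandleSketch {
    long size();
}

// Chunk kept inside the checkpoint metadata (ends up on the JobManager heap).
final class InlineHandle implements StateHandleSketch {
    final byte[] data;
    InlineHandle(byte[] data) { this.data = Arrays.copyOf(data, data.length); }
    public long size() { return data.length; }
}

// Chunk stored as a file; the metadata only keeps its path and size.
final class FileHandle implements StateHandleSketch {
    final String path;
    final long size;
    FileHandle(String path, long size) { this.path = path; this.size = size; }
    public long size() { return size; }
}

final class ThresholdSketch {
    // Assumption: chunks strictly smaller than thresholdBytes are inlined.
    static StateHandleSketch chooseHandle(byte[] chunk, int thresholdBytes,
                                          String checkpointBaseDir, long checkpointId,
                                          String fileName) {
        if (chunk.length < thresholdBytes) {
            // small chunk: avoid creating a tiny file, embed it in the metadata
            return new InlineHandle(chunk);
        }
        // large chunk: one file in the per-checkpoint directory, e.g. .../chk-17/<fileName>
        String path = checkpointBaseDir + "/chk-" + checkpointId + "/" + fileName;
        // a real backend would write the bytes here; the sketch only builds the handle
        return new FileHandle(path, chunk.length);
    }
}
```

Raising the threshold moves more bytes into inline handles, which is why the Javadoc warns that the metadata of all retained checkpoints must still fit on the JobManager heap.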
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146992430 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java --- @@ -18,45 +18,41 @@ package org.apache.flink.runtime.state.filesystem; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateBackendFactory; -import java.io.IOException; +import java.net.URI; /** - * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend} - * from a configuration. + * A factory that creates an {@link FsStateBackend} from a configuration. */ +@PublicEvolving public class FsStateBackendFactory implements StateBackendFactory { - - /** The key under which the config stores the directory where checkpoints should be stored */ - public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir"; - - /** The key under which the config stores the threshold for state to be store in memory, -* rather than in files */ - public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold"; - @Override public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException { --- End diff -- Wouldn't it make sense to do all of this in `configure()`? The factories are essentially useless now and would only instantiate the backend and then call `configure()` with the config. ---
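The reviewer's suggestion can be sketched as follows: if `configure()` already knows how to read every option, the factory reduces to "create an unconfigured instance, then configure it". `SketchFsBackend` and `SketchFsBackendFactory` are hypothetical stand-ins, not Flink classes; a `Map<String, String>` replaces `Configuration`, and only the `state.checkpoints.dir` key from `CheckpointingOptions` is modeled.

```java
import java.util.Map;

// Hypothetical backend: checkpointDir == null means "not set in the application".
final class SketchFsBackend {
    final String checkpointDir;

    SketchFsBackend(String checkpointDir) { this.checkpointDir = checkpointDir; }

    // All option parsing lives here, in one place.
    SketchFsBackend configure(Map<String, String> config) {
        if (checkpointDir != null) {
            return this; // set in the program; nothing to pick up
        }
        String fromConfig = config.get("state.checkpoints.dir");
        if (fromConfig == null) {
            throw new IllegalArgumentException("No checkpoint directory configured");
        }
        return new SketchFsBackend(fromConfig);
    }
}

// Hypothetical factory in the shape the reviewer proposes: no parsing of its own.
final class SketchFsBackendFactory {
    SketchFsBackend createFromConfig(Map<String, String> config) {
        return new SketchFsBackend(null).configure(config);
    }
}
```

With this shape, the factory path and the application-defined path share a single code path for reading options, so the two cannot drift apart.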
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146985691 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). 
*/ + public static final ConfigOption SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- Off-topic: The fact that we have "FsStateBackend" and "MemoryStateBackend" is confusing every user. We should only have "HeapStateBackend" and "RocksDBStateBackend", which both checkpoint to a DFS. (And the current "MemoryStateBackend" behaviour could be a switch on "HeapStateBackend"). But I'm afraid it's too late for that. ---
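The `withDeprecatedKeys("savepoints.state.backend.fs.dir")` call in the diff keeps the old savepoint key working after the rename to `state.savepoints.dir`. Below is a minimal sketch of the lookup order this implies. It assumes the new key wins when both are set and that deprecated keys are tried in declaration order. `OptionSketch` and `SketchOptions` are hypothetical classes, not Flink's `ConfigOption`, and a plain `Map` stands in for `Configuration`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a config option with deprecated keys.
final class OptionSketch {
    final String key;
    final String defaultValue;          // null models noDefaultValue()
    final List<String> deprecatedKeys;

    OptionSketch(String key, String defaultValue, String... deprecatedKeys) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.deprecatedKeys = Arrays.asList(deprecatedKeys);
    }

    // Assumed order: current key, then deprecated keys in declaration order, then default.
    String lookup(Map<String, String> config) {
        String value = config.get(key);
        if (value != null) {
            return value;
        }
        for (String oldKey : deprecatedKeys) {
            value = config.get(oldKey);
            if (value != null) {
                return value;
            }
        }
        return defaultValue;
    }
}

final class SketchOptions {
    // Mirrors the keys of SAVEPOINT_DIRECTORY in the diff.
    static final OptionSketch SAVEPOINT_DIRECTORY =
        new OptionSketch("state.savepoints.dir", null, "savepoints.state.backend.fs.dir");
}
```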
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146987131 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java --- @@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph( metrics); // The default directory for externalized checkpoints - String externalizedCheckpointsDir = jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY); + String externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY); - // load the state backend for checkpoint metadata. - // if specified in the application, use from there, otherwise load from configuration - final StateBackend metadataBackend; + // load the state backend from the application settings + final StateBackend applicationConfiguredBackend; + final SerializedValue serializedAppConfigured = snapshotSettings.getDefaultStateBackend(); - final SerializedValue applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend(); - if (applicationConfiguredBackend != null) { + if (serializedAppConfigured == null) { + applicationConfiguredBackend = null; + } + else { try { - metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader); + applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader); } catch (IOException | ClassNotFoundException e) { - throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e); + throw new JobExecutionException(jobId, + "Could not deserialize application-defined state backend.", e); } + } - log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.", - metadataBackend); - } else { - try { - metadataBackend = AbstractStateBackend - .loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log); - } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) { - throw new 
JobExecutionException(jobId, "Could not instantiate configured state backend", e); - } + final StateBackend rootBackend; + try { + rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault( --- End diff -- nit: This is only defined in the later commit "[FLINK-5823] [checkpoints] State backends define checkpoint and savepoint directories, improved configuration", so technically the commit that adds this code is broken. ---
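The method name `fromApplicationOrConfigOrDefault` suggests a fixed precedence. Below is a minimal sketch of that selection, assuming the order "application-defined backend, then the `state.backend` key, then a default". `BackendSketch` and `LoaderSketch` are hypothetical, and the real `StateBackendLoader` signature is not visible in this diff. The sketch also omits the step this PR is about, where an application-defined backend can still pick up values from the cluster configuration.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical minimal backend abstraction.
interface BackendSketch {
    String name();
}

final class LoaderSketch {
    static BackendSketch fromApplicationOrConfigOrDefault(
            BackendSketch applicationDefined,             // null if the job did not set one
            Map<String, String> clusterConfig,
            Function<String, BackendSketch> loadByName,   // resolves values of "state.backend"
            BackendSketch defaultBackend) {
        if (applicationDefined != null) {
            return applicationDefined;                    // 1. set in the user program
        }
        String configured = clusterConfig.get("state.backend");
        if (configured != null) {
            return loadByName.apply(configured);          // 2. named in the Flink configuration
        }
        return defaultBackend;                            // 3. fallback
    }
}
```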
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146984727 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ --- End diff -- nit: is this true for `MemoryStateBackend`? Same for the options below. ---
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146987476 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -116,6 +117,10 @@ * accessing this don't block the job manager actor and run asynchronously. */ private final CompletedCheckpointStore completedCheckpointStore; + /** The root checkpoint state backend, which is responsible for initializing the +* checkpoint, storing the metadata, and cleaning up the checkpoint */ + private final StateBackend checkpointStateBackend; --- End diff -- nit: Are there backends other than the "checkpoint" backend? ---
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146988381 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +/** + * An interface for state backends that pick up additional parameters from a configuration. + */ +public interface ConfigurableStateBackend { + + /** +* Creates a variant of the state backend that applies additional configuration parameters. +* +* Settings that were directly done on the original state backend object in the application +* program typically have precedence over settings picked up from the configuration. +* +* If no configuration is applied, or if the method directly applies configuration values to +* the (mutable) state backend object, this method may return the original state backend object. +* Otherwise it typically returns a modified copy. +* +* @param config The configuration to pick the values from. 
+* @return A copy of th --- End diff -- Sentence is trailing off. ---
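The contract described in this Javadoc (program settings win, the configuration only fills gaps, and the method returns either the original object or a modified copy) can be sketched as follows. `ConfigurableSketch` and `DirsBackend` are hypothetical names, a `Map<String, String>` stands in for `Configuration`, and the two keys are the ones `CheckpointingOptions` defines in this PR.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical counterpart of the interface under review.
interface ConfigurableSketch<T> {
    T configure(Map<String, String> config);
}

// Immutable backend: null fields mean "not set in the application program".
final class DirsBackend implements ConfigurableSketch<DirsBackend> {
    final String checkpointDir;
    final String savepointDir;

    DirsBackend(String checkpointDir, String savepointDir) {
        this.checkpointDir = checkpointDir;
        this.savepointDir = savepointDir;
    }

    @Override
    public DirsBackend configure(Map<String, String> config) {
        // values set in the program take precedence; the configuration only fills gaps
        String cp = checkpointDir != null ? checkpointDir : config.get("state.checkpoints.dir");
        String sp = savepointDir != null ? savepointDir : config.get("state.savepoints.dir");
        if (Objects.equals(cp, checkpointDir) && Objects.equals(sp, savepointDir)) {
            return this; // nothing picked up: returning the original object is allowed
        }
        return new DirsBackend(cp, sp); // otherwise a modified copy; the original is unchanged
    }
}
```

Keeping the backend immutable and returning a copy means the object the user set in the program is never mutated behind their back, which matters if the same instance is reused across jobs.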
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146985767 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). 
*/ + public static final ConfigOption SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- Mentioning it because we have no "heap" backend, technically. ---
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/4907 [FLINK-5823] [checkpoints] State Backends also handle Checkpoint Metadata (part 1) This is an incremental (first part) rebuild of #3522 on the latest master. For ease of review, it is broken down into small chunks. ## Part 1: Application-defined State Backends pick up additional values from the configuration We need to keep supporting the scenario of setting a state backend in the user program, but configuring parameters like the checkpoint directory in the cluster config. To support that, state backends may implement an additional interface which lets them pick up configuration values from the cluster configuration. This also makes testing the checkpoint / savepoint configuration much easier. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink backend Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4907.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4907 commit 8055cc9d84e206abd00cbc339c6cc390c53f2afe Author: Stephan Ewen Date: 2017-10-24T17:46:07Z [hotfix] [hdfs] Avoid reparsing URIs in Hadoop File Status conversion commit 96068b93d4c332a9b8cfba06ecc9d42804a79f92 Author: Stephan Ewen Date: 2017-10-24T19:20:44Z [hotfix] [streaming] Move SerializedCheckpointData to proper scope for MessageAcknowledgingSourceBase commit e098cc6ac8d77a027aefb6c1bcee41f6214ed74c Author: Stephan Ewen Date: 2017-10-25T09:56:59Z [hotfix] [runtime] Minor optimization in CheckpointMetrics commit 8a6b78a8dacadb5476c6bf452fbdab352d0ab908 Author: Stephan Ewen Date: 2017-10-25T11:30:51Z [hotfix] [checkpoints] fix warnings and make minor improvements to CheckpointCoordinatorTest and SharedStateRegistry commit aa09d82fd57d2636f54ec2a56e457f19d8ec9490 Author: Stephan Ewen Date: 
2017-10-25T11:46:34Z [hotfix] [core] Fix FileUtils.deletePathIfEmpty commit 15be2602d4d44e29e756cd558130f2303d82fe8b Author: Stephan Ewen Date: 2017-10-25T17:04:58Z [hotfix] [checkpoints] Remove incorrect 'Serializable' from StateBackendFactory commit 1438df297bd820c07787563ddcfd2fda8773387f Author: Stephan Ewen Date: 2017-10-25T12:16:37Z [FLINK-7924] [checkpoints] Fix incorrect names of checkpoint options Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when actually, the checkpoints may always be incremental and only savepoints have to be full and self-contained. Initially, we planned to add options for multiple checkpoints, like checkpoints that were forced to be full, and checkpoints that were incremental. That is not necessary at this point. commit 6b792bf72fdb0c17625b1fe97c52791d550a74e4 Author: Stephan Ewen Date: 2017-10-25T15:30:14Z [FLINK-7925] [checkpoints] Add CheckpointingOptions The CheckpointingOptions consolidate all checkpointing and state backend-related settings that were previously split across different classes. commit c4ce5522d8052ae8af8134bac4ea74bb5a929027 Author: Stephan Ewen Date: 2017-10-25T11:23:46Z [FLINK-5823] [checkpoints] Pass state backend to checkpoint coordinator commit 337e354bbe198ce6ab68d23cec93bc0c81bbbfaf Author: Stephan Ewen Date: 2017-10-25T15:32:17Z [hotfix] [core] Fix broken JavaDoc links in ConfigConstants commit 0522ac34146aecfad2dbd9c806e6f7be182d00fd Author: Stephan Ewen Date: 2017-10-25T17:04:10Z [FLINK-5823] [checkpoints] State backends define checkpoint and savepoint directories, improved configuration ---