[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/5248 ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160761745 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. 
If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. +* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the StateBackendFactory when creating / configuring the state +* backend in the factory +* @throws IOException +* May be thrown by the StateBackendFactory when instantiating the state backend +*/ + pub
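The loading rule in this javadoc can be sketched in plain Java. The loader first checks for a shortcut name. Otherwise it treats the value as a factory class name, instantiates the class through its zero-argument constructor, and calls it to create the backend from the configuration. This is a hypothetical sketch, not Flink's `StateBackendLoader`. `SimpleBackend`, `SimpleBackendFactory`, `DemoBackendFactory`, and the `Map`-based configuration are stand-ins. Wrapping failures in an unchecked exception is a simplification: the javadoc says the real method throws `DynamicCodeLoadingException`.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins for Flink's StateBackend / StateBackendFactory.
interface SimpleBackend {
    String name();
}

interface SimpleBackendFactory {
    SimpleBackend createFromConfig(Map<String, String> config);
}

// Example factory a user might reference by its class name (hypothetical).
final class DemoBackendFactory implements SimpleBackendFactory {
    public DemoBackendFactory() {}

    @Override
    public SimpleBackend createFromConfig(Map<String, String> config) {
        return () -> "demo:" + config.getOrDefault("demo.option", "none");
    }
}

final class SimpleBackendLoader {
    // Shortcut names, as listed in the StateBackendLoader javadoc.
    static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
    static final String FS_STATE_BACKEND_NAME = "filesystem";
    static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";

    // Mirrors the 'state.backend' key named in the javadoc.
    static final String STATE_BACKEND_KEY = "state.backend";

    /** Returns null if no backend is configured (a choice made for this sketch). */
    static SimpleBackend load(Map<String, String> config, ClassLoader classLoader) {
        String value = config.get(STATE_BACKEND_KEY);
        if (value == null) {
            return null;
        }
        switch (value) {
            case MEMORY_STATE_BACKEND_NAME:
                return () -> "memory";
            case FS_STATE_BACKEND_NAME:
                return () -> "filesystem";
            case ROCKSDB_STATE_BACKEND_NAME:
                return () -> "rocksdb";
            default:
                // Not a shortcut: treat the value as a factory class name, instantiate it
                // via its zero-argument constructor, and let it build the backend.
                try {
                    SimpleBackendFactory factory = Class.forName(value, false, classLoader)
                            .asSubclass(SimpleBackendFactory.class)
                            .getDeclaredConstructor()
                            .newInstance();
                    return factory.createFromConfig(config);
                } catch (ReflectiveOperationException | ClassCastException e) {
                    throw new IllegalArgumentException("Cannot load state backend factory " + value, e);
                }
        }
    }
}
```

Because the factory is the fallback, an unknown name fails as a missing class rather than being silently mapped to some default backend.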
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160761001 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStorageLocation; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * An implementation of durable checkpoint storage to file systems. 
+ */ +public class FsCheckpointStorage extends AbstractFsCheckpointStorage { + + private final FileSystem fileSystem; + + private final Path checkpointsDirectory; + + private final Path sharedStateDirectory; + + private final Path taskOwnedStateDirectory; --- End diff -- We may be able to remove this in the future. However, we need to pull task-owned state out of the "exclusive state" directory soon, and reworking this to use shared state handles is probably more delicate, so I would keep it for now. It should be a trivial rework to remove it once we no longer need it. ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160761075 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java --- @@ -18,92 +18,272 @@ package org.apache.flink.runtime.state.memory; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import javax.annotation.Nullable; + import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkArgument; + /** - * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no - * capabilities to spill to disk. Checkpoints are serialized and the serialized data is - * transferred + * This state backend holds the working state in the memory (JVM heap) of the TaskManagers. + * The state backend checkpoints state directly to the JobManager's memory (hence the backend's name), + * but the checkpoints will be persisted to a file system for high-availability setups and savepoints. 
+ * The MemoryStateBackend is consequently a FileSystem-based backend that can work without a + * file system dependency in simple setups. + * + * This state backend should be used only for experimentation, quick local setups, + * or for streaming applications that have very small state: Because it requires checkpoints to + * go through the JobManager's memory, larger state will occupy larger portions of the JobManager's + * main memory, reducing operational stability. + * For any other setup, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend} + * should be used. The {@code FsStateBackend} holds the working state on the TaskManagers in the same way, but + * checkpoints state directly to files rather than to the JobManager's memory, thus supporting + * large state sizes. + * + * State Size Considerations + * + * State checkpointing with this state backend is subject to the following conditions: + * + * Each individual state must not exceed the configured maximum state size + * (see {@link #getMaxStateSize()}). + * + * All state from one task (i.e., the sum of all operator states and keyed states from all + * chained operators of the task) must not exceed what the RPC system supports, which is + * by default < 10 MB. That limit can be configured up, but that is typically not advised. + * + * The sum of all states in the application times all retained checkpoints must comfortably + * fit into the JobManager's JVM heap space. + * + * + * Persistence Guarantees + * + * For the use cases where the state sizes can be handled by this backend, the backend does guarantee + * persistence for savepoints, externalized checkpoints (if configured), and checkpoints + * (when high-availability is configured).
+ * + * Configuration + * + * As for all state backends, this backend can either be configured within the application (by creating + * the backend with the respective constructor parameters and setting it on the execution environment) + * or by specifying it in the Flink configuration. + * + * If the state backend was specified in the application, it may pick up additional configuration + * parameters from the Flink configuration. For example, if the backend is configured in the application + * without a default savepoint directory, it will pick up a default savepoint directory specified in the + * Flink configuration of the running job/cluster. That behavior is implemented via the + * {@link #configure(Configuration)} method. */ -public class MemoryStateBackend extends AbstractStateBackend {
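The three conditions listed under "State Size Considerations" can be combined into one predicate. The helper below is hypothetical and not part of Flink. Every limit is passed in by the caller. The JobManager budget stands in for the javadoc's "comfortably fit", which has no exact threshold.

```java
// Hypothetical helper encoding the three "State Size Considerations" of the
// MemoryStateBackend javadoc. All limits are caller-supplied; none are Flink defaults.
final class MemoryBackendSizeCheck {

    /**
     * @param stateSizesPerTask   stateSizesPerTask[t][s] = size in bytes of state s in task t
     * @param maxStateSize        configured maximum size of any individual state
     * @param rpcLimit            maximum payload the RPC system accepts for one task's state
     * @param retainedCheckpoints number of checkpoints the JobManager keeps (at least 1)
     * @param jobManagerBudget    heap bytes the user is willing to spend on checkpoint data
     */
    static boolean fits(long[][] stateSizesPerTask, long maxStateSize, long rpcLimit,
                        int retainedCheckpoints, long jobManagerBudget) {
        if (retainedCheckpoints < 1) {
            throw new IllegalArgumentException("retainedCheckpoints must be >= 1");
        }
        long total = 0;
        for (long[] task : stateSizesPerTask) {
            long taskSum = 0;
            for (long size : task) {
                // Condition 1: each individual state stays under the configured maximum.
                if (size > maxStateSize) {
                    return false;
                }
                taskSum += size;
            }
            // Condition 2: all state of one task (all chained operators) must fit one RPC message.
            if (taskSum > rpcLimit) {
                return false;
            }
            total += taskSum;
        }
        // Condition 3: total * retainedCheckpoints <= budget, written as a division to avoid overflow.
        return total <= jobManagerBudget / retainedCheckpoints;
    }
}
```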
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160760708 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StreamStateHandle; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.net.URI; + +/** + * A base class for all state backends that store their metadata (and data) in files. + * Examples that inherit from this are the {@link FsStateBackend}, the + * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend}, or the + * {@code RocksDBStateBackend}. 
+ * + * This class takes the base checkpoint- and savepoint directory paths, but also accepts null + * for both of them, in which case creating externalized checkpoints is not possible, and it is not + * possible to create a savepoint with a default path. Null is accepted to enable implementations + * that only optionally support default savepoints and externalized checkpoints. + * + * Checkpoint Layout + * + * The state backend is configured with a base directory and persists the checkpoint data of specific + * checkpoints in specific subdirectories. For example, if the base directory was set to + * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory with + * the job's ID that will contain the actual checkpoints: + * ({@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b}) + * + * Each checkpoint individually will store all its files in a subdirectory that includes the + * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}. + * + * Savepoint Layout + * + * A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/} will create + * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all savepoint data. + * The random digits are added as "entropy" to avoid directory collisions. + * + * Metadata and Success Files + * + * A completed checkpoint writes its metadata into a file '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'. + * After that is complete (i.e., the file is complete), it writes an additional file + * '{@value AbstractFsCheckpointStorage#SUCCESS_FILE_NAME}'. + * + * Ideally that would not be necessary, and one would write the metadata file to a temporary file and + * then issue an atomic (or at least constant-time) rename.
But some file systems (like S3) do + * not support that: A rename is a copy process which, when failing, leaves corrupt half-written + * files/objects. The success file is hence needed as a signal that the + * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}' file is complete. + */ +@PublicEvolving +public abstract class AbstractFileStateBackend extends AbstractStateBackend { + + private static final long serialVersionUID = 1L; + + // + // State Backend Properties + // + + /** The path where checkpoints will be stored, or null, if none has been configured. */ + @Nullable + private final Path baseCheckpoin
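The checkpoint and savepoint layouts described in this javadoc reduce to simple path arithmetic. The sketch below assumes plain string paths. `CheckpointLayoutSketch` is hypothetical. The javadoc does not say how many random characters are appended, so the 12 hex characters used here are an assumption.

```java
import java.util.Random;

// Hypothetical helper reproducing the directory naming from the AbstractFileStateBackend
// javadoc; not Flink code.
final class CheckpointLayoutSketch {

    private static String withSlash(String dir) {
        return dir.endsWith("/") ? dir : dir + "/";
    }

    /** baseDir/jobId: the per-job directory under the configured checkpoint base directory. */
    static String jobCheckpointsDir(String baseDir, String jobId) {
        return withSlash(baseDir) + jobId;
    }

    /** baseDir/jobId/chk-N: the directory holding all files of checkpoint N. */
    static String checkpointDir(String baseDir, String jobId, long checkpointId) {
        return jobCheckpointsDir(baseDir, jobId) + "/chk-" + checkpointId;
    }

    /**
     * savepointBaseDir/savepoint-{first 6 chars of job ID}-{random suffix}.
     * The random suffix is "entropy" against collisions; its length (12 hex chars) is assumed.
     */
    static String savepointDir(String savepointBaseDir, String jobId, Random random) {
        StringBuilder entropy = new StringBuilder();
        for (int i = 0; i < 12; i++) {
            entropy.append(Character.forDigit(random.nextInt(16), 16));
        }
        return withSlash(savepointBaseDir) + "savepoint-" + jobId.substring(0, 6) + "-" + entropy;
    }
}
```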
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160759604 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. 
If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. +* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the StateBackendFactory when creating / configuring the state +* backend in the factory +* @throws IOException +* May be thrown by the StateBackendFactory when instantiating the state backend +*/ + pub
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160758820 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- @@ -84,13 +84,44 @@ public interface StateBackend extends java.io.Serializable { // - // Persistent Bytes Storage + // Checkpoint storage - the durable persistence of checkpoint data + // + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handle from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; --- End diff -- See above; I don't think we can get around using String. ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160758717 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- @@ -84,13 +84,44 @@ public interface StateBackend extends java.io.Serializable { // - // Persistent Bytes Storage + // Checkpoint storage - the durable persistence of checkpoint data --- End diff -- Agreed. In the end we have:
- `CheckpointStorage`: persisting bytes and metadata. The stream factories should go there in the next part of the rework.
- `KeyedStateBackend`: holding / snapshotting keyed state.
- `OperatorStateBackend`: holding / snapshotting operator state.

These should be three completely independent interfaces. This PR moves towards that by introducing the `CheckpointStorage`, even though it does not yet move the stream factories there. Once the rework is done, the `StateBackend` just bundles a combination of the three components above (it should have only three methods, maybe plus convenience overloads). ---
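The split described in this comment can be illustrated with toy interfaces. There are three independent components, and `StateBackend` is reduced to a bundle of three factory methods. Everything below is hypothetical and heavily simplified. The `*Sketch` interfaces, the heap implementations, and `BundledStateBackend` are illustrations, not Flink's `CheckpointStorage`, `KeyedStateBackend`, or `OperatorStateBackend` APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Persists checkpoint metadata and returns an external pointer (hypothetical). */
interface CheckpointStorageSketch {
    String persistMetadata(long checkpointId, byte[] metadata);
}

/** Holds and snapshots keyed state (hypothetical). */
interface KeyedStateSketch {
    void put(String key, String value);

    Map<String, String> snapshot();
}

/** Holds and snapshots operator state (hypothetical). */
interface OperatorStateSketch {
    void add(String element);

    List<String> snapshot();
}

/** The backend is only a bundle: three factory methods and no logic of its own. */
interface StateBackendSketch {
    CheckpointStorageSketch createCheckpointStorage(String jobId);

    KeyedStateSketch createKeyedStateBackend(String operatorId);

    OperatorStateSketch createOperatorStateBackend(String operatorId);
}

final class HeapKeyedState implements KeyedStateSketch {
    private final Map<String, String> state = new HashMap<>();

    @Override
    public void put(String key, String value) { state.put(key, value); }

    @Override
    public Map<String, String> snapshot() { return new HashMap<>(state); } // copy, not a live view
}

final class HeapOperatorState implements OperatorStateSketch {
    private final List<String> state = new ArrayList<>();

    @Override
    public void add(String element) { state.add(element); }

    @Override
    public List<String> snapshot() { return new ArrayList<>(state); }
}

/** Combines independently chosen components; each comes from its own factory function. */
final class BundledStateBackend implements StateBackendSketch {
    private final Function<String, CheckpointStorageSketch> storageFactory;
    private final Function<String, KeyedStateSketch> keyedFactory;
    private final Function<String, OperatorStateSketch> operatorFactory;

    BundledStateBackend(Function<String, CheckpointStorageSketch> storageFactory,
                        Function<String, KeyedStateSketch> keyedFactory,
                        Function<String, OperatorStateSketch> operatorFactory) {
        this.storageFactory = storageFactory;
        this.keyedFactory = keyedFactory;
        this.operatorFactory = operatorFactory;
    }

    @Override
    public CheckpointStorageSketch createCheckpointStorage(String jobId) { return storageFactory.apply(jobId); }

    @Override
    public KeyedStateSketch createKeyedStateBackend(String operatorId) { return keyedFactory.apply(operatorId); }

    @Override
    public OperatorStateSketch createOperatorStateBackend(String operatorId) { return operatorFactory.apply(operatorId); }
}
```

In this shape, swapping the checkpoint storage does not touch how keyed or operator state is held. That is the independence the comment argues for.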
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160757943 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; + +import java.io.IOException; + +/** + * A storage location for one particular checkpoint. This location is typically + * created and initialized via {@link CheckpointStorage#initializeCheckpoint(long)} or + * {@link CheckpointStorage#initializeSavepoint(long, String)}. + */ +public interface CheckpointStorageLocation { + + /** +* Creates the output stream to persist the checkpoint metadata to. +* +* @return The output stream to persist the checkpoint metadata to. +* @throws IOException Thrown, if the stream cannot be opened due to an I/O error. +*/ + CheckpointStateOutputStream createMetadataOutputStream() throws IOException; + + /** +* Finalizes the checkpoint, marking the location as a finished checkpoint. 
+* This method returns the external checkpoint pointer that can be used to resolve +* the checkpoint upon recovery. +* +* @return The external pointer to the checkpoint at this location. +* @throws IOException Thrown, if finalizing / marking as finished fails due to an I/O error. +*/ + String markCheckpointAsFinished() throws IOException; --- End diff -- I have gone back and forth on this. I leaned towards keeping it, because in the end we do need to get to some form of external pointer (a String), both for ZooKeeper and for resuming externally. This method couples finalization with obtaining that pointer, which makes sense to me (the pointer may not be producible before finalization). Pushing that into the metadata OutputStream would require another interface that creates a "handle and pointer" on close. I had that in a previous version, and it felt much clumsier. This variant seems nicer to me. ---
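In a file-based location, finalizing means writing the success marker after the metadata file, as described in the `AbstractFileStateBackend` javadoc. Only after that step is there a pointer that a reader can safely resolve, which is why finalization and pointer creation belong together. The class below is a hypothetical sketch built on `java.nio.file`. The file names and the use of a `file:` URI as the pointer are assumptions, not Flink's implementation.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical file-based checkpoint location; not Flink's implementation.
final class FileLocationSketch {
    // Hypothetical file names; the diff only references METADATA_FILE_NAME / SUCCESS_FILE_NAME.
    static final String METADATA_FILE = "_metadata";
    static final String SUCCESS_FILE = "_success";

    private final Path directory;

    FileLocationSketch(Path directory) throws IOException {
        this.directory = Files.createDirectories(directory);
    }

    /** Step 1: the caller writes the serialized metadata to this stream and closes it. */
    OutputStream createMetadataOutputStream() throws IOException {
        return Files.newOutputStream(directory.resolve(METADATA_FILE));
    }

    /**
     * Step 2: finalize, then hand out the external pointer. The pointer only becomes
     * meaningful here, because readers trust the metadata file only once the marker exists.
     */
    String markCheckpointAsFinished() throws IOException {
        if (!Files.exists(directory.resolve(METADATA_FILE))) {
            throw new IOException("No metadata written to " + directory);
        }
        Files.createFile(directory.resolve(SUCCESS_FILE));
        return directory.toUri().toString();
    }

    /** Reader side: a location without the success marker is treated as incomplete. */
    static boolean isFinished(Path directory) {
        return Files.exists(directory.resolve(SUCCESS_FILE));
    }
}
```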
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160756970 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. 
If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handle from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; + + /** +* Initializes a storage location for a new checkpoint with the given ID. +* +* The returned storage location can be used to write the checkpoint data and metadata +* to and to obtain the pointers for the location(s) where the actual checkpoint data should be +* stored. +* +* @param checkpointId The ID (logical timestamp) of the checkpoint that should be persisted. +* @return A storage location for the data and metadata of the given checkpoint. +* +* @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception. +*/ + CheckpointStorageLocation initializeCheckpoint(long checkpointId) throws IOException; --- End diff -- How about `initializeLocationForCheckpoint`? ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160756860 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. 
If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handle from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; --- End diff -- I was wondering about that as well. My thinking was that, in the end, all we get are Strings (from the command line, from ZooKeeper, from the REST interface), and there is no point before this method that could parse/convert the String into something state-backend-specific. So we have to deal with Strings anyway. Wrapping them in another class did not seem very useful to me here. ---
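The pointer arrives as a plain String from the command line, ZooKeeper, or the REST interface, so the backend is the first component that can interpret it. The sketch below shows that contract for a hypothetical backend that only understands `file` and `hdfs` URIs. It returns a `URI` rather than Flink's `StreamStateHandle`, and `PointerResolverSketch` is an illustrative name.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Set;

// Hypothetical resolver for a file-system-based backend. Flink's method returns a
// StreamStateHandle; this sketch returns the parsed URI to stay self-contained.
final class PointerResolverSketch {
    // Schemes this imaginary backend understands (an assumption of the sketch).
    private static final Set<String> SUPPORTED_SCHEMES = Set.of("file", "hdfs");

    static URI resolveCheckpoint(String pointer) throws IOException {
        if (pointer == null || pointer.isEmpty()) {
            throw new IOException("Empty checkpoint pointer");
        }
        final URI uri;
        try {
            uri = new URI(pointer);
        } catch (URISyntaxException e) {
            throw new IOException("Checkpoint pointer is not a valid URI: " + pointer, e);
        }
        // A pointer written by a different kind of backend is rejected with an
        // IOException, as the javadoc prescribes, instead of being misinterpreted.
        if (uri.getScheme() == null || !SUPPORTED_SCHEMES.contains(uri.getScheme())) {
            throw new IOException("Pointer not understood by this backend: " + pointer);
        }
        return uri;
    }
}
```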
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160756347 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); --- End diff -- Yes, this is part of future work. 
The high-level description says: > To make sure there is no extra persisting of the checkpoint metadata by the HA store (it simply references the regular persisted checkpoint metadata) we need some changes to the ZooKeeperCompletedCheckpointStore. The method serves as a sanity check: it allows the checkpoint coordinator to validate that if the `CompletedCheckpointStore` requires durable persistence, then the `CheckpointStorage` provides it. It catches an odd corner case where the ZkCompletedCheckpointStore is used, but the StateStore (from the MemoryStateBackend) does not actually support persisting any data durably. We could remove it, because currently this can never happen (the MemoryStateBackend automatically fills in durable persistence in HA setups). ---
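A minimal sketch of the sanity check described above, assuming the coordinator sees one flag from each side. The method name `requiresDurableMetadata()` is hypothetical. `supportsHighlyAvailableStorage()` mirrors the method in the quoted diff. Both interfaces are simplified stand-ins, not Flink's actual types.

```java
/** Hypothetical sketch of the HA sanity check; names are illustrative, not Flink's API. */
final class HaSanityCheckSketch {

    /** Stand-in: an HA store that only references metadata needs it to be durable. */
    interface CompletedCheckpointStore {
        boolean requiresDurableMetadata();
    }

    /** Stand-in for the CheckpointStorage method quoted in the diff. */
    interface CheckpointStorage {
        boolean supportsHighlyAvailableStorage();
    }

    /** Fails fast if the completed-checkpoint store would reference metadata that is not durable. */
    static void validate(CompletedCheckpointStore store, CheckpointStorage storage) {
        if (store.requiresDurableMetadata() && !storage.supportsHighlyAvailableStorage()) {
            throw new IllegalStateException(
                    "The completed checkpoint store only references checkpoint metadata, "
                            + "but the checkpoint storage cannot persist that metadata durably.");
        }
    }
}
```

If the MemoryStateBackend always fills in durable persistence in HA setups, the exception branch is unreachable in practice, which is the argument for possibly removing the check.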
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160755282 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { --- End diff -- The plan is for the storage to create the stream factories later; I simply did not want to add that to this pull request as well. Think of this pull request as the "JobManager" side of things; the "TaskManager" side of things comes next. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160754932 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A utility class with the methods to write/load/dispose the checkpoint and savepoint metadata. + * + * Stored checkpoint metadata files have the following format: + * [MagicNumber (int) | Format Version (int) | Checkpoint Metadata (variable)] + * + * The actual savepoint serialization is version-specific via the {@link SavepointSerializer}. 
+ */ +public class Checkpoints { --- End diff -- This class somewhat mimics the way this was done previously, scattered over the `SavepointLoader` and `SavepointStore` classes. I renamed it to `Checkpoints` because the code is relevant not only to savepoints, but to checkpoints in general. It was mainly a bit of name-fixing as part of the adjustment. Making this pluggable is an orthogonal effort, in my opinion, and probably a more involved one. ---
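The metadata file layout stated in the quoted class Javadoc, `[MagicNumber (int) | Format Version (int) | Checkpoint Metadata (variable)]`, can be illustrated with plain `DataOutputStream`/`DataInputStream`. This is a sketch only: the magic number value below is a made-up placeholder, since the real constant is not shown in the diff, and the version-specific payload is left as opaque bytes, which the `SavepointSerializer` would handle.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical sketch of the [magic | version | metadata] header; not Flink's Checkpoints class. */
final class MetadataHeaderSketch {

    /** Placeholder value for illustration only; the real magic number is not in the quoted diff. */
    static final int MAGIC_NUMBER = 0x0C0FFEE0;

    /** Writes the header followed by the (already serialized) version-specific metadata bytes. */
    static byte[] write(int formatVersion, byte[] metadata) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC_NUMBER);
            out.writeInt(formatVersion);
            out.write(metadata);
        }
        return bytes.toByteArray();
    }

    /** Validates the magic number and returns the format version used to pick a serializer. */
    static int readFormatVersion(byte[] file) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(file))) {
            int magic = in.readInt();
            if (magic != MAGIC_NUMBER) {
                throw new IOException("Unexpected magic number 0x" + Integer.toHexString(magic));
            }
            return in.readInt();
        }
    }
}
```

Checking the magic number before the version lets a loader reject files that are not checkpoint metadata at all before it attempts any version-specific deserialization.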
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160752496 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java --- @@ -251,77 +225,66 @@ public String toString() { false, false); - private static final CheckpointProperties STANDARD_CHECKPOINT = new CheckpointProperties( - false, + private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = new CheckpointProperties( false, false, true, - true, - true, - true, - true); + true, // Delete on success + true, // Delete on cancellation + true, // Delete on failure + true); // Delete on suspension - private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties( + private static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties( false, - true, false, true, - true, - false, // Retain on cancellation - false, - false); // Retain on suspension + true, // Delete on success + true, // Delete on cancellation + false, // Retain on failure + true); // Delete on suspension - private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties( + private static final CheckpointProperties CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties( false, - true, false, true, - true, - true, // Delete on cancellation - false, - true); // Delete on suspension + true, // Delete on success + false, // Retain on cancellation + false, // Retain on failure + false); // Retain on suspension + /** * Creates the checkpoint properties for a (manually triggered) savepoint. * -* Savepoints are forced and persisted externally. They have to be +* Savepoints are not queued due to time trigger limits. They have to be * garbage collected manually. * * @return Checkpoint properties for a (manually triggered) savepoint. 
*/ - public static CheckpointProperties forStandardSavepoint() { - return STANDARD_SAVEPOINT; - } - - /** -* Creates the checkpoint properties for a regular checkpoint. -* -* Regular checkpoints are not forced and not persisted externally. They -* are garbage collected automatically. -* -* @return Checkpoint properties for a regular checkpoint. -*/ - public static CheckpointProperties forStandardCheckpoint() { - return STANDARD_CHECKPOINT; + public static CheckpointProperties forSavepoint() { + return SAVEPOINT; } /** -* Creates the checkpoint properties for an external checkpoint. +* Creates the checkpoint properties for a checkpoint. * -* External checkpoints are not forced, but persisted externally. They -* are garbage collected automatically, except when the owning job +* Checkpoints may be queued in case too many other checkpoints are currently happening. +* They are garbage collected automatically, except when the owning job * terminates in state {@link JobStatus#FAILED}. The user is required to * configure the clean up behaviour on job cancellation. * -* @param deleteOnCancellation Flag indicating whether to discard on cancellation. -* * @return Checkpoint properties for an external checkpoint. */ - public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) { - if (deleteOnCancellation) { - return EXTERNALIZED_CHECKPOINT_DELETED; - } else { - return EXTERNALIZED_CHECKPOINT_RETAINED; + public static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy policy) { --- End diff -- There is actually a `forSavepoint(...)` method, which is why I think keeping the name makes sense. We could change it to `forCheckpointWithPolic
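The three new constants in the diff differ only in their four discard flags (success, cancellation, failure, suspension). Below is a minimal sketch of how `forCheckpoint(policy)` might map a retention policy onto those flags. The enum constant names and the `DiscardFlags` holder are assumptions for illustration; the flag values are copied from the quoted constants.

```java
/** Hypothetical sketch of mapping a retention policy to discard flags; not Flink's CheckpointProperties. */
final class RetentionPolicySketch {

    /** Assumed policy names, one per constant in the quoted diff. */
    enum RetentionPolicy { NEVER_RETAIN_AFTER_TERMINATION, RETAIN_ON_FAILURE, RETAIN_ON_CANCELLATION }

    /** Simplified holder for the four "discard on ..." flags; true means delete. */
    static final class DiscardFlags {
        final boolean onSuccess;
        final boolean onCancellation;
        final boolean onFailure;
        final boolean onSuspension;

        DiscardFlags(boolean onSuccess, boolean onCancellation, boolean onFailure, boolean onSuspension) {
            this.onSuccess = onSuccess;
            this.onCancellation = onCancellation;
            this.onFailure = onFailure;
            this.onSuspension = onSuspension;
        }
    }

    static DiscardFlags forCheckpoint(RetentionPolicy policy) {
        switch (policy) {
            case NEVER_RETAIN_AFTER_TERMINATION: // CHECKPOINT_NEVER_RETAINED
                return new DiscardFlags(true, true, true, true);
            case RETAIN_ON_FAILURE:              // CHECKPOINT_RETAINED_ON_FAILURE
                return new DiscardFlags(true, true, false, true);
            case RETAIN_ON_CANCELLATION:         // CHECKPOINT_RETAINED_ON_CANCELLATION
                return new DiscardFlags(true, false, false, false);
            default:
                throw new IllegalArgumentException("Unknown retention policy: " + policy);
        }
    }
}
```

Taking a single policy argument replaces the old `boolean deleteOnCancellation` parameter, so adding a retention mode becomes a new enum constant rather than another boolean.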
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160752558 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -1185,6 +1153,10 @@ public int getNumberOfRetainedSuccessfulCheckpoints() { } } + public CheckpointStorage getCheckpointStorage() { --- End diff -- What names would you suggest to use? ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160751897 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). 
*/ + public static final ConfigOption SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- Makes sense, let's scope them more generally. ---
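The quoted `state.savepoints.dir` option is declared with `.withDeprecatedKeys("savepoints.state.backend.fs.dir")`. Below is a simplified model of how such a fallback is usually resolved, assuming the new key takes precedence and the deprecated keys are consulted in order. It uses a plain `Map` and is not Flink's `Configuration` implementation.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of key lookup with deprecated fallbacks; not Flink's Configuration class. */
final class DeprecatedKeySketch {

    /** Returns the value under the current key, else the first deprecated key that is set. */
    static Optional<String> lookup(Map<String, String> config, String key, String... deprecatedKeys) {
        String value = config.get(key);
        if (value != null) {
            return Optional.of(value);
        }
        for (String deprecated : deprecatedKeys) {
            value = config.get(deprecated);
            if (value != null) {
                // A real implementation might also log a deprecation warning here.
                return Optional.of(value);
            }
        }
        return Optional.empty(); // matches noDefaultValue(): absent if nothing is configured
    }
}
```

This is why renaming or re-scoping keys, as agreed here for the heap snapshot option, can stay backwards compatible for existing configuration files.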
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160718388 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. 
If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; --- End diff -- Furthermore, I suggest introducing a separate class like `(Checkpoint)StoragePointer` instead of a plain `String`. Internally, the class can really be just a string, but I find it very helpful when reading the code if a meaningful class name is attached to a concept instead of raw String usage. It also helps to avoid autocomplete mistakes for methods that take a pointer plus other strings. ---
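A minimal sketch of the suggested wrapper, assuming it holds nothing but the pointer string. `CheckpointStoragePointer` is a hypothetical name taken from the suggestion, not an existing Flink class. The benefit is compile-time: a method such as `resolve(CheckpointStoragePointer pointer, String jobName)` can no longer be called with the two strings swapped.

```java
import java.util.Objects;

/** Hypothetical value type for a checkpoint/savepoint pointer; internally just a String. */
final class CheckpointStoragePointer {

    private final String pointer;

    CheckpointStoragePointer(String pointer) {
        this.pointer = Objects.requireNonNull(pointer, "pointer");
        if (pointer.isEmpty()) {
            throw new IllegalArgumentException("A storage pointer must not be empty");
        }
    }

    /** Raw form, e.g. for passing through the CLI, ZooKeeper, or REST. */
    String asString() {
        return pointer;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof CheckpointStoragePointer
                && ((CheckpointStoragePointer) o).pointer.equals(pointer);
    }

    @Override
    public int hashCode() {
        return pointer.hashCode();
    }

    @Override
    public String toString() {
        return pointer;
    }
}
```

The trade-off raised in the reply still applies: every entry point receives a String, so the wrapper adds readability and type safety at the call sites, but no validation beyond what the backend performs when it resolves the pointer.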
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160686745 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. 
If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. +* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the StateBackendFactory when creating / configuring the state +* backend in the factory +* @throws IOException +* May be thrown by the StateBackendFactory when instantiating the state backend +*/ +
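The loading rule in the quoted Javadoc (a shortcut name, or else the class name of a `StateBackendFactory` instantiated through its zero-argument constructor) can be sketched as follows. This is a simplified model, not Flink's `StateBackendLoader`. The case-insensitive match, the stand-in interfaces, and `DemoFactory` are assumptions. The real factory receives a `Configuration`, and per the Javadoc the real loader reports loading failures as `DynamicCodeLoadingException`, whereas this sketch lets the reflective exceptions propagate.

```java
import java.util.Locale;

/** Hypothetical model of shortcut-or-factory loading; not Flink's StateBackendLoader. */
final class BackendLoaderSketch {

    /** Stand-in for a state backend; only exposes a descriptive name. */
    interface StateBackend {
        String name();
    }

    /** Stand-in for StateBackendFactory (the real createFromConfig takes a Configuration). */
    interface StateBackendFactory {
        StateBackend createFromConfig();
    }

    /** Example factory that could be referenced in 'state.backend' by its class name. */
    static final class DemoFactory implements StateBackendFactory {
        @Override
        public StateBackend createFromConfig() {
            return () -> "DemoBackend";
        }
    }

    static StateBackend load(String configured, ClassLoader classLoader)
            throws ReflectiveOperationException {
        switch (configured.toLowerCase(Locale.ROOT)) {
            case "jobmanager": // MEMORY_STATE_BACKEND_NAME
                return () -> "MemoryStateBackend";
            case "filesystem": // FS_STATE_BACKEND_NAME
                return () -> "FsStateBackend";
            case "rocksdb":    // ROCKSDB_STATE_BACKEND_NAME
                return () -> "RocksDBStateBackend";
            default:
                // Not a shortcut: treat the value as a factory class name and
                // instantiate it via its zero-argument constructor.
                Class<? extends StateBackendFactory> factoryClass =
                        Class.forName(configured, false, classLoader)
                                .asSubclass(StateBackendFactory.class);
                return factoryClass.getDeclaredConstructor().newInstance().createFromConfig();
        }
    }
}
```

Passing the user-code class loader explicitly matters because a custom factory usually lives in the job's JAR rather than on the framework classpath.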
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160690634 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. 
If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. +* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the StateBackendFactory when creating / configuring the state +* backend in the factory +* @throws IOException +* May be thrown by the StateBackendFactory when instantiating the state backend +*/ +
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160718229 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. 
If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; --- End diff -- I think `resolveStoragePointerForCheckpoint(...)` or similar might be a better name. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160687956 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. 
If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. +* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the StateBackendFactory when creating / configuring the state +* backend in the factory +* @throws IOException +* May be thrown by the StateBackendFactory when instantiating the state backend +*/ +
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160698030 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A utility class with the methods to write/load/dispose the checkpoint and savepoint metadata. + * + * Stored checkpoint metadata files have the following format: + * [MagicNumber (int) | Format Version (int) | Checkpoint Metadata (variable)] + * + * The actual savepoint serialization is version-specific via the {@link SavepointSerializer}. 
+ */ +public class Checkpoints { --- End diff -- This class contains a lot of static methods that look like they want to belong to a checkpoint/savepoint store object. To keep those parts replaceable in unit tests, I wonder if we should eliminate the static nature of this code. ---
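Here is one possible reading of this suggestion, sketched under assumptions. The static write/load/dispose helpers move behind an interface that callers receive through their constructor. A unit test can then pass in an in-memory fake instead of touching a file system. `MetadataStore`, `InMemoryMetadataStore`, and `SavepointDisposer` are hypothetical names, and checkpoint metadata is reduced to a byte array.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical instance-based replacement for static load/store/dispose helpers.
interface MetadataStore {
    void store(String pointer, byte[] metadata) throws IOException;

    byte[] load(String pointer) throws IOException;

    void dispose(String pointer) throws IOException;
}

// Test double: keeps metadata in a map instead of files.
final class InMemoryMetadataStore implements MetadataStore {
    private final Map<String, byte[]> entries = new HashMap<>();

    @Override
    public void store(String pointer, byte[] metadata) {
        entries.put(pointer, metadata.clone());
    }

    @Override
    public byte[] load(String pointer) throws IOException {
        byte[] data = entries.get(pointer);
        if (data == null) {
            throw new IOException("No metadata at " + pointer);
        }
        return data.clone();
    }

    @Override
    public void dispose(String pointer) {
        entries.remove(pointer);
    }

    boolean contains(String pointer) {
        return entries.containsKey(pointer);
    }
}

// A caller that depends on the interface rather than on static methods.
final class SavepointDisposer {
    private final MetadataStore store;

    SavepointDisposer(MetadataStore store) {
        this.store = store;
    }

    void disposeSavepoint(String pointer) throws IOException {
        store.load(pointer); // fail fast if the pointer cannot be resolved
        store.dispose(pointer);
    }
}
```

In production the same `SavepointDisposer` would receive a file-system-backed implementation; only the wiring changes.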
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160690515 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. 
If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. +* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the StateBackendFactory when creating / configuring the state +* backend in the factory +* @throws IOException +* May be thrown by the StateBackendFactory when instantiating the state backend +*/ +
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160713950 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java --- @@ -251,77 +225,66 @@ public String toString() { false, false); - private static final CheckpointProperties STANDARD_CHECKPOINT = new CheckpointProperties( - false, + private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = new CheckpointProperties( false, false, true, - true, - true, - true, - true); + true, // Delete on success + true, // Delete on cancellation + true, // Delete on failure + true); // Delete on suspension - private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties( + private static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties( false, - true, false, true, - true, - false, // Retain on cancellation - false, - false); // Retain on suspension + true, // Delete on success + true, // Delete on cancellation + false, // Retain on failure + true); // Delete on suspension - private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties( + private static final CheckpointProperties CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties( false, - true, false, true, - true, - true, // Delete on cancellation - false, - true); // Delete on suspension + true, // Delete on success + false, // Retain on cancellation + false, // Retain on failure + false); // Retain on suspension + /** * Creates the checkpoint properties for a (manually triggered) savepoint. * -* Savepoints are forced and persisted externally. They have to be +* Savepoints are not queued due to time trigger limits. They have to be * garbage collected manually. * * @return Checkpoint properties for a (manually triggered) savepoint. 
*/ - public static CheckpointProperties forStandardSavepoint() { - return STANDARD_SAVEPOINT; - } - - /** -* Creates the checkpoint properties for a regular checkpoint. -* -* Regular checkpoints are not forced and not persisted externally. They -* are garbage collected automatically. -* -* @return Checkpoint properties for a regular checkpoint. -*/ - public static CheckpointProperties forStandardCheckpoint() { - return STANDARD_CHECKPOINT; + public static CheckpointProperties forSavepoint() { + return SAVEPOINT; } /** -* Creates the checkpoint properties for an external checkpoint. +* Creates the checkpoint properties for a checkpoint. * -* External checkpoints are not forced, but persisted externally. They -* are garbage collected automatically, except when the owning job +* Checkpoints may be queued in case too many other checkpoints are currently happening. +* They are garbage collected automatically, except when the owning job * terminates in state {@link JobStatus#FAILED}. The user is required to * configure the clean up behaviour on job cancellation. * -* @param deleteOnCancellation Flag indicating whether to discard on cancellation. -* * @return Checkpoint properties for an external checkpoint. */ - public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) { - if (deleteOnCancellation) { - return EXTERNALIZED_CHECKPOINT_DELETED; - } else { - return EXTERNALIZED_CHECKPOINT_RETAINED; + public static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy policy) { --- End diff -- I suggest changing the method name to `forPolicy(...)` or `forRetentionPolicy(...)`. ---
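With the suggested name, the factory method would map each retention policy onto one of the three constants in the diff. The sketch below is a reduced, self-contained model. The `RetentionPolicy` enum (including its value names) and the four-flag `Props` class are assumptions standing in for `CheckpointRetentionPolicy` and `CheckpointProperties`. Only the discard flags are modeled, and their values are copied from the `// Delete on ...` / `// Retain on ...` comments in the diff.

```java
// Hypothetical stand-in for CheckpointRetentionPolicy (value names assumed).
enum RetentionPolicy {
    NEVER_RETAIN_AFTER_TERMINATION,
    RETAIN_ON_FAILURE,
    RETAIN_ON_CANCELLATION
}

// Reduced model of CheckpointProperties: only the four discard flags shown in the diff.
final class Props {
    final boolean discardOnSuccess;
    final boolean discardOnCancellation;
    final boolean discardOnFailure;
    final boolean discardOnSuspension;

    private Props(boolean onSuccess, boolean onCancellation, boolean onFailure, boolean onSuspension) {
        this.discardOnSuccess = onSuccess;
        this.discardOnCancellation = onCancellation;
        this.discardOnFailure = onFailure;
        this.discardOnSuspension = onSuspension;
    }

    static final Props CHECKPOINT_NEVER_RETAINED = new Props(true, true, true, true);
    static final Props CHECKPOINT_RETAINED_ON_FAILURE = new Props(true, true, false, true);
    static final Props CHECKPOINT_RETAINED_ON_CANCELLATION = new Props(true, false, false, false);

    // The name states what selects the result: the retention policy passed in.
    static Props forRetentionPolicy(RetentionPolicy policy) {
        switch (policy) {
            case NEVER_RETAIN_AFTER_TERMINATION:
                return CHECKPOINT_NEVER_RETAINED;
            case RETAIN_ON_FAILURE:
                return CHECKPOINT_RETAINED_ON_FAILURE;
            case RETAIN_ON_CANCELLATION:
                return CHECKPOINT_RETAINED_ON_CANCELLATION;
            default:
                throw new IllegalArgumentException("Unknown retention policy: " + policy);
        }
    }
}
```

A call site then reads `Props.forRetentionPolicy(RetentionPolicy.RETAIN_ON_CANCELLATION)`. That call no longer suggests a separate kind of checkpoint, only a policy choice.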
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160630316 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption<String> STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend).
*/ + public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption<Boolean> HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- I wonder if it could make sense not to scope options like async or incremental per backend, and instead have them as general switches that backends may or may not support. We could log if a backend chooses to ignore a configuration because it does not support it. ---
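The following is a rough sketch of how such a general, backend-agnostic switch could be resolved, including the suggested log message when a backend does not support it. All names are hypothetical: the key `state.backend.async`, the `SnapshotCapable` interface, and `GeneralSwitches.resolveAsync`. Warnings are collected into a list in place of a real logger. When the key is absent, the sketch assumes the backend's own capability is used silently, so only an explicit but unsupported setting produces a warning.

```java
import java.util.List;
import java.util.Map;

// Hypothetical capability query that each backend would implement.
interface SnapshotCapable {
    String name();

    boolean supportsAsyncSnapshots();
}

final class GeneralSwitches {
    // Hypothetical general key, not scoped to a particular backend.
    static final String ASYNC_SNAPSHOTS_KEY = "state.backend.async";

    /**
     * Decides whether the backend snapshots asynchronously. If the switch is not set, the
     * backend's own capability is used. If it is explicitly enabled but unsupported, a warning
     * is recorded (a real implementation would log it) and synchronous snapshots are used.
     */
    static boolean resolveAsync(SnapshotCapable backend, Map<String, String> config, List<String> warnings) {
        String raw = config.get(ASYNC_SNAPSHOTS_KEY);
        if (raw == null) {
            return backend.supportsAsyncSnapshots();
        }
        boolean requested = Boolean.parseBoolean(raw.trim());
        if (requested && !backend.supportsAsyncSnapshots()) {
            warnings.add("State backend '" + backend.name() + "' ignores '" + ASYNC_SNAPSHOTS_KEY
                    + "' because it does not support asynchronous snapshots.");
            return false;
        }
        return requested;
    }
}
```

The same pattern would apply to an "incremental" switch: one key, and each backend answers a capability question instead of owning its own scoped option.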
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160638882 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java --- @@ -18,92 +18,272 @@ package org.apache.flink.runtime.state.memory; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import javax.annotation.Nullable; + import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkArgument; + /** - * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no - * capabilities to spill to disk. Checkpoints are serialized and the serialized data is - * transferred + * This state backend holds the working state in the memory (JVM heap) of the TaskManagers. 
+ * The state backend checkpoints state directly to the JobManager's memory (hence the backend's name), + * but the checkpoints will be persisted to a file system for high-availability setups and savepoints. + * The MemoryStateBackend is consequently a FileSystem-based backend that can work without a + * file system dependency in simple setups. + * + * This state backend should be used only for experimentation, quick local setups, + * or for streaming applications that have very small state: Because it requires checkpoints to + * go through the JobManager's memory, larger state will occupy larger portions of the JobManager's + * main memory, reducing operational stability. + * For any other setup, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend} + * should be used. The {@code FsStateBackend} holds the working state on the TaskManagers in the same way, but + * checkpoints state directly to files rather than to the JobManager's memory, thus supporting + * large state sizes. + * + * State Size Considerations + * + * State checkpointing with this state backend is subject to the following conditions: + * + * Each individual state must not exceed the configured maximum state size + * (see {@link #getMaxStateSize()}). + * + * All state from one task (i.e., the sum of all operator states and keyed states from all + * chained operators of the task) must not exceed what the RPC system supports, which is + * by default < 10 MB. That limit can be configured up, but that is typically not advised. + * + * The sum of all states in the application times all retained checkpoints must comfortably + * fit into the JobManager's JVM heap space. + * + * + * Persistence Guarantees + * + * For the use cases where the state sizes can be handled by this backend, the backend does guarantee + * persistence for savepoints, externalized checkpoints (if configured), and checkpoints + * (when high-availability is configured).
+ * + * Configuration + * + * As for all state backends, this backend can either be configured within the application (by creating + * the backend with the respective constructor parameters and setting it on the execution environment) + * or by specifying it in the Flink configuration. + * + * If the state backend was specified in the application, it may pick up additional configuration + * parameters from the Flink configuration. For example, if the backend is configured in the application + * without a default savepoint directory, it will pick up a default savepoint directory specified in the + * Flink configuration of the running job/cluster. That behavior is implemented via the + * {@link #configure(Configuration)} method. */ -public class MemoryStateBackend extends AbstractStateBackend {
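The fallback described in the last paragraph could be modeled as below: a value set in the application wins, and otherwise `configure(Configuration)` picks up the value from the Flink configuration. This is a hypothetical simplification rather than the `MemoryStateBackend` implementation. `SketchMemoryBackend` is an invented class, the configuration is a plain `Map`, and only the `state.savepoints.dir` key from `CheckpointingOptions` is considered.

```java
import java.util.Map;

// Hypothetical, heavily reduced model of a backend with an optional savepoint directory.
final class SketchMemoryBackend {
    static final String SAVEPOINT_DIRECTORY_KEY = "state.savepoints.dir";

    private final String savepointDir; // null if the application did not set one

    SketchMemoryBackend(String savepointDir) {
        this.savepointDir = savepointDir;
    }

    String getSavepointDir() {
        return savepointDir;
    }

    // Application-provided values win; missing values are taken from the Flink configuration.
    SketchMemoryBackend configure(Map<String, String> flinkConfig) {
        String dir = savepointDir != null ? savepointDir : flinkConfig.get(SAVEPOINT_DIRECTORY_KEY);
        return new SketchMemoryBackend(dir);
    }
}
```

Returning a new configured instance, instead of mutating the application's object, keeps the user-created backend unchanged. Whether the actual `configure` method behaves the same way should be checked against the PR.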
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160685735 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. 
If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. +* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the StateBackendFactory when creating / configuring the state +* backend in the factory +* @throws IOException +* May be thrown by the StateBackendFactory when instantiating the state backend +*/ +
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160646300 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StreamStateHandle; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.net.URI; + +/** + * A base class for all state backends that store their metadata (and data) in files. + * Examples that inherit from this are the {@link FsStateBackend}, the + * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend}, or the + * {@code RocksDBStateBackend}. 
+ * + * This class takes the base checkpoint- and savepoint directory paths, but also accepts null + * for both of them, in which case creating externalized checkpoints is not possible, and it is not + * possible to create a savepoint with a default path. Null is accepted to enable implementations + * that only optionally support default savepoints and externalized checkpoints. + * + * Checkpoint Layout + * + * The state backend is configured with a base directory and persists the checkpoint data of specific + * checkpoints in specific subdirectories. For example, if the base directory was set to + * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory with + * the job's ID that will contain the actual checkpoints: + * ({@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b}) + * + * Each checkpoint individually will store all its files in a subdirectory that includes the + * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}. + * + * Savepoint Layout + * + * A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/} will create + * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all savepoint data. + * The random digits are added as "entropy" to avoid directory collisions. + * + * Metadata and Success Files + * + * A completed checkpoint writes its metadata into a file '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'. + * After that is complete (i.e., the file is complete), it writes an additional file + * '{@value AbstractFsCheckpointStorage#SUCCESS_FILE_NAME}'. + * + * Ideally that would not be necessary, and one would write the metadata file to a temporary file and + * then issue an atomic (or at least constant time) rename.
But some of the file systems (like S3) do + * not support that: A rename is a copy process which, when failing, leaves corrupt half-written + * files/objects. The success file is hence needed as a signal that the + * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}' file is complete. + */ +@PublicEvolving +public abstract class AbstractFileStateBackend extends AbstractStateBackend { + + private static final long serialVersionUID = 1L; + + // + // State Backend Properties + // + + /** The path where checkpoints will be stored, or null, if none has been configured. */ + @Nullable + private final Path baseCheckp
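To make the directory layout and the success-file protocol concrete, here is a sketch against the local file system using `java.nio.file`. The file names `_metadata` and `_SUCCESS` and the 8-hex-digit entropy suffix are assumptions for illustration, since the Javadoc only references the constants in `AbstractFsCheckpointStorage`. `FsLayoutSketch` is a hypothetical helper. The essential part is the ordering: the marker file is written only after the metadata file has been fully written, and readers trust the metadata only if the marker exists.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ThreadLocalRandom;

final class FsLayoutSketch {
    // Assumed file names; the real values are constants in AbstractFsCheckpointStorage.
    static final String METADATA_FILE_NAME = "_metadata";
    static final String SUCCESS_FILE_NAME = "_SUCCESS";

    // <base>/<jobId>/chk-<checkpointId>
    static Path checkpointDir(Path base, String jobId, long checkpointId) {
        return base.resolve(jobId).resolve("chk-" + checkpointId);
    }

    // <savepointBase>/savepoint-<first 6 chars of jobId>-<8 random hex digits as entropy>
    static Path savepointDir(Path savepointBase, String jobId) {
        String entropy = String.format("%08x", ThreadLocalRandom.current().nextInt());
        return savepointBase.resolve("savepoint-" + jobId.substring(0, 6) + "-" + entropy);
    }

    // Writes the metadata first and the success marker only once the metadata write has returned.
    static void writeMetadata(Path dir, byte[] metadata) throws IOException {
        Files.createDirectories(dir);
        Files.write(dir.resolve(METADATA_FILE_NAME), metadata);
        Files.write(dir.resolve(SUCCESS_FILE_NAME), new byte[0]);
    }

    // Readers treat a directory as complete only if the success marker is present.
    static boolean isComplete(Path dir) {
        return Files.exists(dir.resolve(SUCCESS_FILE_NAME))
                && Files.exists(dir.resolve(METADATA_FILE_NAME));
    }
}
```

With the job ID from the Javadoc, `checkpointDir(base, "1b080b6e710aabbef8993ab18c6de98b", 17)` ends in `1b080b6e710aabbef8993ab18c6de98b/chk-17`. A savepoint directory name looks like `savepoint-1b080b-` followed by eight hex digits.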
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160725400 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { --- End diff -- Another point about naming that I found confusing: backends have methods to create both a `CheckpointStorage` and a `CheckpointStreamFactory`. From the names, it sounds like the storage could also do the job of the stream factories. In fact, the storage deals more with the metadata aspect on the checkpoint coordinator (which we call `Checkpoint`), while the factories give us streams that are used in checkpointing (so far, so good), but specifically to write the snapshots on the task managers.
Maybe the problem is rooted in the fact that "checkpoint" is both a metadata class for the JM and a concept, and the current method names mix the two up. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160716512 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -1185,6 +1153,10 @@ public int getNumberOfRetainedSuccessfulCheckpoints() { } } + public CheckpointStorage getCheckpointStorage() { --- End diff -- Here I noticed how similar the names `CheckpointStorage` and `CompletedCheckpointStore` are. Do you think there are names that make their difference more obvious when reading the code? For example, the second could also go by the name `CompletedCheckpoints`. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160721205 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. 
If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handle from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; + + /** +* Initializes a storage location for a new checkpoint with the given ID. +* +* The returned storage location can be used to write the checkpoint data and metadata +* to and to obtain the pointers for the location(s) where the actual checkpoint data should be +* stored. +* +* @param checkpointId The ID (logical timestamp) of the checkpoint that should be persisted. +* @return A storage location for the data and metadata of the given checkpoint. +* +* @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception. +*/ + CheckpointStorageLocation initializeCheckpoint(long checkpointId) throws IOException; --- End diff -- I would consider renaming this to something like `initializeStorageLocationForCheckpoint(...)`. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160727075 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- @@ -84,13 +84,44 @@ public interface StateBackend extends java.io.Serializable { // - // Persistent Bytes Storage + // Checkpoint storage - the durable persistence of checkpoint data + // + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; --- End diff -- Similar to the `String` vs. `StoragePointer` question, this could also be a class that is essentially a `StreamStateHandle` but gives the concept a name, e.g. `CheckpointMetaDataHandle`. This case is not as bad as the other one, though, because the handle is never passed down very deep (yet). ---
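A minimal sketch of the naming idea, assuming a simplified stand-in for `StreamStateHandle` that only exposes the read path (Flink's real interface has further methods, e.g. for discarding state). `CheckpointMetaDataHandle` is the hypothetical name from the comment above; `SimpleStreamStateHandle` and `ByteArrayHandle` are likewise hypothetical and exist only to make the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

// Simplified, hypothetical stand-in for Flink's StreamStateHandle:
// only the read path matters for this sketch.
interface SimpleStreamStateHandle {
    InputStream openInputStream() throws IOException;
}

// Hypothetical handle type that names the concept "checkpoint metadata"
// while delegating all reads to an ordinary stream handle.
final class CheckpointMetaDataHandle implements SimpleStreamStateHandle {
    private final SimpleStreamStateHandle delegate;
    private final String pointer;

    CheckpointMetaDataHandle(SimpleStreamStateHandle delegate, String pointer) {
        this.delegate = Objects.requireNonNull(delegate);
        this.pointer = Objects.requireNonNull(pointer);
    }

    /** The external pointer this handle was resolved from. */
    String getPointer() {
        return pointer;
    }

    @Override
    public InputStream openInputStream() throws IOException {
        return delegate.openInputStream();
    }
}

// Hypothetical in-memory handle, handy for tests.
final class ByteArrayHandle implements SimpleStreamStateHandle {
    private final byte[] data;

    ByteArrayHandle(byte[] data) {
        this.data = data.clone();
    }

    @Override
    public InputStream openInputStream() {
        return new ByteArrayInputStream(data);
    }
}
```

With a type like this, a method such as `resolveCheckpoint(String pointer)` could declare `CheckpointMetaDataHandle` as its return type, so the signature itself says that the caller receives metadata rather than arbitrary state.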
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160727468 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. 
If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; + + /** +* Initializes a storage location for new checkpoint with the given ID. +* +* The returned storage location can be used to write the checkpoint data and metadata +* to and to obtain the pointers for the location(s) where the actual checkpoint data should be +* stored. +* +* @param checkpointId The ID (logical timestamp) of the checkpoint that should be persisted. +* @return A storage location for the data and metadata of the given checkpoint. +* +* @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception. +*/ + CheckpointStorageLocation initializeCheckpoint(long checkpointId) throws IOException; + + /** +* Initializes a storage location for new savepoint with the given ID. +* +* If an external location pointer is passed, the savepoint storage location +* will be initialized at the location of that pointer. If the external location pointer is null, +* the default savepoint location will be used. If no default savepoint location is configured, +* this will throw an exception. Whether a default savepoint location is configured can be +* checked via {@link #hasDefaultSavepointLocation()}. +* +* @param checkpointId The ID (logical timestamp) of the savepoint's checkpoint. +* @param externalLocationPointer Optionally, a pointer to the location where the savepoint should +*be stored. May be null. +* +* @return A storage location for the data and metadata of the savepoint. 
+* +* @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception. +*/ + CheckpointStorageLocation initializeSavepoint( --- End diff -- Same here about the name. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160644877 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- @@ -84,13 +84,44 @@ public interface StateBackend extends java.io.Serializable { // - // Persistent Bytes Storage + // Checkpoint storage - the durable persistence of checkpoint data --- End diff -- The interface `StateBackend` is getting more and more crowded as functionality is added, and all of its methods are visible to everybody, from the `CheckpointCoordinator` down to the `XYZKeyedStateBackend`. You have already divided the interface into 3 segments, and I suggest actually splitting it into 3 parent interfaces, which are all implemented by `StateBackend`. In many places, we can then pass only the interface of the relevant sub-functionality; e.g., `XYZKeyedStateBackend` probably only needs to see the part about creating streams and should never call methods that create a new backend. This would be a big step toward separation of concerns and away from "global visibility". Our IDE should be able to automatically find all the places where the sub-interfaces are sufficient, but since this might be a bigger diff, we could do it in a separate PR. ---
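A rough sketch of the proposed split, assuming three hypothetical parent interfaces whose names and single methods are illustrative only (they are not Flink's API). `SnapshotWriter` plays the role of a keyed backend that is handed only the stream-creation view. `InMemoryBackend` is a toy implementation of the combined interface.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical split of StateBackend into three parent interfaces.
// All names and signatures here are illustrative, not Flink's actual API.

/** Checkpoint storage: resolving pointers (needed by the coordinator). */
interface CheckpointStorageView {
    String resolveCheckpointPointer(String pointer) throws IOException;
}

/** Stream creation: all a keyed backend needs while writing a snapshot. */
interface CheckpointStreamView {
    OutputStream createCheckpointStateOutputStream() throws IOException;
}

/** Backend creation: only needed where tasks are set up. */
interface KeyedBackendCreation {
    // Placeholder return type standing in for a keyed state backend.
    Map<String, byte[]> createKeyedStateBackend(String operatorName);
}

/** The full state backend simply combines the three roles. */
interface SplitStateBackend
        extends CheckpointStorageView, CheckpointStreamView, KeyedBackendCreation {}

/** A component that is handed only the stream-creation view. */
final class SnapshotWriter {
    private final CheckpointStreamView streams;

    SnapshotWriter(CheckpointStreamView streams) {
        this.streams = streams;
    }

    void writeSnapshot(byte[] state) throws IOException {
        try (OutputStream out = streams.createCheckpointStateOutputStream()) {
            out.write(state);
        }
    }
}

/** Toy in-memory implementation of the combined interface. */
final class InMemoryBackend implements SplitStateBackend {
    final ByteArrayOutputStream lastStream = new ByteArrayOutputStream();

    @Override
    public String resolveCheckpointPointer(String pointer) {
        return "memory:" + pointer;
    }

    @Override
    public OutputStream createCheckpointStateOutputStream() {
        lastStream.reset(); // each snapshot starts with an empty buffer
        return lastStream;
    }

    @Override
    public Map<String, byte[]> createKeyedStateBackend(String operatorName) {
        return new HashMap<>();
    }
}
```

Because `SnapshotWriter` accepts only a `CheckpointStreamView`, the compiler stops it from creating backends or resolving pointers. A full `InMemoryBackend` can still be passed in, because it implements all three views.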
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160731923 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); --- End diff -- The purpose of this method did not become fully clear to me from the comments, and the method is also never used. If it is part of some future work, you could consider adding it in a future PR. ---
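The PR description in this thread mentions an eager check that rejects HA mode when a backend does not persist metadata durably. One plausible use of `supportsHighlyAvailableStorage()` is a fail-fast startup check like the minimal sketch below. It assumes a trimmed-down, hypothetical `HaAwareStorage` interface, a hypothetical `haEnabled` flag, and a hypothetical `StartupChecks` helper. It throws `IllegalStateException` rather than Flink's `IllegalConfigurationException` so that the block stays self-contained.

```java
// Trimmed-down, hypothetical stand-in for the CheckpointStorage interface in the diff.
interface HaAwareStorage {
    boolean supportsHighlyAvailableStorage();
}

final class StartupChecks {
    private StartupChecks() {}

    /**
     * Fails fast if high availability is enabled but the storage cannot
     * persist checkpoint metadata durably (hypothetical helper).
     */
    static void checkHaCompatibility(boolean haEnabled, HaAwareStorage storage) {
        if (haEnabled && !storage.supportsHighlyAvailableStorage()) {
            throw new IllegalStateException(
                    "High availability is enabled, but the configured checkpoint storage "
                            + "does not persist checkpoint metadata durably.");
        }
    }
}
```

Without such a caller, the method has no observable effect. That fits the suggestion to add it only together with the code that uses it.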
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160722748 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -475,13 +474,12 @@ public void enableCheckpointing( checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, - externalizeSettings, + retentionPolicy, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, checkpointIDCounter, checkpointStore, --- End diff -- Here is one example where the proximity between the classnames `CompletedCheckpointStore` and `CheckpointStorage` becomes obvious. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160682007 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +/** + *An interface for state backends that pick up additional parameters from a configuration. --- End diff -- Whitespace missing at beginning of comment line. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160722027 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { --- End diff -- I wonder if we can find a name that makes the role of this class w.r.t. `CheckpointStorageLocation` clearer, in particular that it is the class that assigns storage locations to the individual checkpoints. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160719709 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; + +import java.io.IOException; + +/** + * A storage location for one particular checkpoint. This location is typically + * created and initialized via {@link CheckpointStorage#initializeCheckpoint(long)} or + * {@link CheckpointStorage#initializeSavepoint(long, String)}. + */ +public interface CheckpointStorageLocation { + + /** +* Creates the output stream to persist the checkpoint metadata to. +* +* @return The output stream to persist the checkpoint metadata to. +* @throws IOException Thrown, if the stream cannot be opened due to an I/O error. +*/ + CheckpointStateOutputStream createMetadataOutputStream() throws IOException; + + /** +* Finalizes the checkpoint, marking the location as a finished checkpoint. 
+* This method returns the external checkpoint pointer that can be used to resolve +* the checkpoint upon recovery. +* +* @return The external pointer to the checkpoint at this location. +* @throws IOException Thrown, if finalizing / marking as finished fails due to an I/O error. +*/ + String markCheckpointAsFinished() throws IOException; --- End diff -- As discussed, you might reconsider whether this method is really needed. ---
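The PR description states that, for file-system-based backends, the checkpoint pointer is the file path. The minimal sketch below shows the write, finish, and resolve cycle of such a location using `java.nio.file`. `FileCheckpointLocation`, the `chk-<id>` directory layout, and the `_metadata` file name are assumptions for illustration. They are not taken from the diff.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical file-system storage location for one checkpoint.
final class FileCheckpointLocation {
    // Assumed metadata file name; illustrative only.
    static final String METADATA_FILE = "_metadata";

    private final Path checkpointDir;

    FileCheckpointLocation(Path checkpointsRoot, long checkpointId) throws IOException {
        this.checkpointDir = checkpointsRoot.resolve("chk-" + checkpointId);
        Files.createDirectories(checkpointDir);
    }

    /** Opens the stream that receives the checkpoint metadata. */
    OutputStream createMetadataOutputStream() throws IOException {
        return Files.newOutputStream(checkpointDir.resolve(METADATA_FILE));
    }

    /** Checks that metadata exists and returns the external pointer (the directory path). */
    String markCheckpointAsFinished() throws IOException {
        if (!Files.isRegularFile(checkpointDir.resolve(METADATA_FILE))) {
            throw new IOException("No metadata written to " + checkpointDir);
        }
        return checkpointDir.toAbsolutePath().toString();
    }

    /** Resolves a pointer produced above back into a metadata stream. */
    static InputStream resolveCheckpoint(String pointer) throws IOException {
        Path metadata = Paths.get(pointer).resolve(METADATA_FILE);
        if (!Files.isRegularFile(metadata)) {
            throw new IOException("Pointer does not reference a finished checkpoint: " + pointer);
        }
        return Files.newInputStream(metadata);
    }
}
```

In this sketch, "finishing" amounts to a sanity check plus returning a path that is already known when the location is created. That is the kind of thin method the comment suggests reconsidering.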
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160730627 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStorageLocation; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * An implementation of durable checkpoint storage to file systems. + */ +public class FsCheckpointStorage extends AbstractFsCheckpointStorage { + + private final FileSystem fileSystem; + + private final Path checkpointsDirectory; + + private final Path sharedStateDirectory; + + private final Path taskOwnedStateDirectory; --- End diff -- As discussed, we might not need this if we map task-owned state to shared state. 
The idea: as long as the task still requires the state, it resends the handle to this file (a placeholder, to be precise), which keeps the shared registry's reference count above zero. ---
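The reference-counting idea can be sketched as follows. `RefCountingRegistry` is a hypothetical minimal model, not Flink's shared state registry. Each completed checkpoint registers the files it references, each subsumed checkpoint unregisters them, and a file becomes deletable only when no retained checkpoint refers to it. Instead of deleting anything, the sketch records which files would be deleted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical reference-counting registry for shared/task-owned state files.
final class RefCountingRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Set<String> deleted = new HashSet<>();

    /** Called once per completed checkpoint for every file that checkpoint references. */
    void register(String fileKey) {
        refCounts.merge(fileKey, 1, Integer::sum);
    }

    /** Called when a checkpoint is subsumed, for every file it referenced. */
    void unregister(String fileKey) {
        Integer count = refCounts.get(fileKey);
        if (count == null) {
            throw new IllegalStateException("Unknown file: " + fileKey);
        }
        if (count == 1) {
            refCounts.remove(fileKey);
            deleted.add(fileKey); // a real registry would delete the file here
        } else {
            refCounts.put(fileKey, count - 1);
        }
    }

    int refCount(String fileKey) {
        return refCounts.getOrDefault(fileKey, 0);
    }

    boolean isDeleted(String fileKey) {
        return deleted.contains(fileKey);
    }
}
```

Suppose a task still needs a file in checkpoint 2 and re-sends a placeholder for it. The file then survives the subsumption of checkpoint 1, and it is released only after the last checkpoint that references it is subsumed.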
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5248

[FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata

## What is the purpose of the change

This pull request puts the state backends in charge of persisting the checkpoint metadata. This has the following advantages:

- Checkpoints become conceptually independent of file systems. In the future, we can implement state backends purely backed by non-file systems, like databases or message queues.
- We can simplify or drop the extra code paths implemented for externalized checkpoints and for persisting checkpoint metadata in HA cases.
- The checkpoint and savepoint configurations go purely through the state backends, making testing much simpler.
- Because the configuration goes through the state backends only, it is simple to let the state backends pick up a mix of configuration from the application code (in-code config) and from the cluster setup. For example, a programmer can pick the state backend, and the cluster can have default limits or checkpoint directories configured. To support that, state backends may implement an additional interface that lets them pick up configuration values from the cluster configuration.
- As a followup, this will allow us to implement more efficient ways of dropping checkpoint state (recursive directory delete) as well as logic to scavenge left-over checkpoint data.

## Altered user-facing Behavior

- All checkpoints are always "externalized", meaning that the metadata is always persisted. The notion of externalized checkpoints is dropped.
- Checkpoints have no "externalization setting", but a **retention policy**, like
  - `RETAIN_ON_CANCELLATION`: Keep checkpoints when the user manually cancels the job, similar to the corresponding setting for externalized checkpoints.
  - `RETAIN_ON_FAILURE`: Retain checkpoints when the job reaches a terminal failure. For compatibility, this is automatically picked when the user calls the now deprecated method to activate externalized checkpoints.
  - `NEVER_RETAIN_AFTER_TERMINATION`: Conceptually similar to the behavior when no externalized checkpoints were configured.
- The `MemoryStateBackend` is viewed as a file-system-based state backend that does not create separate files for state, but just holds state inline with the checkpoint metadata. In the metadata and savepoint handling, there is no distinction between the `MemoryStateBackend` and the `FsStateBackend`.
- As a special case, the `MemoryStateBackend` may choose not to durably persist the metadata (by default, when no storage location is configured), in which case it cannot support an HA mode (there is an eager check for that). This exists merely to support no-config getting-started experiences and simpler in-IDE development setups.

## Followup work

To make sure the HA store does not persist the checkpoint metadata a second time (it should simply reference the regularly persisted checkpoint metadata), we need some changes to the `ZooKeeperCompletedCheckpointStore`.

Once we start storing shared checkpoint state (incremental checkpoints) and task-owned state (write-ahead sinks) in different locations, we can start optimizing checkpoint directory cleanup and implementing scavengers for left-over state.

## Brief change log

- The state backends introduce the concept of a `CheckpointStorage` (storage of bytes) and a `CheckpointStorageLocation` (specific location for the bytes of a checkpoint/savepoint). That makes the separation of concerns in the state backend clear: `KeyedStateBackend` and `OperatorStateBackend` define how to hold and checkpoint the state, while `CheckpointStorage` defines how to persist bytes (data and metadata).
- The `CheckpointStorage` is responsible for storing the checkpoint metadata. There is no longer an implicit assumption that the checkpoint metadata is stored in a file system.
- All checkpoint-directory and savepoint-directory config settings are now part of the state backends. The Checkpoint Coordinator simply calls the relevant methods on the state backends to store metadata.
- All checkpoints are addressable via a "pointer", which is interpreted by the state backend to find the checkpoint. For file-system-based state backends (currently all state backends in Flink), this pointer is the file path.

## Verifying this change

This change adds new tests and adjusts many existing ones to verify the behavior. Manual verification can happen by starting a regular Flink cluster, enabling checkpoints, and checking that metadata files are always persisted.

## Does this pull request potentially affect one of the following parts:

-
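The three retention policies listed in the description can be modeled as a small decision function. The following is a hypothetical sketch: `CheckpointRetention` and `TerminationCause` are illustrative names, and only the two termination causes discussed above (cancellation and failure) are modeled. It also assumes that `RETAIN_ON_CANCELLATION` retains checkpoints on failure as well, mirroring the old externalized-checkpoint setting of the same name. The description itself does not spell this out.

```java
/** The two termination causes the PR description distinguishes (illustrative). */
enum TerminationCause { CANCELED, FAILED }

/** Hypothetical model of the retention policies described in the PR. */
enum CheckpointRetention {
    RETAIN_ON_CANCELLATION,
    RETAIN_ON_FAILURE,
    NEVER_RETAIN_AFTER_TERMINATION;

    /** Returns whether the checkpoint is kept after the job terminates for the given cause. */
    boolean retainsOn(TerminationCause cause) {
        switch (this) {
            case RETAIN_ON_CANCELLATION:
                // Assumption: retained on failure as well, mirroring the old
                // externalized-checkpoint setting of the same name.
                return true;
            case RETAIN_ON_FAILURE:
                return cause == TerminationCause.FAILED;
            default:
                // NEVER_RETAIN_AFTER_TERMINATION: discard on any termination.
                return false;
        }
    }
}
```

Modeling the policy as a single enum replaces the older "externalized or not" flag with one explicit question per termination cause.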