This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 139db3f4bc7faed4478393a91a063ad54d15a523 Author: JunRuiLee <jrlee....@gmail.com> AuthorDate: Mon Nov 20 16:58:04 2023 +0800 [FLINK-33581][core] Deprecate getter/setter methods related to checkpoint storage in the CheckpointConfig. --- .../examples/statemachine/StateMachineExample.java | 14 +++-- .../api/environment/CheckpointConfig.java | 68 ++++++++++++++++++++++ 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java index 82ec3674178..f905043b2ee 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java @@ -27,6 +27,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.connector.datagen.source.GeneratorFunction; @@ -82,20 +84,24 @@ public class StateMachineExample { final ParameterTool params = ParameterTool.fromArgs(args); // create the environment to create streams and configure execution - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(2000L); + Configuration configuration = new Configuration(); final String stateBackend = params.get("backend", 
"memory"); if ("hashmap".equals(stateBackend)) { final String checkpointDir = params.get("checkpoint-dir"); env.setStateBackend(new HashMapStateBackend()); - env.getCheckpointConfig().setCheckpointStorage(checkpointDir); + configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); + configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); } else if ("rocks".equals(stateBackend)) { final String checkpointDir = params.get("checkpoint-dir"); boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false); env.setStateBackend(new EmbeddedRocksDBStateBackend(incrementalCheckpoints)); - env.getCheckpointConfig().setCheckpointStorage(checkpointDir); + configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); + configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); } + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + env.enableCheckpointing(2000L); if (params.has("kafka-topic")) { // set up the Kafka reader diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index b80ed10bb73..cf299080571 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -747,8 +747,23 @@ public class CheckpointConfig implements java.io.Serializable { * terabytes while providing a highly available foundation for stateful applications. This * checkpoint storage policy is recommended for most production deployments. 
* + * @deprecated The method is marked as deprecated because starting from Flink 1.19, the usage of + * all complex Java objects related to configuration, including their getter and setter + * methods, should be replaced by ConfigOption. In a future major version of Flink, this + * method will be removed entirely. It is recommended to switch to using the ConfigOptions + * provided for configuring checkpoint storage like the following code snippet: + * <pre>{@code + * Configuration config = new Configuration(); + * config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); + * config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints"); + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); + * }</pre> + * For more details on using ConfigOption for checkpoint storage configuration, please refer + * to the Flink documentation: <a + * href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/checkpoints">Checkpoints</a> * @param storage The checkpoint storage policy. */ + @Deprecated @PublicEvolving public void setCheckpointStorage(CheckpointStorage storage) { Preconditions.checkNotNull(storage, "Checkpoint storage must not be null"); @@ -759,9 +774,24 @@ public class CheckpointConfig implements java.io.Serializable { * Configures the application to write out checkpoint snapshots to the configured directory. See * {@link FileSystemCheckpointStorage} for more details on checkpointing to a file system. * + * @deprecated The method is marked as deprecated because starting from Flink 1.19, the usage of + * all complex Java objects related to configuration, including their getter and setter + * methods, should be replaced by ConfigOption. In a future major version of Flink, this + * method will be removed entirely. 
It is recommended to switch to using the ConfigOptions + * provided for configuring checkpoint storage like the following code snippet: + * <pre>{@code + * Configuration config = new Configuration(); + * config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); + * config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints"); + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); + * }</pre> + * For more details on using ConfigOption for checkpoint storage configuration, please refer + * to the Flink documentation: <a + * href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/checkpoints">Checkpoints</a> * @param checkpointDirectory The path to write checkpoint metadata to. * @see #setCheckpointStorage(CheckpointStorage) */ + @Deprecated @PublicEvolving public void setCheckpointStorage(String checkpointDirectory) { Preconditions.checkNotNull(checkpointDirectory, "Checkpoint directory must not be null"); @@ -772,9 +802,24 @@ public class CheckpointConfig implements java.io.Serializable { * Configures the application to write out checkpoint snapshots to the configured directory. See * {@link FileSystemCheckpointStorage} for more details on checkpointing to a file system. * + * @deprecated The method is marked as deprecated because starting from Flink 1.19, the usage of + * all complex Java objects related to configuration, including their getter and setter + * methods, should be replaced by ConfigOption. In a future major version of Flink, this + * method will be removed entirely. 
It is recommended to switch to using the ConfigOptions + * provided for configuring checkpoint storage like the following code snippet: + * <pre>{@code + * Configuration config = new Configuration(); + * config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); + * config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints"); + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); + * }</pre> + * For more details on using ConfigOption for checkpoint storage configuration, please refer + * to the Flink documentation: <a + * href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/checkpoints">Checkpoints</a> * @param checkpointDirectory The path to write checkpoint metadata to. * @see #setCheckpointStorage(CheckpointStorage) */ + @Deprecated @PublicEvolving public void setCheckpointStorage(URI checkpointDirectory) { Preconditions.checkNotNull(checkpointDirectory, "Checkpoint directory must not be null"); @@ -785,9 +830,24 @@ public class CheckpointConfig implements java.io.Serializable { * Configures the application to write out checkpoint snapshots to the configured directory. See * {@link FileSystemCheckpointStorage} for more details on checkpointing to a file system. * + * @deprecated The method is marked as deprecated because starting from Flink 1.19, the usage of + * all complex Java objects related to configuration, including their getter and setter + * methods, should be replaced by ConfigOption. In a future major version of Flink, this + * method will be removed entirely. 
It is recommended to switch to using the ConfigOptions + * provided for configuring checkpoint storage like the following code snippet: + * <pre>{@code + * Configuration config = new Configuration(); + * config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); + * config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints"); + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); + * }</pre> + * For more details on using ConfigOption for checkpoint storage configuration, please refer + * to the Flink documentation: <a + * href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/checkpoints">Checkpoints</a> * @param checkpointDirectory The path to write checkpoint metadata to. * @see #setCheckpointStorage(String) */ + @Deprecated @PublicEvolving public void setCheckpointStorage(Path checkpointDirectory) { Preconditions.checkNotNull(checkpointDirectory, "Checkpoint directory must not be null"); @@ -795,10 +855,18 @@ public class CheckpointConfig implements java.io.Serializable { } /** + * @deprecated The method is marked as deprecated because starting from Flink 1.19, the usage of + * all complex Java objects related to configuration, including their getter and setter + * methods, should be replaced by ConfigOption. In a future major version of Flink, this + * method will be removed entirely. It is recommended to determine which checkpoint storage is + * used via the checkpoint storage ConfigOption. For more details on using ConfigOption for + * checkpoint storage configuration, please refer to the Flink documentation: <a + * href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/checkpoints">Checkpoints</a> * @return The {@link CheckpointStorage} that has been configured for the job. Or {@code null} * if none has been set. * @see #setCheckpointStorage(CheckpointStorage) */ + @Deprecated @Nullable @PublicEvolving public CheckpointStorage getCheckpointStorage() {