This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 139db3f4bc7faed4478393a91a063ad54d15a523
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Mon Nov 20 16:58:04 2023 +0800

    [FLINK-33581][core] Deprecate getter/setter methods related to checkpoint 
storage in the CheckpointConfig.
---
 .../examples/statemachine/StateMachineExample.java | 14 +++--
 .../api/environment/CheckpointConfig.java          | 68 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 4 deletions(-)

diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
index 82ec3674178..f905043b2ee 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.connector.datagen.source.GeneratorFunction;
@@ -82,20 +84,24 @@ public class StateMachineExample {
         final ParameterTool params = ParameterTool.fromArgs(args);
 
         // create the environment to create streams and configure execution
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(2000L);
+        Configuration configuration = new Configuration();
 
         final String stateBackend = params.get("backend", "memory");
         if ("hashmap".equals(stateBackend)) {
             final String checkpointDir = params.get("checkpoint-dir");
             env.setStateBackend(new HashMapStateBackend());
-            env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
+            configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, 
"filesystem");
+            configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
         } else if ("rocks".equals(stateBackend)) {
             final String checkpointDir = params.get("checkpoint-dir");
             boolean incrementalCheckpoints = 
params.getBoolean("incremental-checkpoints", false);
             env.setStateBackend(new 
EmbeddedRocksDBStateBackend(incrementalCheckpoints));
-            env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
+            configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, 
"filesystem");
+            configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
         }
+        final StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        env.enableCheckpointing(2000L);
 
         if (params.has("kafka-topic")) {
             // set up the Kafka reader
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index b80ed10bb73..cf299080571 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -747,8 +747,23 @@ public class CheckpointConfig implements 
java.io.Serializable {
      * terabytes while providing a highly available foundation for stateful 
applications. This
      * checkpoint storage policy is recommended for most production 
deployments.
      *
+     * @deprecated The method is marked as deprecated because starting from 
Flink 1.19, the usage of
+     *     all complex Java objects related to configuration, including their 
getter and setter
+     *     methods, should be replaced by ConfigOption. In a future major 
version of Flink, this
+     *     method will be removed entirely. It is recommended to switch to 
using the ConfigOptions
+     *     provided for configuring checkpoint storage like the following code 
snippet:
+     *     <pre>{@code
+     * Configuration config = new Configuration();
+     * config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+     * config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
"file:///flink/checkpoints");
+     * StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
+     * }</pre>
+     *     For more details on using ConfigOption for checkpoint storage 
configuration, please refer
+     *     to the Flink documentation: <a
+     *     
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/checkpoints";>Checkpoints</a>
      * @param storage The checkpoint storage policy.
      */
+    @Deprecated
     @PublicEvolving
     public void setCheckpointStorage(CheckpointStorage storage) {
         Preconditions.checkNotNull(storage, "Checkpoint storage must not be 
null");
@@ -759,9 +774,24 @@ public class CheckpointConfig implements 
java.io.Serializable {
      * Configures the application to write out checkpoint snapshots to the 
configured directory. See
      * {@link FileSystemCheckpointStorage} for more details on checkpointing 
to a file system.
      *
+     * @deprecated The method is marked as deprecated because starting from 
Flink 1.19, the usage of
+     *     all complex Java objects related to configuration, including their 
getter and setter
+     *     methods, should be replaced by ConfigOption. In a future major 
version of Flink, this
+     *     method will be removed entirely. It is recommended to switch to 
using the ConfigOptions
+     *     provided for configuring checkpoint storage like the following code 
snippet:
+     *     <pre>{@code
+     * Configuration config = new Configuration();
+     * config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+     * config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
"file:///flink/checkpoints");
+     * StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
+     * }</pre>
+     *     For more details on using ConfigOption for checkpoint storage 
configuration, please refer
+     *     to the Flink documentation: <a
+     *     
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/checkpoints";>Checkpoints</a>
      * @param checkpointDirectory The path to write checkpoint metadata to.
      * @see #setCheckpointStorage(CheckpointStorage)
      */
+    @Deprecated
     @PublicEvolving
     public void setCheckpointStorage(String checkpointDirectory) {
         Preconditions.checkNotNull(checkpointDirectory, "Checkpoint directory 
must not be null");
@@ -772,9 +802,24 @@ public class CheckpointConfig implements 
java.io.Serializable {
      * Configures the application to write out checkpoint snapshots to the 
configured directory. See
      * {@link FileSystemCheckpointStorage} for more details on checkpointing 
to a file system.
      *
+     * @deprecated The method is marked as deprecated because starting from 
Flink 1.19, the usage of
+     *     all complex Java objects related to configuration, including their 
getter and setter
+     *     methods, should be replaced by ConfigOption. In a future major 
version of Flink, this
+     *     method will be removed entirely. It is recommended to switch to 
using the ConfigOptions
+     *     provided for configuring checkpoint storage like the following code 
snippet:
+     *     <pre>{@code
+     * Configuration config = new Configuration();
+     * config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+     * config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
"file:///flink/checkpoints");
+     * StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
+     * }</pre>
+     *     For more details on using ConfigOption for checkpoint storage 
configuration, please refer
+     *     to the Flink documentation: <a
+     *     
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/checkpoints";>Checkpoints</a>
      * @param checkpointDirectory The path to write checkpoint metadata to.
      * @see #setCheckpointStorage(CheckpointStorage)
      */
+    @Deprecated
     @PublicEvolving
     public void setCheckpointStorage(URI checkpointDirectory) {
         Preconditions.checkNotNull(checkpointDirectory, "Checkpoint directory 
must not be null");
@@ -785,9 +830,24 @@ public class CheckpointConfig implements 
java.io.Serializable {
      * Configures the application to write out checkpoint snapshots to the 
configured directory. See
      * {@link FileSystemCheckpointStorage} for more details on checkpointing 
to a file system.
      *
+     * @deprecated The method is marked as deprecated because starting from 
Flink 1.19, the usage of
+     *     all complex Java objects related to configuration, including their 
getter and setter
+     *     methods, should be replaced by ConfigOption. In a future major 
version of Flink, this
+     *     method will be removed entirely. It is recommended to switch to 
using the ConfigOptions
+     *     provided for configuring checkpoint storage like the following code 
snippet:
+     *     <pre>{@code
+     * Configuration config = new Configuration();
+     * config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+     * config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
"file:///flink/checkpoints");
+     * StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
+     * }</pre>
+     *     For more details on using ConfigOption for checkpoint storage 
configuration, please refer
+     *     to the Flink documentation: <a
+     *     
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/checkpoints";>Checkpoints</a>
      * @param checkpointDirectory The path to write checkpoint metadata to.
      * @see #setCheckpointStorage(String)
      */
+    @Deprecated
     @PublicEvolving
     public void setCheckpointStorage(Path checkpointDirectory) {
         Preconditions.checkNotNull(checkpointDirectory, "Checkpoint directory 
must not be null");
@@ -795,10 +855,18 @@ public class CheckpointConfig implements 
java.io.Serializable {
     }
 
     /**
+     * @deprecated The method is marked as deprecated because starting from 
Flink 1.19, the usage of
+     *     all complex Java objects related to configuration, including their 
getter and setter
+     *     methods, should be replaced by ConfigOption. In a future major 
version of Flink, this
+     *     method will be removed entirely. It is recommended to find which 
checkpoint storage is
+     *     used by checkpoint storage ConfigOption. For more details on using 
ConfigOption for
+     *     checkpoint storage configuration, please refer to the Flink 
documentation: <a
+     *     
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/checkpoints";>Checkpoints</a>
      * @return The {@link CheckpointStorage} that has been configured for the 
job. Or {@code null}
      *     if none has been set.
      * @see #setCheckpointStorage(CheckpointStorage)
      */
+    @Deprecated
     @Nullable
     @PublicEvolving
     public CheckpointStorage getCheckpointStorage() {

Reply via email to