Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-23 Thread via GitHub


PatrickRen merged PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194





Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-19 Thread via GitHub


PatrickRen commented on PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#issuecomment-2065990714

   @skymilong Thanks for the update! Could you rebase onto the latest master branch? I just merged a commit that fixes the unstable CI tests.





Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-14 Thread via GitHub


skymilong commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1565044902


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/MemorySize.java:
##
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different units.
+ *
+ * <h2>Parsing</h2>
+ *
+ * <p>The size can be parsed from a text expression. If the expression is a pure number, the value
+ * will be interpreted as bytes.
+ */
+@PublicEvolving
+public class MemorySize implements java.io.Serializable, Comparable<MemorySize> {

Review Comment:
   > I think for `MemorySize` we should use the one from Flink instead of creating our own. It is a public API (marked as `@PublicEvolving`), and Flink CDC doesn't use this one in our production code. IIUC it is just used for parsing memory size expressions in configuration.
   
   I apologize for this oversight, and thank you for your patience.






Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-11 Thread via GitHub


PatrickRen commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1561954297


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/MemorySize.java:
##
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different units.
+ *
+ * <h2>Parsing</h2>
+ *
+ * <p>The size can be parsed from a text expression. If the expression is a pure number, the value
+ * will be interpreted as bytes.
+ */
+@PublicEvolving
+public class MemorySize implements java.io.Serializable, Comparable<MemorySize> {

Review Comment:
   I think for `MemorySize` we should use the one from Flink instead of creating our own. It is a public API (marked as `@PublicEvolving`), and Flink CDC doesn't use this one in our production code. IIUC it is just used for parsing memory size expressions in configuration.
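   For reference, a minimal sketch of what relying on Flink's own class could look like (illustrative only, assuming `org.apache.flink.configuration.MemorySize` from flink-core is available on the classpath; this snippet is not part of the PR):

   ```java
   import org.apache.flink.configuration.MemorySize;

   public class MemorySizeParsingSketch {
       public static void main(String[] args) {
           // Parse a memory-size expression as it would appear in a config file.
           MemorySize size = MemorySize.parse("512mb");
           // The parsed value can be viewed in different units.
           System.out.println(size.getBytes());     // 536870912
           System.out.println(size.getMebiBytes()); // 512
       }
   }
   ```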






Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-09 Thread via GitHub


PatrickRen commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1557144006


##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:
##
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Exception {
         }
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         try {
-            Map<String, String> configMap =
+            Map<String, Object> configMap =
                     mapper.readValue(
-                            configPath.toFile(), new TypeReference<Map<String, String>>() {});
-            return Configuration.fromMap(configMap);
+                            configPath.toFile(), new TypeReference<Map<String, Object>>() {});
+            return Configuration.fromMap(flattenConfigMap(configMap));
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
                             "Failed to load config file \"%s\" to key-value pairs", configPath),
                     e);
         }
     }
+
+    private static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
+        Map<String, String> result = new HashMap<>();
+        flattenConfigMapHelper(configMap, "", result);
+        return result;
+    }
+
+    private static void flattenConfigMapHelper(
+            Map<String, Object> configMap, String currentPath, Map<String, String> result) {
+        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
+            String updatedPath =
+                    currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
+            if (entry.getValue() instanceof Map) {
+                flattenConfigMapHelper((Map<String, Object>) entry.getValue(), updatedPath, result);
+            } else {

Review Comment:
   `GlobalConfiguration` in Flink is marked as `@Internal`, so it's better we build our own instead of referencing it:
   
   https://github.com/apache/flink/blob/68cc61a86187021c61e7f51ccff8c5912125d013/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L43
   
   `@Internal` means there's no promise of compatibility. If we reference the function, I'm afraid there might be compatibility and compilation issues with Flink in the future. The formats of `flink-conf.yaml` and `config.yaml` are public APIs (because they are user-facing), so we can trust their stability. Therefore I prefer to rewrite the parsing logic on our side.






Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-09 Thread via GitHub


skymilong commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1557102504


##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:
##
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Exception {
         }
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         try {
-            Map<String, String> configMap =
+            Map<String, Object> configMap =
                     mapper.readValue(
-                            configPath.toFile(), new TypeReference<Map<String, String>>() {});
-            return Configuration.fromMap(configMap);
+                            configPath.toFile(), new TypeReference<Map<String, Object>>() {});
+            return Configuration.fromMap(flattenConfigMap(configMap));
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
                             "Failed to load config file \"%s\" to key-value pairs", configPath),
                     e);
         }
     }
+
+    private static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
+        Map<String, String> result = new HashMap<>();
+        flattenConfigMapHelper(configMap, "", result);
+        return result;
+    }
+
+    private static void flattenConfigMapHelper(
+            Map<String, Object> configMap, String currentPath, Map<String, String> result) {
+        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
+            String updatedPath =
+                    currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
+            if (entry.getValue() instanceof Map) {
+                flattenConfigMapHelper((Map<String, Object>) entry.getValue(), updatedPath, result);
+            } else {

Review Comment:
   > There should be a case for handling `List` type, according to 
https://github.com/apache/flink/blob/5bbcf8de79ce1979412879b919299ffa5a9b62fe/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L301-L307
   
   After reviewing this section of code and the previous PR 
https://github.com/apache/flink-cdc/pull/2681, I find myself a bit uncertain. 
Should we directly reference the relevant code from Flink, or should we copy 
over the YamlParserUtils#toYAMLString part of the code as done in PR 
https://github.com/apache/flink-cdc/pull/2681? I would greatly appreciate your 
advice on this.






Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-07 Thread via GitHub


PatrickRen commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1554892220


##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:
##
@@ -20,18 +20,33 @@
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
 import java.nio.file.Path;
 import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
     private static final String FLINK_CONF_DIR = "conf";
-    private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String FLINK_CONF_FILENAME = "config.yaml";
 
     public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
         Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
-        return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
+        try {
+            return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);

Review Comment:
   What about renaming the method to `ConfigurationUtils#loadConfigFile`? This method no longer processes only map-formatted config (`flink-conf.yaml`).



##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:
##
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Exception {
         }
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         try {
-            Map<String, String> configMap =
+            Map<String, Object> configMap =
                     mapper.readValue(
-                            configPath.toFile(), new TypeReference<Map<String, String>>() {});
-            return Configuration.fromMap(configMap);
+                            configPath.toFile(), new TypeReference<Map<String, Object>>() {});
+            return Configuration.fromMap(flattenConfigMap(configMap));
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
                             "Failed to load config file \"%s\" to key-value pairs", configPath),
                     e);
         }
     }
+
+    private static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
+        Map<String, String> result = new HashMap<>();
+        flattenConfigMapHelper(configMap, "", result);
+        return result;
+    }
+
+    private static void flattenConfigMapHelper(
+            Map<String, Object> configMap, String currentPath, Map<String, String> result) {
+        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
+            String updatedPath =
+                    currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
+            if (entry.getValue() instanceof Map) {
+                flattenConfigMapHelper((Map<String, Object>) entry.getValue(), updatedPath, result);
+            } else {

Review Comment:
   There should be a case for handling `List` type, according to 
https://github.com/apache/flink/blob/5bbcf8de79ce1979412879b919299ffa5a9b62fe/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L301-L307
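   For illustration, a rough sketch of a flatten routine with such a `List` branch (the rendering of list values as `[a, b, c]` is an assumption; Flink's `GlobalConfiguration` delegates to its own YAML utilities, so the exact string form may differ):

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   final class ConfigFlattenSketch {
       /** Flattens nested YAML maps into dotted keys; list values are rendered as "[a, b, c]". */
       static Map<String, String> flatten(Map<String, Object> config, String prefix) {
           Map<String, String> result = new HashMap<>();
           for (Map.Entry<String, Object> entry : config.entrySet()) {
               String path = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
               Object value = entry.getValue();
               if (value instanceof Map) {
                   @SuppressWarnings("unchecked")
                   Map<String, Object> nested = (Map<String, Object>) value;
                   result.putAll(flatten(nested, path));
               } else if (value instanceof List) {
                   // Assumed rendering for lists; adjust if a different format is expected.
                   String rendered =
                           ((List<?>) value)
                                   .stream()
                                   .map(String::valueOf)
                                   .collect(Collectors.joining(", ", "[", "]"));
                   result.put(path, rendered);
               } else {
                   result.put(path, String.valueOf(value));
               }
           }
           return result;
       }
   }
   ```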



##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:
##
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Exception {
         }
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         try {
-            Map<String, String> configMap =
+            Map<String, Object> configMap =
                     mapper.readValue(
-                            configPath.toFile(), new TypeReference<Map<String, String>>() {});
-            return Configuration.fromMap(configMap);
+                            configPath.toFile(), new TypeReference<Map<String, Object>>() {});
+            return Configuration.fromMap(flattenConfigMap(configMap));
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
                             "Failed to load config file \"%s\" to key-value pairs", configPath),
                     e);
         }
     }
+
+    private static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
+        Map<String, String> result = new HashMap<>();
+        flattenConfigMapHelper(configMap, "", result);
+        return result;
+    }

Review Comment:
   This looks like just a very simple wrapper around `flattenConfigMapHelper`. 
We can just merge the logic into `flattenConfigMap` instead of having another 
helper. 
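   As a sketch of that suggestion (illustrative only, not the exact code of this PR), the single recursive method could look like this:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class MergedFlattenSketch {
       @SuppressWarnings("unchecked")
       static Map<String, String> flattenConfigMap(Map<String, Object> configMap, String currentPath) {
           Map<String, String> result = new HashMap<>();
           for (Map.Entry<String, Object> entry : configMap.entrySet()) {
               String path =
                       currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
               if (entry.getValue() instanceof Map) {
                   // Recurse directly; no separate one-line wrapper needed.
                   result.putAll(flattenConfigMap((Map<String, Object>) entry.getValue(), path));
               } else {
                   result.put(path, String.valueOf(entry.getValue()));
               }
           }
           return result;
       }
   }
   ```

   Callers would then invoke `flattenConfigMap(configMap, "")` directly.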




Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-06 Thread via GitHub


skymilong commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1552023129


##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:
##
@@ -20,17 +20,24 @@
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
 
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
 
     private static final String FLINK_CONF_DIR = "conf";
-    private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String NEW_FLINK_CONF_FILENAME = "config.yaml";
 
     public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
-        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
+        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME);
+        // If the old version of the configuration file does not exist, then attempt to use the new
+        // version of the file name.
+        if (!Files.exists(flinkConfPath)) {
+            flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(NEW_FLINK_CONF_FILENAME);
+        }
         return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);

Review Comment:
   Thank you for your feedback. I have rechecked the code and I agree with you. 
The new config.yaml is not in a map format anymore, so rewriting the parsing 
logic would be necessary. I am willing to take on this task and submit the 
necessary changes.
   
   In addition, I think it might be beneficial to handle the old version of the 
configuration file through exception handling. Here is a possible way to do so:
   
   ```java
   private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
   private static final String FLINK_CONF_FILENAME = "config.yaml";

   public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
       Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
       try {
           return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
       } catch (FileNotFoundException e) {
           LOG.warn(
                   "Failed to load the new configuration file:{}. Trying to load the old configuration file:{}.",
                   FLINK_CONF_FILENAME,
                   OLD_FLINK_CONF_FILENAME);
           return ConfigurationUtils.loadMapFormattedConfig(
                   flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME));
       }
   }
   ```
   This is my idea for exception handling. We first attempt to load the new 
configuration file. If an exception occurs (which could be due to the absence 
of the new configuration file), we then try to load the old configuration file. 
This approach not only ensures backward compatibility and allows for a smoother 
transition between different versions of Flink, but also results in two log 
notifications from the `loadMapFormattedConfig` method. I think this will make 
the notifications more user-friendly. Do you think this approach is feasible? 
Or do you have any better suggestions or ideas? I look forward to your feedback.






Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-04 Thread via GitHub


skymilong commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1552023129


##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:
##
@@ -20,17 +20,24 @@
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
 
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
 
     private static final String FLINK_CONF_DIR = "conf";
-    private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String NEW_FLINK_CONF_FILENAME = "config.yaml";
 
     public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
-        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
+        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME);
+        // If the old version of the configuration file does not exist, then attempt to use the new
+        // version of the file name.
+        if (!Files.exists(flinkConfPath)) {
+            flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(NEW_FLINK_CONF_FILENAME);
+        }
         return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);

Review Comment:
   Thank you for your feedback. I have rechecked the code and I agree with you. 
The new config.yaml is not in a map format anymore, so rewriting the parsing 
logic would be necessary. I am willing to take on this task and submit the 
necessary changes.
   
   In addition, I think it might be beneficial to handle the old version of the 
configuration file through exception handling. Here is a possible way to do so:
   
   ```java
   private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
   private static final String FLINK_CONF_FILENAME = "config.yaml";

   public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
       Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
       try {
           return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
       } catch (Exception e) {
           LOG.warn(
                   "Failed to load the new configuration file:{}. Trying to load the old configuration file:{}.",
                   FLINK_CONF_FILENAME,
                   OLD_FLINK_CONF_FILENAME);
           return ConfigurationUtils.loadMapFormattedConfig(
                   flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME));
       }
   }
   ```
   This is my idea for exception handling. We first attempt to load the new 
configuration file. If an exception occurs (which could be due to the absence 
of the new configuration file), we then try to load the old configuration file. 
This approach not only ensures backward compatibility and allows for a smoother 
transition between different versions of Flink, but also results in two log 
notifications from the `loadMapFormattedConfig` method. I think this will make 
the notifications more user-friendly. Do you think this approach is feasible? 
Or do you have any better suggestions or ideas? I look forward to your feedback.






Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-03 Thread via GitHub


PatrickRen commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1549116999


##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:
##
@@ -20,17 +20,24 @@
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
 
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
 
     private static final String FLINK_CONF_DIR = "conf";
-    private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String NEW_FLINK_CONF_FILENAME = "config.yaml";
 
     public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
-        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
+        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME);
+        // If the old version of the configuration file does not exist, then attempt to use the new
+        // version of the file name.
+        if (!Files.exists(flinkConfPath)) {
+            flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(NEW_FLINK_CONF_FILENAME);
+        }
         return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);

Review Comment:
   As the new `config.yaml` is not map-formatted anymore, I think we need to rewrite the parsing logic here. `ConfigurationUtils.loadMapFormattedConfig` assumes all configs are in a flat key:value format.
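   To illustrate the difference, a minimal sketch using the same Jackson YAML parser as the code above (the option name and value are only examples):

   ```java
   import com.fasterxml.jackson.core.type.TypeReference;
   import com.fasterxml.jackson.databind.ObjectMapper;
   import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

   import java.util.Map;

   public class NestedVsFlatConfigSketch {
       public static void main(String[] args) throws Exception {
           // Old flink-conf.yaml style: already a flat key:value map.
           String flat = "taskmanager.memory.process.size: 1728m\n";
           // New config.yaml style: the same option expressed as nested YAML.
           String nested = "taskmanager:\n  memory:\n    process:\n      size: 1728m\n";

           ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
           Map<String, Object> flatMap =
                   mapper.readValue(flat, new TypeReference<Map<String, Object>>() {});
           Map<String, Object> nestedMap =
                   mapper.readValue(nested, new TypeReference<Map<String, Object>>() {});

           // {taskmanager.memory.process.size=1728m}
           System.out.println(flatMap);
           // {taskmanager={memory={process={size=1728m}}}} -> must be flattened before fromMap()
           System.out.println(nestedMap);
       }
   }
   ```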


