Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
PatrickRen merged PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
PatrickRen commented on PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#issuecomment-2065990714 @skymilong Thanks for the update! Could you rebase onto the latest master branch? I just merged a commit fixing unstable CI tests.
Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
skymilong commented on code in PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1565044902

## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/MemorySize.java:

```
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different units.
+ *
+ * <h2>Parsing</h2>
+ *
+ * <p>The size can be parsed from a text expression. If the expression is a pure number, the value
+ * will be interpreted as bytes.
+ */
+@PublicEvolving
+public class MemorySize implements java.io.Serializable, Comparable<MemorySize> {
```

Review Comment:
> I think for `MemorySize` we should use the one from Flink instead of creating our own. It is a public API (marked as `@PublicEvolving`), and Flink CDC doesn't use this one in our production code. IIUC it is just used for parsing memory size expressions in configuration.

I apologize for this oversight, and thank you for your patience.
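The parsing rule mentioned in the javadoc above — a pure number is interpreted as bytes, otherwise a trailing unit applies — can be illustrated with a small self-contained sketch. This is not Flink's actual `MemorySize` implementation; the class name, unit table, and method name here are assumptions for illustration only:

```java
import java.util.Locale;
import java.util.Map;

// Illustrative sketch of memory-size expression parsing: a pure number is
// taken as bytes; otherwise a trailing unit determines the multiplier.
// Assumed unit table (powers of 1024); real MemorySize supports more aliases.
public class MemorySizeSketch {
    private static final Map<String, Long> UNITS =
            Map.of("b", 1L, "kb", 1024L, "mb", 1024L * 1024, "gb", 1024L * 1024 * 1024);

    public static long parseBytes(String text) {
        String trimmed = text.trim().toLowerCase(Locale.ROOT);
        int pos = 0;
        while (pos < trimmed.length() && Character.isDigit(trimmed.charAt(pos))) {
            pos++;
        }
        long value = Long.parseLong(trimmed.substring(0, pos));
        String unit = trimmed.substring(pos).trim();
        if (unit.isEmpty()) {
            return value; // pure number: interpreted as bytes
        }
        Long multiplier = UNITS.get(unit);
        if (multiplier == null) {
            throw new IllegalArgumentException("Unknown memory unit: " + unit);
        }
        return value * multiplier;
    }
}
```

For example, `parseBytes("1024")` yields 1024 bytes, while `parseBytes("64 kb")` yields 65536.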
Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
PatrickRen commented on code in PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1561954297

## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/MemorySize.java:

```
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different units.
+ *
+ * <h2>Parsing</h2>
+ *
+ * <p>The size can be parsed from a text expression. If the expression is a pure number, the value
+ * will be interpreted as bytes.
+ */
+@PublicEvolving
+public class MemorySize implements java.io.Serializable, Comparable<MemorySize> {
```

Review Comment: I think for `MemorySize` we should use the one from Flink instead of creating our own. It is a public API (marked as `@PublicEvolving`), and Flink CDC doesn't use this one in our production code. IIUC it is just used for parsing memory size expressions in configuration.
Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
PatrickRen commented on code in PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1557144006

## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:

```
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Excep
         }
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         try {
-            Map<String, String> configMap =
+            Map<String, Object> configMap =
                     mapper.readValue(
-                            configPath.toFile(), new TypeReference<Map<String, String>>() {});
-            return Configuration.fromMap(configMap);
+                            configPath.toFile(), new TypeReference<Map<String, Object>>() {});
+            return Configuration.fromMap(flattenConfigMap(configMap));
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
                             "Failed to load config file \"%s\" to key-value pairs", configPath),
                     e);
         }
     }
+
+    private static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
+        Map<String, String> result = new HashMap<>();
+        flattenConfigMapHelper(configMap, "", result);
+        return result;
+    }
+
+    private static void flattenConfigMapHelper(
+            Map<String, Object> configMap, String currentPath, Map<String, String> result) {
+        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
+            String updatedPath =
+                    currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
+            if (entry.getValue() instanceof Map) {
+                flattenConfigMapHelper((Map<String, Object>) entry.getValue(), updatedPath, result);
+            } else {
```

Review Comment: `GlobalConfiguration` in Flink is marked as `@Internal`, so it's better to roll our own implementation instead of referencing it: https://github.com/apache/flink/blob/68cc61a86187021c61e7f51ccff8c5912125d013/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L43 `@Internal` means there is no promise on compatibility; if we reference the function, I'm afraid there might be compatibility and compilation issues with Flink in the future. The formats of `flink-conf.yaml` and `config.yaml` are public APIs (because they are user-facing), so we can trust their stability. Therefore I prefer to rewrite the parsing logic on our side.
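To make the motivation concrete: the legacy `flink-conf.yaml` is a flat `key: value` map, while the new `config.yaml` nests keys, so the loader has to flatten nested maps back into dotted keys. A minimal self-contained sketch of such flattening, mirroring the shape of the `flattenConfigMap` helper discussed in this thread (the class and method names here are illustrative, not the PR's exact code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: flatten a nested config.yaml structure into dotted flat keys,
// e.g. {execution: {checkpointing: {interval: "3min"}}}
//   -> {"execution.checkpointing.interval": "3min"}.
public class ConfigFlattener {
    public static Map<String, String> flatten(Map<String, Object> configMap) {
        Map<String, String> result = new HashMap<>();
        flattenHelper(configMap, "", result);
        return result;
    }

    @SuppressWarnings("unchecked")
    private static void flattenHelper(
            Map<String, Object> configMap, String currentPath, Map<String, String> result) {
        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
            String path =
                    currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                // Nested section: recurse with the extended dotted prefix.
                flattenHelper((Map<String, Object>) entry.getValue(), path, result);
            } else {
                // Leaf value: store its string form under the dotted key.
                result.put(path, String.valueOf(entry.getValue()));
            }
        }
    }
}
```

With this shape, a flat legacy file and a nested new-style file both end up as the same `Map<String, String>` before being handed to `Configuration.fromMap`.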
Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
skymilong commented on code in PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1557102504

## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:

```
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Excep
         }
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         try {
-            Map<String, String> configMap =
+            Map<String, Object> configMap =
                     mapper.readValue(
-                            configPath.toFile(), new TypeReference<Map<String, String>>() {});
-            return Configuration.fromMap(configMap);
+                            configPath.toFile(), new TypeReference<Map<String, Object>>() {});
+            return Configuration.fromMap(flattenConfigMap(configMap));
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
                             "Failed to load config file \"%s\" to key-value pairs", configPath),
                     e);
         }
     }
+
+    private static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
+        Map<String, String> result = new HashMap<>();
+        flattenConfigMapHelper(configMap, "", result);
+        return result;
+    }
+
+    private static void flattenConfigMapHelper(
+            Map<String, Object> configMap, String currentPath, Map<String, String> result) {
+        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
+            String updatedPath =
+                    currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
+            if (entry.getValue() instanceof Map) {
+                flattenConfigMapHelper((Map<String, Object>) entry.getValue(), updatedPath, result);
+            } else {
```

Review Comment:
> There should be a case for handling `List` type, according to https://github.com/apache/flink/blob/5bbcf8de79ce1979412879b919299ffa5a9b62fe/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L301-L307

After reviewing this section of code and the previous PR https://github.com/apache/flink-cdc/pull/2681, I find myself a bit uncertain: should we directly reference the relevant code from Flink, or copy over the `YamlParserUtils#toYAMLString` part of the code as done in PR https://github.com/apache/flink-cdc/pull/2681? I would greatly appreciate your advice on this.
Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
PatrickRen commented on code in PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1554892220

## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:

```
@@ -20,18 +20,33 @@
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
 import java.nio.file.Path;
 import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
     private static final String FLINK_CONF_DIR = "conf";
-    private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String FLINK_CONF_FILENAME = "config.yaml";
 
     public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
         Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
-        return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
+        try {
+            return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
```

Review Comment: What about renaming the method to `ConfigurationUtils#loadConfigFile`? This method no longer processes only map-formatted config (`flink-conf.yaml`).

## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:

```
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Excep
         }
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         try {
-            Map<String, String> configMap =
+            Map<String, Object> configMap =
                     mapper.readValue(
-                            configPath.toFile(), new TypeReference<Map<String, String>>() {});
-            return Configuration.fromMap(configMap);
+                            configPath.toFile(), new TypeReference<Map<String, Object>>() {});
+            return Configuration.fromMap(flattenConfigMap(configMap));
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
                             "Failed to load config file \"%s\" to key-value pairs", configPath),
                     e);
         }
     }
+
+    private static void flattenConfigMapHelper(
+            Map<String, Object> configMap, String currentPath, Map<String, String> result) {
+        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
+            String updatedPath =
+                    currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
+            if (entry.getValue() instanceof Map) {
+                flattenConfigMapHelper((Map<String, Object>) entry.getValue(), updatedPath, result);
+            } else {
```

Review Comment: There should be a case for handling the `List` type, according to https://github.com/apache/flink/blob/5bbcf8de79ce1979412879b919299ffa5a9b62fe/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L301-L307

## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:

```
+    private static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
+        Map<String, String> result = new HashMap<>();
+        flattenConfigMapHelper(configMap, "", result);
+        return result;
+    }
```

Review Comment: This looks like just a very simple wrapper around `flattenConfigMapHelper`. We can merge the logic into `flattenConfigMap` instead of having another helper.
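For the `List` case raised in the review above, the flattened map still needs a single string value per key. A hedged sketch of such a branch follows; the `[a, b]` flow-style rendering is a simplification for illustration, since Flink itself delegates list serialization to its YAML utilities and the exact output format may differ:

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch: render a list-valued config entry as one string so it fits the
// flat Map<String, String> representation; scalars fall through unchanged.
public class ListValueRenderer {
    public static String render(Object value) {
        if (value instanceof List) {
            return ((List<?>) value)
                    .stream()
                    .map(String::valueOf)
                    .collect(Collectors.joining(", ", "[", "]"));
        }
        return String.valueOf(value);
    }
}
```

In the flattening helper, this would be the `else if (entry.getValue() instanceof List)` branch between the `Map` recursion and the scalar case.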
Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
skymilong commented on code in PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1552023129

## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:

```
@@ -20,17 +20,24 @@
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
 
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
 
     private static final String FLINK_CONF_DIR = "conf";
-    private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String NEW_FLINK_CONF_FILENAME = "config.yaml";
 
     public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
-        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
+        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME);
+        // If the old version of the configuration file does not exist, then attempt to use the new
+        // version of the file name.
+        if (!Files.exists(flinkConfPath)) {
+            flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(NEW_FLINK_CONF_FILENAME);
+        }
         return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
```

Review Comment: Thank you for your feedback. I have rechecked the code and I agree with you. The new config.yaml is not in a map format anymore, so rewriting the parsing logic would be necessary. I am willing to take on this task and submit the necessary changes. In addition, I think it might be beneficial to handle the old version of the configuration file through exception handling. Here is a possible way to do so:

```java
private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
private static final String FLINK_CONF_FILENAME = "config.yaml";

public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
    Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
    try {
        return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
    } catch (FileNotFoundException e) {
        LOG.warn(
                "Failed to load the new configuration file:{}. Trying to load the old configuration file:{}.",
                FLINK_CONF_FILENAME,
                OLD_FLINK_CONF_FILENAME);
        return ConfigurationUtils.loadMapFormattedConfig(
                flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME));
    }
}
```

This is my idea for exception handling: we first attempt to load the new configuration file, and if an exception occurs (which could be due to the absence of the new configuration file), we then try to load the old configuration file. This approach not only ensures backward compatibility and allows for a smoother transition between different versions of Flink, but also produces two log notifications from the `loadMapFormattedConfig` method, which I think makes them more user-friendly. Do you think this approach is feasible? Or do you have any better suggestions or ideas? I look forward to your feedback.
Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
PatrickRen commented on code in PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1549116999

## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:

```
@@ -20,17 +20,24 @@
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
 
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
 
     private static final String FLINK_CONF_DIR = "conf";
-    private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String NEW_FLINK_CONF_FILENAME = "config.yaml";
 
     public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
-        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
+        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME);
+        // If the old version of the configuration file does not exist, then attempt to use the new
+        // version of the file name.
+        if (!Files.exists(flinkConfPath)) {
+            flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(NEW_FLINK_CONF_FILENAME);
+        }
         return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
```

Review Comment: As the new `config.yaml` is not map-formatted anymore, I think we need to rewrite the parsing logic here. `ConfigurationUtils.loadMapFormattedConfig` assumes all configs are in a flat key:value format.