Re: [PR] MINOR: AbstractConfig cleanup [kafka]
gharris1727 merged PR #15597: URL: https://github.com/apache/kafka/pull/15597 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
gharris1727 commented on PR #15597: URL: https://github.com/apache/kafka/pull/15597#issuecomment-2026050399

Test failures appear unrelated:
1. ReplicaManagerTest has tickets [KAFKA-16323](https://issues.apache.org/jira/browse/KAFKA-16323) and [KAFKA-13530](https://issues.apache.org/jira/browse/KAFKA-13530)
2. ControllerRegistrationManagerTest has ticket [KAFKA-15897](https://issues.apache.org/jira/browse/KAFKA-15897)
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
mimaison commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1543174303

## core/src/main/scala/kafka/controller/PartitionStateMachine.scala: ##
@@ -486,7 +486,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
       } else {
         val (logConfigs, failed) = zkClient.getLogConfigs(
           partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
-          config.originals()
+          config.extractLogConfigMap

Review Comment:
It looks like this breaks `testOfflinePartitionToOnlinePartitionTransition()`
```
Wanted but not invoked:
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
    List(5),
    t-0,
    (Leader:5,ISR:5,LeaderRecoveryState:RECOVERED,LeaderEpoch:1,ZkVersion:2,ControllerEpoch:50),
    ReplicaAssignment(replicas=5, addingReplicas=, removingReplicas=),
    false
);
-> at kafka.controller.AbstractControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(ControllerChannelManager.scala:421)

However, there was exactly 1 interaction with this mock:
controllerBrokerRequestBatch.newBatch();
-> at kafka.controller.ZkPartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:158)

    at kafka.controller.AbstractControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(ControllerChannelManager.scala:421)
    at kafka.controller.PartitionStateMachineTest.testOfflinePartitionToOnlinePartitionTransition(PartitionStateMachineTest.scala:275)
```
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
gharris1727 commented on PR #15597: URL: https://github.com/apache/kafka/pull/15597#issuecomment-2023838245

Hi @mimaison @C0urante PTAL at the latest WorkerConfig change.
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
gharris1727 commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1541420383 ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -580,8 +601,15 @@ private Map instantiateConfigProviders(Map
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
OmniaGM commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1541328444

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ##
@@ -925,10 +926,13 @@ static Map adminConfigs(String connName,
         // Ignore configs that begin with "admin." since those will be added next (with the prefix stripped)
         // and those that begin with "producer." and "consumer.", since we know they aren't intended for
         // the admin client
+        // Also ignore the config.providers configurations because the worker-configured ConfigProviders should
+        // already have been evaluated via the trusted WorkerConfig constructor
         Map nonPrefixedWorkerConfigs = config.originals().entrySet().stream()
                 .filter(e -> !e.getKey().startsWith("admin.")
                         && !e.getKey().startsWith("producer.")
-                        && !e.getKey().startsWith("consumer."))
+                        && !e.getKey().startsWith("consumer.")
+                        && !e.getKey().startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG))

Review Comment:
Small suggestion: since the list of configs isn't that large, I would suggest refactoring this to something like the following:
```
Stream prefixes = Stream.of("admin.", "producer.", "consumer.", AbstractConfig.CONFIG_PROVIDERS_CONFIG);
Map nonPrefixedWorkerConfigs = config.originals().entrySet().stream()
        .filter(e -> prefixes.allMatch(s -> !e.getKey().startsWith(s)))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
```
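One caveat about the suggested refactoring as quoted: a `java.util.stream.Stream` can be consumed only once, so calling `prefixes.allMatch(...)` for every entry would fail with an `IllegalStateException` on the second entry. A minimal sketch of the same idea that keeps the prefixes in a `List` instead is shown below. The class name, the method name `nonPrefixed`, and the local constant standing in for `AbstractConfig.CONFIG_PROVIDERS_CONFIG` (assumed here to be `"config.providers"`) are illustrative and not taken from the PR.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PrefixFilterSketch {
    // Assumption: stand-in for AbstractConfig.CONFIG_PROVIDERS_CONFIG.
    static final String CONFIG_PROVIDERS_CONFIG = "config.providers";

    // A List can be streamed again for every entry; a Stream cannot be reused.
    static final List<String> EXCLUDED_PREFIXES =
            Arrays.asList("admin.", "producer.", "consumer.", CONFIG_PROVIDERS_CONFIG);

    // Hypothetical helper: keep only entries whose key starts with none of the excluded prefixes.
    static Map<String, Object> nonPrefixed(Map<String, Object> originals) {
        return originals.entrySet().stream()
                .filter(e -> EXCLUDED_PREFIXES.stream().noneMatch(p -> e.getKey().startsWith(p)))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Object> originals = new LinkedHashMap<>();
        originals.put("admin.retries", 3);
        originals.put("producer.acks", "all");
        originals.put("config.providers.file.class", "some.Provider");
        originals.put("bootstrap.servers", "localhost:9092");
        System.out.println(nonPrefixed(originals).keySet());
    }
}
```

Note that `Collectors.toMap` rejects null values, so this sketch assumes the input map contains none.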
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
OmniaGM commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1541293009

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##
@@ -580,8 +601,15 @@ private Map instantiateConfigProviders(Map
```
providerClassName = Optional.ofNullable(indirectConfigs.get(providerClass));
Boolean isAllowed = providerClassName.map(name -> classNameFilter.test(name)).orElse(false);
if (isAllowed) {
    providerMap.put(provider, providerClassName.get());
} else {
    throw new ConfigException(providerClassName + " is not allowed. Update System property '"
            + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + providerClassName);
}
```
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
OmniaGM commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1541313630

## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ##
@@ -1502,13 +1502,23 @@ public static Map filterMap(final Map map, final Predicate propsToMap(Properties properties) {
-        Map map = new HashMap<>(properties.size());
-        for (Map.Entry entry : properties.entrySet()) {
+        return castToStringObjectMap(properties);
+    }
+
+    /**
+     * Cast a map with arbitrary type keys to be keyed on String.
+     * @param inputMap A map with unknown type keys
+     * @return A map with the same contents as the input map, but with String keys
+     * @throws ConfigException if any key is not a String
+     */
+    public static Map castToStringObjectMap(Map inputMap) {

Review Comment:
There is a similar function in `AbstractConfigTest.convertPropertiesToMap`; maybe we can drop the one in `AbstractConfigTest`.
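To make the Javadoc contract of `castToStringObjectMap` in this review concrete, here is a minimal, self-contained sketch of a helper with that behaviour. It is not the code from the PR: the generic signature is an assumption (the type parameters are missing from the quoted diff), and `IllegalArgumentException` stands in for Kafka's `ConfigException` so the example compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class StringKeyCastSketch {
    // Copies the input into a map keyed on String, rejecting any non-String key.
    // Assumption: the real method throws ConfigException; IllegalArgumentException is a stand-in.
    static Map<String, Object> castToStringObjectMap(Map<?, ?> inputMap) {
        Map<String, Object> result = new HashMap<>(inputMap.size());
        for (Map.Entry<?, ?> entry : inputMap.entrySet()) {
            if (entry.getKey() instanceof String) {
                result.put((String) entry.getKey(), entry.getValue());
            } else {
                throw new IllegalArgumentException("Key must be a string: " + entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("config.providers", "file");
        System.out.println(castToStringObjectMap(props));

        props.put(42, "not a string key");
        try {
            castToStringObjectMap(props);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Since `Properties` is a `Map<Object, Object>`, a `propsToMap(Properties)` wrapper can delegate directly to such a helper, which is what the quoted diff does.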
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
mjsax commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1541286490

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##
@@ -58,6 +61,8 @@ public class AbstractConfig {
     private final ConfigDef definition;
+    public static final String AUTOMATIC_CONFIG_PROVIDERS_PROPERTY = "org.apache.kafka.automatic.config.providers";

Review Comment:
Looks like a public API change to me. Don't we need a KIP?
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
C0urante commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1540232126

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##
@@ -547,6 +552,17 @@ private Map extractPotentialVariables(Map configMap) {
         return new ResolvingMap<>(resolvedOriginals, originals);
     }
+    private Predicate automaticConfigProvidersFilter() {
+        String systemProperty = System.getProperty(AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
+        if (systemProperty == null) {
+            return ignored -> true;
+        } else {
+            return Arrays.stream(systemProperty.split(","))

Review Comment:
🤦‍♂️ This is what I get for reviewing on mobile.
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
gharris1727 commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1540169589

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##
@@ -547,6 +552,17 @@ private Map extractPotentialVariables(Map configMap) {
         return new ResolvingMap<>(resolvedOriginals, originals);
     }
+    private Predicate automaticConfigProvidersFilter() {
+        String systemProperty = System.getProperty(AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
+        if (systemProperty == null) {
+            return ignored -> true;
+        } else {
+            return Arrays.stream(systemProperty.split(","))

Review Comment:
Also, `split(",")` followed by `trim` is more permissive than `split(" *, *")` or similar, because it also removes whitespace at the beginning of the first element and at the end of the last element. The trim cannot be eliminated in all cases.
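The difference between the two parsing strategies can be checked with a small standalone example. This is only an illustration of the point made in the comment; the class and method names are made up for the sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SplitTrimSketch {
    // Strategy discussed in the PR: split on a bare comma, then trim every element.
    static List<String> splitThenTrim(String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    // Alternative from the review suggestion: let the regex absorb spaces around each comma.
    static List<String> splitOnPaddedComma(String value) {
        return Arrays.asList(value.split(" *, *"));
    }

    public static void main(String[] args) {
        String value = " a , b ";
        // Leading and trailing whitespace of the whole string is removed as well.
        System.out.println(splitThenTrim(value));      // [a, b]
        // Only whitespace adjacent to a comma is removed; the outer spaces survive.
        System.out.println(splitOnPaddedComma(value)); // [ a, b ]
    }
}
```

With the regex-only approach, the first element keeps its leading space and the last element keeps its trailing space, so a class name at either end of the list would not match exactly.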
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
gharris1727 commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1540167974

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##
@@ -547,6 +552,17 @@ private Map extractPotentialVariables(Map configMap) {
         return new ResolvingMap<>(resolvedOriginals, originals);
     }
+    private Predicate automaticConfigProvidersFilter() {
+        String systemProperty = System.getProperty(AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
+        if (systemProperty == null) {
+            return ignored -> true;
+        } else {
+            return Arrays.stream(systemProperty.split(","))

Review Comment:
The next line trims the whitespace from the strings, so we should already be permissive. I see that ConfigDef uses `"\\s*,\\s*"` for List properties, while AbstractConfig uses `","` _without_ trim for `config.providers`. That's not great, but it's also not affected by this PR. Since `ConfigDef.COMMA_WITH_WHITESPACE` is private, I think I'll leave this as-is.
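For readers following the thread, here is a hedged sketch of how an allowlist predicate like `automaticConfigProvidersFilter()` could be completed. The quoted diff stops at the `split(",")` line, so everything after it (trimming, collecting into a `Set`, returning `contains`) is an assumption based on the comment above, not the merged code. The property value is passed as a parameter, rather than read via `System.getProperty`, purely so the sketch is easy to exercise.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ProviderAllowlistSketch {
    // Property name as quoted in the PR diff.
    static final String AUTOMATIC_CONFIG_PROVIDERS_PROPERTY =
            "org.apache.kafka.automatic.config.providers";

    // Hypothetical completion: null means "no restriction", otherwise only listed class names pass.
    static Predicate<String> filterFor(String systemProperty) {
        if (systemProperty == null) {
            return ignored -> true;
        }
        Set<String> allowed = Arrays.stream(systemProperty.split(","))
                .map(String::trim)
                .collect(Collectors.toSet());
        return allowed::contains;
    }

    public static void main(String[] args) {
        Predicate<String> unrestricted = filterFor(System.getProperty(AUTOMATIC_CONFIG_PROVIDERS_PROPERTY));
        Predicate<String> none = filterFor("");
        Predicate<String> some = filterFor(" com.example.A , com.example.B ");
        System.out.println(unrestricted.test("com.example.A"));
        System.out.println(none.test("com.example.A"));
        System.out.println(some.test("com.example.B"));
    }
}
```

Under these assumptions an empty property value allows nothing, because `"".split(",")` yields a single empty string that no class name equals. That matches the test in the PR that sets the property to `""` and expects the provider to be rejected.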
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
C0urante commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1540093517

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java: ##
@@ -461,6 +497,25 @@ public void testAutoConfigResolutionWithInvalidConfigProviderClass() {
         }
     }
+    @Test
+    public void testAutoConfigResolutionWithInvalidConfigProviderClassExcluded() {
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "");
+        // Test Case: Invalid class for Config Provider
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class",
+            "org.apache.kafka.common.config.provider.InvalidConfigProvider");
+        props.put("testKey", "${test:/foo/bar/testpath:testKey}");
+        try {
+            new TestIndirectConfigResolution(props, Collections.emptyMap());
+            fail("Expected a config exception due to invalid props :" + props);
+        } catch (KafkaException e) {
+            // deliver the disallowed message first to prevent probing the classloader
+            assertTrue(e.getMessage().contains(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY));
+            // this is good
+        }

Review Comment:
Nit: can't we use `assertThrows`? The return value can be used to perform assertions on error messages.

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##
@@ -547,6 +552,17 @@ private Map extractPotentialVariables(Map configMap) {
         return new ResolvingMap<>(resolvedOriginals, originals);
     }
+    private Predicate automaticConfigProvidersFilter() {
+        String systemProperty = System.getProperty(AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
+        if (systemProperty == null) {
+            return ignored -> true;
+        } else {
+            return Arrays.stream(systemProperty.split(","))

Review Comment:
Nit: can/should we be more permissive in the parsing logic, to align with how list types are parsed by the AbstractConfig class?
```suggestion
            return Arrays.stream(systemProperty.split(" *, *"))
```
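The `assertThrows` pattern suggested in this review replaces the `try` / `fail` / `catch` structure with a single call that returns the caught exception, which can then be inspected. JUnit 5's `Assertions.assertThrows(Class, Executable)` works this way. The sketch below uses a small hand-written stand-in, `expectThrows`, so that it runs without JUnit on the classpath; the helper, the interface, and the exception message are illustrative and are not part of the PR.

```java
public class AssertThrowsSketch {
    // Stand-in for JUnit's Executable functional interface.
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Hypothetical stand-in for assertThrows: runs the body and returns the expected exception.
    static <T extends Throwable> T expectThrows(Class<T> type, ThrowingRunnable body) {
        try {
            body.run();
        } catch (Throwable t) {
            if (type.isInstance(t)) {
                return type.cast(t);
            }
            throw new AssertionError("Unexpected exception type: " + t, t);
        }
        throw new AssertionError("Expected " + type.getName() + " but nothing was thrown");
    }

    public static void main(String[] args) {
        String property = "org.apache.kafka.automatic.config.providers";
        // The returned exception lets the test assert on the message without a catch block.
        IllegalStateException e = expectThrows(IllegalStateException.class, () -> {
            throw new IllegalStateException("provider is not allowed. Update System property '" + property + "'");
        });
        if (!e.getMessage().contains(property)) {
            throw new AssertionError("Message should name the system property");
        }
        System.out.println("Caught: " + e.getMessage());
    }
}
```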
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
gharris1727 commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1540111732

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##
@@ -58,6 +61,8 @@ public class AbstractConfig {
     private final ConfigDef definition;
+    public static final String AUTOMATIC_CONFIG_PROVIDERS_PROPERTY = "org.apache.kafka.automatic.config.providers";

Review Comment:
I wanted to make clear that this only affects the "automatic resolution", not every use of the config providers, which is how I would interpret your suggested name. The "automatic" comes from the language of the original KIP, though I'm not sure how many people call this "automatic resolution".
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
mimaison commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1539776305

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##
@@ -58,6 +61,8 @@ public class AbstractConfig {
     private final ConfigDef definition;
+    public static final String AUTOMATIC_CONFIG_PROVIDERS_PROPERTY = "org.apache.kafka.automatic.config.providers";

Review Comment:
I'm not sure about the naming. What about `org.apache.kafka.allowed.config.providers`?

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java: ##
@@ -461,6 +497,25 @@ public void testAutoConfigResolutionWithInvalidConfigProviderClass() {
         }
     }
+    @Test
+    public void testAutoConfigResolutionWithInvalidConfigProviderClassExcluded() {
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "");

Review Comment:
Should we also have a test with the property containing a list of providers?