OmniaGM commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1541313630
########## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ########## @@ -1502,13 +1502,23 @@ public static <K, V> Map<K, V> filterMap(final Map<K, V> map, final Predicate<En * @return a map including all elements in properties */ public static Map<String, Object> propsToMap(Properties properties) { - Map<String, Object> map = new HashMap<>(properties.size()); - for (Map.Entry<Object, Object> entry : properties.entrySet()) { + return castToStringObjectMap(properties); + } + + /** + * Cast a map with arbitrary type keys to be keyed on String. + * @param inputMap A map with unknown type keys + * @return A map with the same contents as the input map, but with String keys + * @throws ConfigException if any key is not a String + */ + public static Map<String, Object> castToStringObjectMap(Map<?, ?> inputMap) { Review Comment: There is a similar function like this one in `AbstractConfigTest.convertPropertiesToMap` maybe we can drop the one in `AbstractConfigTest` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -925,10 +926,13 @@ static Map<String, Object> adminConfigs(String connName, // Ignore configs that begin with "admin." since those will be added next (with the prefix stripped) // and those that begin with "producer." and "consumer.", since we know they aren't intended for // the admin client + // Also ignore the config.providers configurations because the worker-configured ConfigProviders should + // already have been evaluated via the trusted WorkerConfig constructor Map<String, Object> nonPrefixedWorkerConfigs = config.originals().entrySet().stream() .filter(e -> !e.getKey().startsWith("admin.") && !e.getKey().startsWith("producer.") - && !e.getKey().startsWith("consumer.")) + && !e.getKey().startsWith("consumer.") + && !e.getKey().startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) Review Comment: Small suggestion, as the list of config isn't that huge then I would suggest refactoring this to something similar to the following ``` Stream<String> prefixes = Stream.of("admin.", "producer.", "consumer.", AbstractConfig.CONFIG_PROVIDERS_CONFIG); Map<String, Object> nonPrefixedWorkerConfigs = config.originals().entrySet().stream() .filter(e -> prefixes.allMatch(s -> !e.getKey().startsWith(s))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); ``` ########## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ########## @@ -580,8 +601,15 @@ private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, Strin for (String provider : configProviders.split(",")) { String providerClass = providerClassProperty(provider); - if (indirectConfigs.containsKey(providerClass)) - providerMap.put(provider, indirectConfigs.get(providerClass)); + if (indirectConfigs.containsKey(providerClass)) { + String providerClassName = indirectConfigs.get(providerClass); + if (classNameFilter.test(providerClassName)) { + providerMap.put(provider, providerClassName); + } else { + throw new ConfigException(providerClassName + " is not allowed. Update System property '" + + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + providerClassName); + } + } Review Comment: There is an extra line here need to be removed ########## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ########## @@ -580,8 +601,15 @@ private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, Strin for (String provider : configProviders.split(",")) { String providerClass = providerClassProperty(provider); - if (indirectConfigs.containsKey(providerClass)) - providerMap.put(provider, indirectConfigs.get(providerClass)); + if (indirectConfigs.containsKey(providerClass)) { + String providerClassName = indirectConfigs.get(providerClass); + if (classNameFilter.test(providerClassName)) { Review Comment: small suggestion here instead of having 2 nested `if`s inside a `for` maybe we can something like this ``` Optional<String> providerClassName = Optional.ofNullable(indirectConfigs.get(providerClass)); Boolean isAllowed = providerClassName.map(name -> classNameFilter.test(name)).orElse(false); if (isAllowed) { providerMap.put(provider, providerClassName.get()); } else { throw new ConfigException(providerClassName + " is not allowed. Update System property '" + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + providerClassName); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org