divijvaidya commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853124702

##########
clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java:
##########
@@ -45,4 +45,9 @@ public static SslClientAuth forConfig(String key) {
         }
         return null;
     }
+
+    @Override
+    public String toString() {
+        return name().toLowerCase(Locale.ROOT);

Review Comment:
   Optional: `super.toString().toLowerCase(Locale.ROOT);`?

##########
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##########
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
             + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
             + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
             + "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+
+    public static class SaslEnabledMechanismsValidator implements ConfigDef.Validator {

Review Comment:
   Please add Javadoc for all public classes and methods.

##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -108,4 +112,42 @@ public void testDefaultPartitionAssignor() {
         assertEquals(Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class), new ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
     }
+
+    @Test
+    public void testInvalidKeyDeserializer() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, null);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
+        ConfigException ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
+        assertTrue(ce.getMessage().contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+    }
+
+    @Test

Review Comment:
   Optional suggestion: create a parameterized test [1] with multiple invalid values, such as an empty string, null, and a string containing whitespace. The same could be done for the rest of the tests as well.

   [1] https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests

##########
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##########
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
             + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
             + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
             + "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+
+    public static class SaslEnabledMechanismsValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (value == null) {
+                throw new ConfigException(name, null, "entry must be non null");
+            }
+
+            @SuppressWarnings("unchecked")
+            List<String> mechanismStrings = (List) value;
+
+            if (mechanismStrings.isEmpty()) {
+                throw new ConfigException(name, null, "entry must be non-empty list");
+            }
+
+            mechanismStrings.forEach(mechanism -> {
+                if (mechanism == null || mechanism.isEmpty()) {

Review Comment:
   A stricter check could be `String.isBlank()`, which also rejects whitespace-only strings; those are invalid in this scenario.

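To illustrate the stricter check suggested above, here is a minimal sketch. It is not the PR's code: `MechanismChecks` is a hypothetical stand-in class, and it throws `IllegalArgumentException` rather than Kafka's `ConfigException` so that it compiles without kafka-clients on the classpath. Note that `String.isBlank()` was added in Java 11; on Java 8, `mechanism.trim().isEmpty()` is a close substitute.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the validator under review. It uses
// IllegalArgumentException instead of Kafka's ConfigException so the
// sketch runs without any Kafka dependency.
final class MechanismChecks {

    // True when the mechanism name is null, empty, or whitespace-only.
    // String.isBlank() requires Java 11 or later.
    static boolean isInvalid(String mechanism) {
        return mechanism == null || mechanism.isBlank();
    }

    // Rejects a null or empty list, and any null or blank entry in it.
    static void ensureValid(String name, List<String> mechanisms) {
        if (mechanisms == null || mechanisms.isEmpty()) {
            throw new IllegalArgumentException(name + ": entry must be a non-empty list");
        }
        for (String mechanism : mechanisms) {
            if (isInvalid(mechanism)) {
                throw new IllegalArgumentException(
                    name + ": SASL enabled mechanism must be a non-null, non-blank string");
            }
        }
    }

    public static void main(String[] args) {
        // A well-formed list passes silently.
        ensureValid("sasl.enabled.mechanisms", Arrays.asList("GSSAPI", "PLAIN"));

        // A whitespace-only entry slips past isEmpty() but is caught by isBlank().
        try {
            ensureValid("sasl.enabled.mechanisms", Arrays.asList("PLAIN", "   "));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The difference from the reviewed code is only the `isBlank()` call: `"   ".isEmpty()` is false, so the original check would accept a whitespace-only mechanism name.
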
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -279,7 +281,7 @@ protected static ConfigDef baseConfigDef() {
                         "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                 .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
-                        ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
+                        ConfigDef.Type.STRING, SslClientAuth.NONE.name().toLowerCase(Locale.ROOT), in(Utils.enumOptions(SslClientAuth.class)), ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)

Review Comment:
   Since we have overridden the `toString` method in `SslClientAuth`, we can safely use `SslClientAuth.NONE.toString()` here.

##########
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##########
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
             + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
             + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
             + "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+
+    public static class SaslEnabledMechanismsValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (value == null) {
+                throw new ConfigException(name, null, "entry must be non null");
+            }
+
+            @SuppressWarnings("unchecked")
+            List<String> mechanismStrings = (List) value;
+
+            if (mechanismStrings.isEmpty()) {
+                throw new ConfigException(name, null, "entry must be non-empty list");
+            }
+
+            mechanismStrings.forEach(mechanism -> {
+                if (mechanism == null || mechanism.isEmpty()) {
+                    throw new ConfigException(name, mechanism, "enabled mechanism must be non-null or non-empty string");

Review Comment:
   Nit: "SASL enabled mechanism must be...."

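For the `SslClientAuth.NONE.toString()` suggestion on `WorkerConfig` above, the following sketch shows why the two spellings should be interchangeable once `toString()` is overridden. `ClientAuth` is a hypothetical enum that only mirrors the shape of the override in the diff; it is not Kafka's `SslClientAuth`.

```java
import java.util.Locale;

// Hypothetical enum mirroring the toString() override shown in the diff.
enum ClientAuth {
    REQUIRED, REQUESTED, NONE;

    // Lower-case the constant name with a fixed locale so the result does
    // not depend on the JVM's default locale.
    @Override
    public String toString() {
        return name().toLowerCase(Locale.ROOT);
    }
}

final class ClientAuthDemo {
    public static void main(String[] args) {
        // With the override in place, both expressions yield the same string.
        String viaName = ClientAuth.NONE.name().toLowerCase(Locale.ROOT);
        String viaToString = ClientAuth.NONE.toString();
        System.out.println(viaName.equals(viaToString));
    }
}
```

Because `Enum.toString()` returns `name()` by default, `super.toString().toLowerCase(Locale.ROOT)` and `name().toLowerCase(Locale.ROOT)` produce the same value inside the override; the choice between them is stylistic.
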
##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG,
                                         Type.CLASS,
+                                        ConfigDef.NO_DEFAULT_VALUE,
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.HIGH,
                                         KEY_SERIALIZER_CLASS_DOC)
                                 .define(VALUE_SERIALIZER_CLASS_CONFIG,
                                         Type.CLASS,
+                                        ConfigDef.NO_DEFAULT_VALUE,
+                                        new ConfigDef.NonNullValidator(),

Review Comment:
   An empty string or a whitespace-only string would also be invalid for a value serializer, correct? In that case, this could be a composition of the NonNull and NotEmpty validators. The same applies to the key serializer.

##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG,
                                         Type.CLASS,
+                                        ConfigDef.NO_DEFAULT_VALUE,
+                                        new ConfigDef.NonNullValidator(),

Review Comment:
   From a code perspective, it is possible to initialize the config with a null serializer and add it later via `appendSerializerToConfig`:
   https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java#L539

   We are changing the contract to assert that the serializer cannot be null at any time in the config. I am not aware of the repercussions of changing this contract, but if we are asserting it, then the corresponding lines of code in `appendSerializerToConfig` should change as well.