divijvaidya commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853124702


##########
clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java:
##########
@@ -45,4 +45,9 @@ public static SslClientAuth forConfig(String key) {
         }
         return null;
     }
+
+    @Override
+    public String toString() {
+        return name().toLowerCase(Locale.ROOT);

Review Comment:
   optional
   
   `super.toString().toLowerCase(Locale.ROOT);` ?
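   For context on why either spelling works: for an enum, `super.toString()` resolves to `Enum.toString()`, which simply returns `name()`, so the two forms are equivalent (`name()` is final and arguably clearer). A minimal stand-in enum (a hypothetical `ClientAuth`, not the actual `SslClientAuth`) sketching the pattern:

   ```java
   import java.util.Locale;

   // Hypothetical stand-in for the enum under review: toString() emits the
   // lower-cased constant name. For enums, super.toString() delegates to
   // Enum.toString(), which returns name(), so both spellings behave the same.
   enum ClientAuth {
       NONE, REQUESTED, REQUIRED;

       @Override
       public String toString() {
           return name().toLowerCase(Locale.ROOT);
       }
   }
   ```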



##########
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##########
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
             + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
             + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
             + "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+
+    public static class SaslEnabledMechanismsValidator implements ConfigDef.Validator {

Review Comment:
   Please add Javadoc for all public classes and methods.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -108,4 +112,42 @@ public void testDefaultPartitionAssignor() {
         assertEquals(Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class),
             new ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
     }
+
+    @Test
+    public void testInvalidKeyDeserializer() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, null);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
+        ConfigException ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
+        assertTrue(ce.getMessage().contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+    }
+
+    @Test

Review Comment:
   optional suggestion
   
   Create a parameterized test [1] with multiple invalid values such as an empty string, null, and a string with whitespace in it. The same could be done for the rest of the tests as well.
   
   [1] https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests
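   The idea can be sketched without pulling in JUnit: drive one validation check over many invalid inputs instead of writing one test method per value. In JUnit 5 this would be an `@ParameterizedTest` with `@NullAndEmptySource` / `@ValueSource`; the helper below mimics that with a plain loop. `validateDeserializer` is a hypothetical stand-in for the validation `ConsumerConfig` performs, not actual Kafka code.

   ```java
   import java.util.List;

   // Plain-Java sketch of the parameterized-test idea: one check, many inputs.
   class DeserializerValidationSketch {

       // Hypothetical stand-in for the config's deserializer validation.
       static void validateDeserializer(String className) {
           if (className == null || className.trim().isEmpty()) {
               throw new IllegalArgumentException("deserializer must be a non-blank class name");
           }
       }

       // Returns how many of the candidate values were rejected.
       static int countRejected(List<String> candidates) {
           int rejected = 0;
           for (String candidate : candidates) {
               try {
                   validateDeserializer(candidate);
               } catch (IllegalArgumentException e) {
                   rejected++;
               }
           }
           return rejected;
       }
   }
   ```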



##########
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##########
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
             + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
             + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
             + "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+
+    public static class SaslEnabledMechanismsValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (value == null) {
+                throw new ConfigException(name, null, "entry must be non null");
+            }
+
+            @SuppressWarnings("unchecked")
+            List<String> mechanismStrings = (List) value;
+
+            if (mechanismStrings.isEmpty()) {
+                throw new ConfigException(name, null, "entry must be non-empty list");
+            }
+
+            mechanismStrings.forEach(mechanism -> {
+                if (mechanism == null || mechanism.isEmpty()) {

Review Comment:
   A stricter check would be `String.isBlank()`, which also rejects whitespace-only strings; those are equally invalid in this scenario.
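   To illustrate the difference between the two checks: `isEmpty()` only rejects `""`, while `isBlank()` (available since Java 11) also rejects whitespace-only strings, which are equally invalid as SASL mechanism names. `isValidMechanism` is a hypothetical helper, not part of the Kafka codebase.

   ```java
   // Sketch: a blank-aware validity check for a mechanism name.
   class MechanismCheck {
       static boolean isValidMechanism(String mechanism) {
           // isBlank() subsumes isEmpty(): it is true for "" and for
           // strings containing only whitespace such as "   " or "\t".
           return mechanism != null && !mechanism.isBlank();
       }
   }
   ```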



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -279,7 +281,7 @@ protected static ConfigDef baseConfigDef() {
                         "", Importance.LOW,
                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                 .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
-                        ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
+                        ConfigDef.Type.STRING, SslClientAuth.NONE.name().toLowerCase(Locale.ROOT), in(Utils.enumOptions(SslClientAuth.class)), ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)

Review Comment:
   Since we have overridden `toString()` in `SslClientAuth`, we can safely use `SslClientAuth.NONE.toString()` here.



##########
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##########
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
             + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
             + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
             + "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+
+    public static class SaslEnabledMechanismsValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (value == null) {
+                throw new ConfigException(name, null, "entry must be non null");
+            }
+
+            @SuppressWarnings("unchecked")
+            List<String> mechanismStrings = (List) value;
+
+            if (mechanismStrings.isEmpty()) {
+                throw new ConfigException(name, null, "entry must be non-empty list");
+            }
+
+            mechanismStrings.forEach(mechanism -> {
+                if (mechanism == null || mechanism.isEmpty()) {
+                    throw new ConfigException(name, mechanism, "enabled mechanism must be non-null or non-empty string");

Review Comment:
   nit
   
   "SASL enabled mechanism must be....



##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG,
                                         Type.CLASS,
+                                        ConfigDef.NO_DEFAULT_VALUE,
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.HIGH,
                                         KEY_SERIALIZER_CLASS_DOC)
                                 .define(VALUE_SERIALIZER_CLASS_CONFIG,
                                         Type.CLASS,
+                                        ConfigDef.NO_DEFAULT_VALUE,
+                                        new ConfigDef.NonNullValidator(),

Review Comment:
   An empty string or a whitespace-only string would also be invalid for a value serializer, correct?
   
   In that case this could be a composition of a NonNull and a NonEmpty validator. The same applies to the key serializer.
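   A sketch of what such a composition could look like, assuming a validator interface shaped like `ConfigDef.Validator`; the types below are simplified stand-ins for illustration, not the real Kafka classes (Kafka's `ConfigDef` does provide its own composite-validator facility for chaining checks like this).

   ```java
   import java.util.Arrays;
   import java.util.List;

   // Simplified stand-in for ConfigDef.Validator: throws on invalid input.
   interface Validator {
       void ensureValid(String name, Object value);
   }

   // Rejects null values.
   class NonNull implements Validator {
       @Override
       public void ensureValid(String name, Object value) {
           if (value == null)
               throw new IllegalArgumentException(name + " must be non-null");
       }
   }

   // Rejects empty or whitespace-only strings.
   class NonBlank implements Validator {
       @Override
       public void ensureValid(String name, Object value) {
           if (value instanceof String && ((String) value).trim().isEmpty())
               throw new IllegalArgumentException(name + " must be non-blank");
       }
   }

   // Runs each validator in order; the first failure aborts the chain.
   class Composite implements Validator {
       private final List<Validator> validators;

       Composite(Validator... validators) {
           this.validators = Arrays.asList(validators);
       }

       @Override
       public void ensureValid(String name, Object value) {
           for (Validator v : validators)
               v.ensureValid(name, value);
       }
   }
   ```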



##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG,
                                         Type.CLASS,
+                                        ConfigDef.NO_DEFAULT_VALUE,
+                                        new ConfigDef.NonNullValidator(),

Review Comment:
   From a code perspective, it is possible to initialize the config with a null serializer and add it later via `appendSerializerToConfig`: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java#L539
   
   We are changing the contract to assert that the serializer can never be null in the config. I am not aware of the repercussions of changing this contract, but if we are asserting it, the corresponding lines in `appendSerializerToConfig` should change as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
