C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r852469399


##########
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java:
##########
@@ -212,6 +213,7 @@ public class AdminClientConfig extends AbstractConfig {
                                 .define(SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
                                         DEFAULT_SECURITY_PROTOCOL,
+                                        in(SecurityProtocol.names().toArray(new String[0])),

Review Comment:
   Can we use [Utils::enumOptions](https://github.com/apache/kafka/blob/6d36487b684fd41522cccd4da4fd88f0b89ff0b7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1418-L1433) for this?
   ```suggestion
                                           in(Utils.enumOptions(SecurityProtocol.class)),
   ```
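
To illustrate what the suggestion above relies on, here is a minimal, self-contained sketch of a helper that turns an enum's constants into a `String[]` of allowed values. The `EnumOptionsSketch` class, its `enumOptions` method, and the `Protocol` enum are hypothetical stand-ins written for this example; they are not copied from `Utils` or `SecurityProtocol`, and the real helper may differ in its details.

```java
import java.util.Arrays;
import java.util.Objects;

public class EnumOptionsSketch {
    // Hypothetical stand-in enum for illustration; not Kafka's SecurityProtocol.
    public enum Protocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

    // Hypothetical helper: collects toString() of each enum constant,
    // in declaration order, into an array usable as a list of valid values.
    public static String[] enumOptions(Class<? extends Enum<?>> enumClass) {
        Objects.requireNonNull(enumClass, "enumClass");
        return Arrays.stream(enumClass.getEnumConstants())
                .map(Object::toString)
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(enumOptions(Protocol.class)));
    }
}
```

A helper of this shape avoids the `names().toArray(new String[0])` boilerplate at each call site and keeps the list of valid values tied to the enum definition.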



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -486,10 +488,14 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(KEY_DESERIALIZER_CLASS_CONFIG,
                                         Type.CLASS,
+                                        ConfigDef.NO_DEFAULT_VALUE,
+                                        new ConfigDef.NonNullValidator(),

Review Comment:
   Isn't this redundant? Why do we need a `NonNullValidator` when we've 
explicitly marked this property as having no default value?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -108,4 +112,42 @@ public void testDefaultPartitionAssignor() {
         assertEquals(Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class),
             new ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
     }
+
+    @Test
+    public void testInvalidKeyDeserializer() {

Review Comment:
   Maybe also worth covering cases where the values in the `configs` map are 
`null`, but [this constructor](https://github.com/apache/kafka/blob/6d36487b684fd41522cccd4da4fd88f0b89ff0b7/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L663-L666) is used?
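
The case described above can be sketched with a plain `java.util.Map`: a key that is present but mapped to `null` is distinct from a key that is absent, so a check that only looks for missing keys would not notice it. This is a generic illustration; the class and method names (`NullValueSketch`, `isMissing`, `isExplicitNull`) are hypothetical, and the sketch does not show how `ConfigDef` or `KafkaConsumer` actually handle either case.

```java
import java.util.HashMap;
import java.util.Map;

public class NullValueSketch {
    // True when the key was never supplied at all.
    public static boolean isMissing(Map<String, Object> configs, String key) {
        return !configs.containsKey(key);
    }

    // True when the key was supplied, but with a null value.
    public static boolean isExplicitNull(Map<String, Object> configs, String key) {
        return configs.containsKey(key) && configs.get(key) == null;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("key.deserializer", null); // present, but null

        System.out.println("key.deserializer missing? "
                + isMissing(configs, "key.deserializer"));
        System.out.println("key.deserializer explicitly null? "
                + isExplicitNull(configs, "key.deserializer"));
        System.out.println("value.deserializer missing? "
                + isMissing(configs, "value.deserializer"));
    }
}
```

A test for this scenario would need to exercise both shapes of input, since they may take different code paths.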



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1324,9 +1324,9 @@ object KafkaConfig {
      .define(SslEngineFactoryClassProp, CLASS, null, LOW, SslEngineFactoryClassDoc)
 
       /** ********* Sasl Configuration ****************/
-      .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)
+      .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()), MEDIUM, SaslMechanismInterBrokerProtocolDoc)
       .define(SaslJaasConfigProp, PASSWORD, null, MEDIUM, SaslJaasConfigDoc)
-      .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, MEDIUM, SaslEnabledMechanismsDoc)
+      .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, new BrokerSecurityConfigs.SaslEnabledMechanismsValidator(), MEDIUM, SaslEnabledMechanismsDoc)

Review Comment:
   I don't think we can add this change; it's possible to bring up a Kafka 
server right now with `sasl.enabled.mechanisms` set to `,,,` as long as SASL 
isn't enabled.
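
For context on why `,,,` is a notable value, the sketch below shows how plain JDK comma-splitting treats it. It uses only `String.split`; the class and method names are hypothetical, and how Kafka's `LIST` type actually parses this input is not shown here and should be checked against `ConfigDef` directly.

```java
public class CommaSplitSketch {
    // String.split with the default limit drops trailing empty strings.
    public static String[] splitDroppingTrailingEmpties(String value) {
        return value.split(",");
    }

    // A negative limit keeps every empty string between separators.
    public static String[] splitKeepingEmpties(String value) {
        return value.split(",", -1);
    }

    public static void main(String[] args) {
        String value = ",,,";
        System.out.println("default limit: "
                + splitDroppingTrailingEmpties(value).length + " element(s)");
        System.out.println("limit -1: "
                + splitKeepingEmpties(value).length + " element(s)");
    }
}
```

Depending on the split mode, the same input yields either no elements or four empty ones, so a validator that rejects empty entries could start rejecting a value that is accepted today.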



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##########
@@ -49,15 +53,15 @@ public void testClusterConfigProperties() {
             "clusters", "a, b",
             "a.bootstrap.servers", "servers-one",
             "b.bootstrap.servers", "servers-two",
-            "security.protocol", "SASL",
+            "security.protocol", "SSL",

Review Comment:
   Is it possible today to bring up a MirrorMaker 2 instance with 
`security.protocol` set to `SASL` without it failing? If so, we might have to 
leave out this change (and the new validator for the `security.protocol` 
property) to avoid breaking compatibility.



##########
clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java:
##########
@@ -202,7 +202,7 @@ public static void addClientSaslSupport(ConfigDef config) {
                 .define(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER, Range.between(0.0, 0.25), ConfigDef.Importance.LOW, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC)
                 .define(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, ConfigDef.Type.SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS, Range.between(0, 900), ConfigDef.Importance.LOW, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC)
                 .define(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, ConfigDef.Type.SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS, Range.between(0, 3600), ConfigDef.Importance.LOW, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC)
-                .define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC)
+                .define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()), ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC)

Review Comment:
   I don't think we can add this; both `null` and `""` are valid if SASL isn't 
enabled at all.
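
The point above can be sketched as a check that only inspects the mechanism when SASL is actually in use. Everything here is hypothetical and written for illustration: the `ConditionalSaslCheckSketch` class, the `usesSasl` and `validateMechanism` methods, and the assumption that a protocol name starting with `SASL_` means SASL is enabled. It is not how Kafka validates `sasl.mechanism`, nor a proposal for this PR.

```java
import java.util.Optional;

public class ConditionalSaslCheckSketch {
    // Assumption for this sketch: SASL is "enabled" when the protocol
    // name starts with "SASL_" (e.g. SASL_PLAINTEXT, SASL_SSL).
    public static boolean usesSasl(String securityProtocol) {
        return securityProtocol != null && securityProtocol.startsWith("SASL_");
    }

    // Returns an error message when the mechanism is unusable,
    // or Optional.empty() when the value is acceptable.
    public static Optional<String> validateMechanism(String securityProtocol,
                                                     String saslMechanism) {
        if (!usesSasl(securityProtocol)) {
            // SASL is off, so null and "" are both tolerated.
            return Optional.empty();
        }
        if (saslMechanism == null || saslMechanism.isEmpty()) {
            return Optional.of("sasl.mechanism must be non-empty when SASL is enabled");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validateMechanism("PLAINTEXT", null));
        System.out.println(validateMechanism("SASL_SSL", ""));
    }
}
```

An unconditional `NonNullValidator` plus `NonEmptyString` on the property itself cannot express this dependency, which is why it would reject configurations that work today.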


