C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r852469399

##########
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java:
##########
@@ -212,6 +213,7 @@ public class AdminClientConfig extends AbstractConfig {
                                 .define(SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
                                         DEFAULT_SECURITY_PROTOCOL,
+                                        in(SecurityProtocol.names().toArray(new String[0])),

Review Comment:
   Can we use [Utils::enumOptions](https://github.com/apache/kafka/blob/6d36487b684fd41522cccd4da4fd88f0b89ff0b7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1418-L1433) for this?
   ```suggestion
                                           in(Utils.enumOptions(SecurityProtocol.class)),
   ```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -486,10 +488,14 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(KEY_DESERIALIZER_CLASS_CONFIG,
                                         Type.CLASS,
+                                        ConfigDef.NO_DEFAULT_VALUE,
+                                        new ConfigDef.NonNullValidator(),

Review Comment:
   Isn't this redundant? Why do we need a `NonNullValidator` when we've explicitly marked this property as having no default value?

##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -108,4 +112,42 @@ public void testDefaultPartitionAssignor() {
         assertEquals(Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class),
             new ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
     }
+
+    @Test
+    public void testInvalidKeyDeserializer() {

Review Comment:
   Maybe also worth covering cases where the values in the `configs` map are `null`, but [this constructor](https://github.com/apache/kafka/blob/6d36487b684fd41522cccd4da4fd88f0b89ff0b7/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L663-L666) is used?
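
For context on the `Utils::enumOptions` suggestion for `AdminClientConfig` above: the idea is to derive the allowed strings from the enum class itself instead of building the array by hand at each call site. Below is a minimal, self-contained sketch of that pattern. `enumOptionsSketch` and the `Protocol` enum are hypothetical stand-ins written for this illustration; they are not Kafka's `Utils.enumOptions` or `SecurityProtocol`, whose actual behavior should be checked against the linked source.

```java
import java.util.Arrays;
import java.util.stream.Stream;

// Hypothetical stand-in for an enum such as SecurityProtocol (not the Kafka class).
enum Protocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

class EnumOptionsSketch {
    // Assumed shape of an enumOptions-style helper:
    // one string per enum constant, in declaration order.
    static String[] enumOptionsSketch(Class<? extends Enum<?>> enumClass) {
        return Stream.of(enumClass.getEnumConstants())
                .map(Object::toString)
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // Prints the option strings that an in(...) style validator would accept.
        System.out.println(Arrays.toString(enumOptionsSketch(Protocol.class)));
    }
}
```

Note that this sketch relies on `toString()`, so it only matches a `names()`-style list when the enum does not override `toString()` with something other than the expected config values.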

##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1324,9 +1324,9 @@ object KafkaConfig {
       .define(SslEngineFactoryClassProp, CLASS, null, LOW, SslEngineFactoryClassDoc)

       /** ********* Sasl Configuration ****************/
-      .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)
+      .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()), MEDIUM, SaslMechanismInterBrokerProtocolDoc)
       .define(SaslJaasConfigProp, PASSWORD, null, MEDIUM, SaslJaasConfigDoc)
-      .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, MEDIUM, SaslEnabledMechanismsDoc)
+      .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, new BrokerSecurityConfigs.SaslEnabledMechanismsValidator(), MEDIUM, SaslEnabledMechanismsDoc)

Review Comment:
   I don't think we can add this change; it's possible to bring up a Kafka server right now with `sasl.enabled.mechanisms` set to `,,,` as long as SASL isn't enabled.

##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##########
@@ -49,15 +53,15 @@ public void testClusterConfigProperties() {
             "clusters", "a, b",
             "a.bootstrap.servers", "servers-one",
             "b.bootstrap.servers", "servers-two",
-            "security.protocol", "SASL",
+            "security.protocol", "SSL",

Review Comment:
   Is it possible today to bring up a MirrorMaker 2 instance with `security.protocol` set to `SASL` without it failing? If so, we might have to leave out this change (and the new validator for the `security.protocol` property) to avoid breaking compatibility.
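
To make the `sasl.enabled.mechanisms` concern above concrete: assuming a `LIST` value is trimmed and then split on commas with empty entries preserved (an assumption about the parsing that is worth verifying against `ConfigDef`), the value `,,,` becomes four empty strings. A validator that rejects empty entries would then fail a broker config that starts today. The sketch below uses only plain Java; `splitList` and `hasEmptyEntry` are hypothetical helpers, not Kafka APIs, and the behavior of the PR's `SaslEnabledMechanismsValidator` is not assumed here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ListValueSketch {
    // Assumed parsing rule: trim, then split on commas (with optional
    // surrounding whitespace), keeping empty entries via the -1 limit.
    static List<String> splitList(String raw) {
        String trimmed = raw.trim();
        if (trimmed.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
    }

    // Hypothetical strict check: true if any entry is the empty string.
    static boolean hasEmptyEntry(List<String> values) {
        return values.stream().anyMatch(String::isEmpty);
    }

    public static void main(String[] args) {
        List<String> parsed = splitList(",,,");
        System.out.println(parsed.size() + " entries, empty entry present: " + hasEmptyEntry(parsed));
    }
}
```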

##########
clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java:
##########
@@ -202,7 +202,7 @@ public static void addClientSaslSupport(ConfigDef config) {
                 .define(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER, Range.between(0.0, 0.25), ConfigDef.Importance.LOW, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC)
                 .define(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, ConfigDef.Type.SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS, Range.between(0, 900), ConfigDef.Importance.LOW, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC)
                 .define(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, ConfigDef.Type.SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS, Range.between(0, 3600), ConfigDef.Importance.LOW, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC)
-                .define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC)
+                .define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()), ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC)

Review Comment:
   I don't think we can add this; both `null` and `""` are valid if SASL isn't enabled at all.
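
One possible direction for the `sasl.mechanism` concern above, sketched under assumptions: rather than rejecting `null` and `""` unconditionally at definition time, the check could run only when the configured security protocol actually uses SASL. `usesSasl` and `validateSaslMechanism` are hypothetical helpers for illustration; they do not exist in Kafka, and the PR may well settle on a different approach.

```java
import java.util.HashMap;
import java.util.Map;

class ConditionalSaslCheckSketch {
    // Assumption: protocols whose name starts with "SASL_" are the ones
    // that require a SASL mechanism to be configured.
    static boolean usesSasl(String securityProtocol) {
        return securityProtocol != null && securityProtocol.startsWith("SASL_");
    }

    // Hypothetical cross-field check: null or "" is tolerated unless SASL is in use.
    static void validateSaslMechanism(Map<String, String> configs) {
        String mechanism = configs.get("sasl.mechanism");
        boolean missing = mechanism == null || mechanism.isEmpty();
        if (usesSasl(configs.get("security.protocol")) && missing) {
            throw new IllegalArgumentException(
                "sasl.mechanism must be non-empty when SASL is enabled");
        }
    }

    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put("security.protocol", "PLAINTEXT");
        configs.put("sasl.mechanism", "");
        // Accepted: SASL is not enabled, so the empty mechanism is ignored.
        validateSaslMechanism(configs);
        System.out.println("PLAINTEXT with empty sasl.mechanism accepted");
    }
}
```

The design point is that a per-property validator only sees one value at a time, so it cannot express "required only if SASL is enabled"; a check like this would have to run after all properties are parsed.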