lianetm commented on code in PR #16899: URL: https://github.com/apache/kafka/pull/16899#discussion_r1852903322
########## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ########## @@ -379,7 +378,29 @@ public class ConsumerConfig extends AbstractConfig { private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC; private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); + private static final List<Class<? extends AbstractPartitionAssignor>> PARTITION_ASSIGNOR_DEFAULT_VALUE = + List.of(RangeAssignor.class, CooperativeStickyAssignor.class); + /** + * A list of configuration keys for CLASSIC protocol not supported. we should check the input string and clean up the Review Comment: ```suggestion * A list of configuration keys not supported for CLASSIC protocol. ``` I'm also suggesting we remove the second sentence about "clean up default values" because I cannot find that we clean up values for unsupported configs, I only see we throw ConfigException. Am I missing something? ########## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java: ########## @@ -237,4 +237,22 @@ public void testProtocolConfigValidation(String protocol, boolean isValid) { assertThrows(ConfigException.class, () -> new ConsumerConfig(configs)); } } + + @Test + public void testUnsupportedConfigsWithConsumerGroupProtocol() { + testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "RoundRobinAssignor"); + testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); + testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); + } + + private void testUnsupportedConfigsWithConsumerGroupProtocol(String configName, Object value) { + final Map<String, Object> configs = new HashMap<>(); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); + configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); + configs.put(configName, value); + ConfigException exception = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs)); + assertTrue(exception.getMessage().contains(configName + Review Comment: can't we `assertEquals` here ? ########## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ########## @@ -718,9 +740,14 @@ private void maybeOverrideEnableAutoCommit(Map<String, Object> configs) { } } - private void checkGroupRemoteAssignor() { - if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CLASSIC.name()) && getString(GROUP_REMOTE_ASSIGNOR_CONFIG) != null && !getString(GROUP_REMOTE_ASSIGNOR_CONFIG).isEmpty()) { - throw new ConfigException(GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name()); + private void checkUnsupportedConfigs(GroupProtocol groupProtocol, List<String> unsupportedConfigs) { + if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(groupProtocol.name())) { + unsupportedConfigs.forEach(configName -> { + Object config = originals().get(configName); + if (config != null && !Utils.isBlank(config.toString())) { + throw new ConfigException(configName + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + groupProtocol.name()); Review Comment: would it be a better experience for the user if we gather all the unsupported configs used first, and then throw an error with all of them? I expect it will be helpful when users upgrade to the new protocol, and probably leave the existing config (with several unsupported). ########## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java: ########## @@ -237,4 +237,22 @@ public void testProtocolConfigValidation(String protocol, boolean isValid) { assertThrows(ConfigException.class, () -> new ConsumerConfig(configs)); } } + + @Test + public void testUnsupportedConfigsWithConsumerGroupProtocol() { + testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "RoundRobinAssignor"); + testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); + testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); + } + + private void testUnsupportedConfigsWithConsumerGroupProtocol(String configName, Object value) { + final Map<String, Object> configs = new HashMap<>(); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); + configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); + configs.put(configName, value); Review Comment: Map.of? ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ########## @@ -337,7 +335,6 @@ private Map<String, Object> composeConfigs(ClusterInstance cluster, String group configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol); - configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); Review Comment: just to double check, even though we're removing this prop, I expect the test will still use the range assignor if under classic protocol just because range is the first assignor in the default value of the config. Correct? ########## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ########## @@ -413,7 +434,7 @@ public class ConsumerConfig extends AbstractConfig { HEARTBEAT_INTERVAL_MS_DOC) .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Type.LIST, - Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class), + PARTITION_ASSIGNOR_DEFAULT_VALUE, Review Comment: This constant was introduced to be reused and that made sense, but seems it's only used here now right? If so I think keeping the default explicitly here as it was is more convenient, it just makes it easier to discover (I personally navigate to this definition a lot looking for defaults :)) ########## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ########## @@ -379,7 +378,29 @@ public class ConsumerConfig extends AbstractConfig { private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC; private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); + private static final List<Class<? extends AbstractPartitionAssignor>> PARTITION_ASSIGNOR_DEFAULT_VALUE = + List.of(RangeAssignor.class, CooperativeStickyAssignor.class); + /** + * A list of configuration keys for CLASSIC protocol not supported. we should check the input string and clean up the + * default value. + */ + private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = Collections.singletonList( + GROUP_REMOTE_ASSIGNOR_CONFIG + ); + + /** + * A list of configuration keys for consumer protocol not supported. we should check the input string and clean up the + * default value. + */ + private static final List<String> CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS = List.of( + PARTITION_ASSIGNMENT_STRATEGY_CONFIG, + HEARTBEAT_INTERVAL_MS_CONFIG, + SESSION_TIMEOUT_MS_CONFIG, + "group.max.session.timeout.ms", + "group.mix.session.timeout.ms" Review Comment: well these configs are used by the broker only, not the client, so I would say we don't deal with them here and let the broker drive it? ########## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ########## @@ -379,7 +378,29 @@ public class ConsumerConfig extends AbstractConfig { private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC; private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); + private static final List<Class<? extends AbstractPartitionAssignor>> PARTITION_ASSIGNOR_DEFAULT_VALUE = + List.of(RangeAssignor.class, CooperativeStickyAssignor.class); + /** + * A list of configuration keys for CLASSIC protocol not supported. we should check the input string and clean up the + * default value. + */ + private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = Collections.singletonList( + GROUP_REMOTE_ASSIGNOR_CONFIG + ); + + /** + * A list of configuration keys for consumer protocol not supported. we should check the input string and clean up the Review Comment: ```suggestion * A list of configuration keys not supported for CONSUMER protocol. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org