lianetm commented on code in PR #16899:
URL: https://github.com/apache/kafka/pull/16899#discussion_r1852903322


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -379,7 +378,29 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
 
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private static final List<Class<? extends AbstractPartitionAssignor>> PARTITION_ASSIGNOR_DEFAULT_VALUE =
+            List.of(RangeAssignor.class, CooperativeStickyAssignor.class);
 
+    /**
+     * A list of configuration keys for CLASSIC protocol not supported. we should check the input string and clean up the

Review Comment:
   ```suggestion
        * A list of configuration keys not supported for CLASSIC protocol.
   ```
   I'm also suggesting we remove the second sentence about "clean up default values", because I cannot find anywhere that we clean up values for unsupported configs; I only see that we throw a ConfigException. Am I missing something?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -237,4 +237,22 @@ public void testProtocolConfigValidation(String protocol, boolean isValid) {
             assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
         }
     }
+
+    @Test
+    public void testUnsupportedConfigsWithConsumerGroupProtocol() {
+        testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "RoundRobinAssignor");
+        testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
+        testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
+    }
+
+    private void testUnsupportedConfigsWithConsumerGroupProtocol(String configName, Object value) {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
+        configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name());
+        configs.put(configName, value);
+        ConfigException exception = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
+        assertTrue(exception.getMessage().contains(configName + 

Review Comment:
   can't we use `assertEquals` here?
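   Something like this maybe (just a sketch; it assumes the full exception message is exactly the "cannot be set when" string built in `checkUnsupportedConfigs`, with nothing else appended):
   ```java
   ConfigException exception = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
   // Compare the whole message rather than a substring, so any drift in the wording is caught too.
   assertEquals(configName + " cannot be set when " + ConsumerConfig.GROUP_PROTOCOL_CONFIG
           + "=" + GroupProtocol.CONSUMER.name(), exception.getMessage());
   ```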



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -718,9 +740,14 @@ private void maybeOverrideEnableAutoCommit(Map<String, Object> configs) {
         }
     }
 
-    private void checkGroupRemoteAssignor() {
-        if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CLASSIC.name()) && getString(GROUP_REMOTE_ASSIGNOR_CONFIG) != null && !getString(GROUP_REMOTE_ASSIGNOR_CONFIG).isEmpty()) {
-            throw new ConfigException(GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name());
+    private void checkUnsupportedConfigs(GroupProtocol groupProtocol, List<String> unsupportedConfigs) {
+        if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(groupProtocol.name())) {
+            unsupportedConfigs.forEach(configName -> {
+                Object config = originals().get(configName);
+                if (config != null && !Utils.isBlank(config.toString())) {
+                    throw new ConfigException(configName + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + groupProtocol.name());

Review Comment:
   would it be a better experience for the user if we gathered all the unsupported configs that were set first, and then threw a single error listing all of them? I expect that would be helpful when users upgrade to the new protocol and probably keep their existing config (with several unsupported entries). A sketch of the idea is below.
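   Roughly something like this (only a sketch, reusing the existing `originals()`/`Utils.isBlank` checks from the PR; the exact message wording is not important):
   ```java
   private void checkUnsupportedConfigs(GroupProtocol groupProtocol, List<String> unsupportedConfigs) {
       if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(groupProtocol.name())) {
           // Collect every unsupported config the user explicitly set, then fail once with the full list.
           // (needs a java.util.stream.Collectors import)
           List<String> invalidConfigs = unsupportedConfigs.stream()
                   .filter(name -> {
                       Object value = originals().get(name);
                       return value != null && !Utils.isBlank(value.toString());
                   })
                   .collect(Collectors.toList());
           if (!invalidConfigs.isEmpty()) {
               throw new ConfigException(String.join(", ", invalidConfigs)
                       + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + groupProtocol.name());
           }
       }
   }
   ```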



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -237,4 +237,22 @@ public void testProtocolConfigValidation(String protocol, boolean isValid) {
             assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
         }
     }
+
+    @Test
+    public void testUnsupportedConfigsWithConsumerGroupProtocol() {
+        testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "RoundRobinAssignor");
+        testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
+        testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
+    }
+
+    private void testUnsupportedConfigsWithConsumerGroupProtocol(String configName, Object value) {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
+        configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name());
+        configs.put(configName, value);

Review Comment:
   Map.of?
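   e.g. (assuming nothing else needs to be added to the map afterwards, since `Map.of` is immutable and rejects nulls):
   ```java
   Map<String, Object> configs = Map.of(
           ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass,
           ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass,
           ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name(),
           configName, value);
   ```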



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##########
@@ -337,7 +335,6 @@ private Map<String, Object> composeConfigs(ClusterInstance cluster, String group
         configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
-        configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());

Review Comment:
   just to double check: even though we're removing this prop, I expect the test will still use the range assignor under the classic protocol, simply because range is the first assignor in the default value of the config. Correct?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -413,7 +434,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         HEARTBEAT_INTERVAL_MS_DOC)
                                 .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                                         Type.LIST,
-                                        Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class),
+                                        PARTITION_ASSIGNOR_DEFAULT_VALUE,

Review Comment:
   This constant was introduced to be reused, and that made sense, but it seems it's only used here now, right? If so, I think keeping the default explicitly here as it was is more convenient; it just makes it easier to discover (I personally navigate to this definition a lot looking for defaults :))
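   e.g. (assuming we'd restore the literal with `List.of` rather than going back to `Arrays.asList`):
   ```suggestion
                                           List.of(RangeAssignor.class, CooperativeStickyAssignor.class),
   ```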



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -379,7 +378,29 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
 
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private static final List<Class<? extends AbstractPartitionAssignor>> PARTITION_ASSIGNOR_DEFAULT_VALUE =
+            List.of(RangeAssignor.class, CooperativeStickyAssignor.class);
 
+    /**
+     * A list of configuration keys for CLASSIC protocol not supported. we should check the input string and clean up the
+     * default value.
+     */
+    private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = Collections.singletonList(
+            GROUP_REMOTE_ASSIGNOR_CONFIG
+    );
+
+    /**
+     * A list of configuration keys for consumer protocol not supported. we should check the input string and clean up the
+     * default value.
+     */
+    private static final List<String> CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS = List.of(
+            PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+            HEARTBEAT_INTERVAL_MS_CONFIG,
+            SESSION_TIMEOUT_MS_CONFIG,
+            "group.max.session.timeout.ms",
+            "group.mix.session.timeout.ms"

Review Comment:
   well, these configs are used by the broker only, not the client, so I would say we don't deal with them here and let the broker drive it?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -379,7 +378,29 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
 
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private static final List<Class<? extends AbstractPartitionAssignor>> PARTITION_ASSIGNOR_DEFAULT_VALUE =
+            List.of(RangeAssignor.class, CooperativeStickyAssignor.class);
 
+    /**
+     * A list of configuration keys for CLASSIC protocol not supported. we should check the input string and clean up the
+     * default value.
+     */
+    private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = Collections.singletonList(
+            GROUP_REMOTE_ASSIGNOR_CONFIG
+    );
+
+    /**
+     * A list of configuration keys for consumer protocol not supported. we should check the input string and clean up the

Review Comment:
   ```suggestion
        * A list of configuration keys not supported for CONSUMER protocol.
   ```


