chia7712 commented on code in PR #17057: URL: https://github.com/apache/kafka/pull/17057#discussion_r1738568876
########## core/src/test/java/kafka/test/ClusterInstance.java: ########## @@ -160,15 +159,11 @@ default Admin createAdminClient() { } default Set<GroupProtocol> supportedGroupProtocols() { - Map<String, String> serverProperties = config().serverProperties(); - Set<GroupProtocol> supportedGroupProtocols = new HashSet<>(); - supportedGroupProtocols.add(CLASSIC); - - if (serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) { - supportedGroupProtocols.add(CONSUMER); + if (isKRaftTest()) { Review Comment: The `CONSUMER` protocol can be disabled even in KRaft mode, right? Maybe we can check `KafkaApis` directly. For example: ```java default Set<GroupProtocol> supportedGroupProtocols() { if (brokers().values().stream().allMatch(b -> b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) { return mkSet(CLASSIC, CONSUMER); } else { return Collections.singleton(CLASSIC); } } ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ########## @@ -76,9 +76,9 @@ public class GroupCoordinatorConfig { public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols"; public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " + - "The " + Group.GroupType.CONSUMER + " rebalance protocol is in preview and therefore must not be used in production. 
" + "The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production."; - public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString()); + public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = + Arrays.asList(Group.GroupType.CLASSIC.toString(), Group.GroupType.CONSUMER.toString()); Review Comment: Please make it immutable: `Arrays.asList` returns a fixed-size list whose elements can still be replaced via `set`, so wrap it, e.g. in `Collections.unmodifiableList(...)`. ########## core/src/main/scala/kafka/server/KafkaConfig.scala: ########## @@ -576,19 +576,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.") } if (protocols.contains(GroupType.CONSUMER)) { - if (processRoles.isEmpty) { - throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster.") - } - if (!isNewGroupCoordinatorEnabled) { - throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported by the new group coordinator.") + if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) { + warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.") Review Comment: Maybe `KafkaApis#isConsumerGroupProtocolEnabled` needs a similar log message when `isConsumerRebalanceProtocolSupported` returns false and the `CONSUMER` protocol is enabled. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L3816 ########## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ########## @@ -191,45 +189,17 @@ public void testDefaults(ClusterInstance clusterInstance) { Assertions.assertEquals(MetadataVersion.latestTesting(), clusterInstance.config().metadataVersion()); } - @ClusterTests({ - @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), - }), - @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), - }) - }) + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}) public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) { Set<GroupProtocol> supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); supportedGroupProtocols.add(CONSUMER); - Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols)); - Assertions.assertEquals(2, clusterInstance.supportedGroupProtocols().size()); + Assertions.assertEquals(supportedGroupProtocols, clusterInstance.supportedGroupProtocols()); } - @ClusterTests({ - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - }), - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), - }), - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { Review Comment: Why remove this test case? 
These seem to be legal configs. ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ########## @@ -79,34 +77,34 @@ private ConsumerGroupCommandTestUtils() { } static List<ClusterConfig> generator() { - return Stream.concat(forConsumerGroupCoordinator().stream(), forClassicGroupCoordinator().stream()) - .collect(Collectors.toList()); + return Stream + .concat(forKRaftGroupCoordinator().stream(), forZkGroupCoordinator().stream()) + .collect(Collectors.toList()); } - static List<ClusterConfig> forConsumerGroupCoordinator() { + static List<ClusterConfig> forKRaftGroupCoordinator() { Map<String, String> serverProperties = new HashMap<>(); serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); Review Comment: This can be removed too. ########## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ########## @@ -191,45 +189,17 @@ public void testDefaults(ClusterInstance clusterInstance) { Assertions.assertEquals(MetadataVersion.latestTesting(), clusterInstance.config().metadataVersion()); } - @ClusterTests({ - @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), - }), - @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), - }) - }) + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}) public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) { Set<GroupProtocol> supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); supportedGroupProtocols.add(CONSUMER); - 
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols)); - Assertions.assertEquals(2, clusterInstance.supportedGroupProtocols().size()); + Assertions.assertEquals(supportedGroupProtocols, clusterInstance.supportedGroupProtocols()); } - @ClusterTests({ - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - }), - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), - }), - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), - }), - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), - }), - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { Review Comment: Ditto: why remove this test case? These seem to be legal configs too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org