dajac commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1457193686
########## clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java: ########## @@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroup private Set<ConsumerGroupState> states = Collections.emptySet(); + private Set<ConsumerGroupType> groupTypes = Collections.emptySet(); Review Comment: nit: `types`? ########## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ########## @@ -21,95 +21,111 @@ import java.util.Optional; Review Comment: This class is part of our public API, therefore we cannot change the existing API (e.g. by removing constructors). Could you please revert all the unnecessary changes here and only add the new field and its related changes? ########## clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java: ########## @@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroup private Set<ConsumerGroupState> states = Collections.emptySet(); + private Set<ConsumerGroupType> groupTypes = Collections.emptySet(); + /** - * If states is set, only groups in these states will be returned by listConsumerGroups() + * If states is set, only groups in these states will be returned by listConsumerGroups(). * Otherwise, all groups are returned. * This operation is supported by brokers with version 2.6.0 or later. */ public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) { - this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states); + this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : states; + return this; + } + + /** + * If groupTypes is set, only groups of these groupTypes will be returned by listConsumerGroups(). + * Otherwise, all groups are returned. + * + */ + public ListConsumerGroupsOptions inTypes(Set<ConsumerGroupType> groupTypes) { + this.groupTypes = (groupTypes == null || groupTypes.isEmpty()) ?
Collections.emptySet() : groupTypes; Review Comment: Should we make a copy of the types? ########## clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java: ########## @@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroup private Set<ConsumerGroupState> states = Collections.emptySet(); + private Set<ConsumerGroupType> groupTypes = Collections.emptySet(); + /** - * If states is set, only groups in these states will be returned by listConsumerGroups() + * If states is set, only groups in these states will be returned by listConsumerGroups(). * Otherwise, all groups are returned. * This operation is supported by brokers with version 2.6.0 or later. */ public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) { - this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states); + this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : states; + return this; + } + + /** + * If groupTypes is set, only groups of these groupTypes will be returned by listConsumerGroups(). + * Otherwise, all groups are returned. + * Review Comment: nit: This empty line could be removed. ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -64,28 +64,89 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing(simpleGroup, true) Review Comment: Do we need all the changes in this test? It may be better to keep it as it was. 
########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -46,16 +46,16 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListWithUnrecognizedNewConsumerOption(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: String): Unit = { Review Comment: Does it bring any value to test all the combinations in this case? ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -64,28 +64,89 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing(simpleGroup, true) + .setState(Optional.of(ConsumerGroupState.EMPTY)) + .setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()), + new ConsumerGroupListing(group, false) + .setState(Optional.of(ConsumerGroupState.STABLE)) + .setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()) + ) var foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet + foundListing = service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet, Set.empty).toSet expectedListing == foundListing }, s"Expected to show groups $expectedListing, but found $foundListing") - val expectedListingStable = Set( - new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE))) + val expectedListingStable = Set.empty[ConsumerGroupListing] foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet + foundListing = service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE), Set.empty).toSet expectedListingStable == foundListing }, s"Expected to show groups $expectedListingStable, but found $foundListing") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testConsumerGroupStatesFromString(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): Unit = { + val simpleGroup = "simple-group" + addSimpleGroupExecutor(group = simpleGroup) Review Comment: It would be great if we could also have a new consumer in this test. ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2229,7 +2229,7 @@ void handleResponse(AbstractResponse abstractResponse) { String topicName = cluster.topicName(topicId); if (topicName == null) { - future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found.")); + future.completeExceptionally(new InvalidTopicException("TopicId " + topicId + " not found.")); Review Comment: Why are we changing this? It does not seem related. ########## clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupType { Review Comment: I would rather call it `GroupType`. ########## clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java: ########## @@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroup private Set<ConsumerGroupState> states = Collections.emptySet(); + private Set<ConsumerGroupType> groupTypes = Collections.emptySet(); + /** - * If states is set, only groups in these states will be returned by listConsumerGroups() + * If states is set, only groups in these states will be returned by listConsumerGroups(). * Otherwise, all groups are returned. * This operation is supported by brokers with version 2.6.0 or later. */ public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) { - this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states); + this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : states; + return this; + } + + /** + * If groupTypes is set, only groups of these groupTypes will be returned by listConsumerGroups(). + * Otherwise, all groups are returned. 
+ * + */ + public ListConsumerGroupsOptions inTypes(Set<ConsumerGroupType> groupTypes) { Review Comment: This KIP says `withTypes` here. ########## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ########## @@ -102,6 +103,15 @@ object ConsumerGroupCommand extends Logging { parsedStates } + def consumerGroupTypesFromString(input: String): Set[ConsumerGroupType] = { + val parsedStates = input.split(',').map(s => ConsumerGroupType.parse(s.trim)).toSet Review Comment: nit: `parsedTypes`? ########## clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java: ########## @@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroup private Set<ConsumerGroupState> states = Collections.emptySet(); + private Set<ConsumerGroupType> groupTypes = Collections.emptySet(); + /** - * If states is set, only groups in these states will be returned by listConsumerGroups() + * If states is set, only groups in these states will be returned by listConsumerGroups(). * Otherwise, all groups are returned. * This operation is supported by brokers with version 2.6.0 or later. */ public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) { - this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states); + this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : states; + return this; + } + + /** + * If groupTypes is set, only groups of these groupTypes will be returned by listConsumerGroups(). Review Comment: nit: `groupTypes` -> `types`? We could change it everywhere in this file. 
########## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ########## @@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { - val stateValue = opts.options.valueOf(opts.stateOpt) - val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() - else - consumerGroupStatesFromString(stateValue) - val listings = listConsumerGroupsWithState(states) - printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeState = opts.options.has(opts.stateOpt) + val includeType = opts.options.has(opts.typeOpt) + + val groupInfoMap = mutable.Map[String, (String, String)]() + + if (includeType || includeState) { + val states = getStateValues() + val types = getTypeValues() + val listings = { + listConsumerGroupsWithFilters(states, types) + } + + listings.foreach { listing => + val groupId = listing.groupId + val groupType = listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString + val state = listing.state().orElse(ConsumerGroupState.UNKNOWN).toString + groupInfoMap.update(groupId, (state, groupType)) + } + + val groupInfoList = groupInfoMap.toList.map { case (groupId, (state, groupType)) => (groupId, state, groupType) } + printGroupInfo(groupInfoList, includeState, includeType) + + } else { listConsumerGroups().foreach(println(_)) + } + } + + private def getStateValues(): Set[ConsumerGroupState] = { + val stateValue = opts.options.valueOf(opts.stateOpt) + if (stateValue == null || stateValue.isEmpty) + Set[ConsumerGroupState]() + else + consumerGroupStatesFromString(stateValue) + } + + private def getTypeValues(): Set[ConsumerGroupType] = { + val typeValue = opts.options.valueOf(opts.typeOpt) + if (typeValue == null || typeValue.isEmpty) + Set[ConsumerGroupType]() + else + consumerGroupTypesFromString(typeValue) + } + + private def printGroupInfo(groupsAndInfo: List[(String, String, String)], includeState: Boolean, 
includeType: Boolean): Unit = { + val maxGroupLen: Int = groupsAndInfo.foldLeft(15)((maxLen, group) => Math.max(maxLen, group._1.length)) + var header = "GROUP" Review Comment: It may be better to use a List[String]. ########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -2500,12 +2500,12 @@ public void testDescribeTopicsByIds() { try { Review Comment: Should we add tests with group types? ########## clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupType { + UNKNOWN("unknown"), + CONSUMER("consumer"), + CLASSIC("classic"); + + private final static Map<String, ConsumerGroupType> NAME_TO_ENUM = Arrays.stream(values()) + .collect(Collectors.toMap(type -> type.name, Function.identity())); + + private final String name; + + ConsumerGroupType(String name) { + this.name = name; + } + + /** + * Parse a string into a consumer group type. 
+ */ + public static ConsumerGroupType parse(String name) { Review Comment: Should we make this case insensitive? ########## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ########## @@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { - val stateValue = opts.options.valueOf(opts.stateOpt) - val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() - else - consumerGroupStatesFromString(stateValue) - val listings = listConsumerGroupsWithState(states) - printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeState = opts.options.has(opts.stateOpt) + val includeType = opts.options.has(opts.typeOpt) + + val groupInfoMap = mutable.Map[String, (String, String)]() + + if (includeType || includeState) { + val states = getStateValues() + val types = getTypeValues() + val listings = { + listConsumerGroupsWithFilters(states, types) + } + + listings.foreach { listing => + val groupId = listing.groupId + val groupType = listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString + val state = listing.state().orElse(ConsumerGroupState.UNKNOWN).toString + groupInfoMap.update(groupId, (state, groupType)) + } + + val groupInfoList = groupInfoMap.toList.map { case (groupId, (state, groupType)) => (groupId, state, groupType) } + printGroupInfo(groupInfoList, includeState, includeType) + + } else { listConsumerGroups().foreach(println(_)) + } + } + + private def getStateValues(): Set[ConsumerGroupState] = { + val stateValue = opts.options.valueOf(opts.stateOpt) + if (stateValue == null || stateValue.isEmpty) + Set[ConsumerGroupState]() + else + consumerGroupStatesFromString(stateValue) + } + + private def getTypeValues(): Set[ConsumerGroupType] = { + val typeValue = opts.options.valueOf(opts.typeOpt) + if (typeValue == null || typeValue.isEmpty) + Set[ConsumerGroupType]() + else + consumerGroupTypesFromString(typeValue) 
+ } + + private def printGroupInfo(groupsAndInfo: List[(String, String, String)], includeState: Boolean, includeType: Boolean): Unit = { + val maxGroupLen: Int = groupsAndInfo.foldLeft(15)((maxLen, group) => Math.max(maxLen, group._1.length)) + var header = "GROUP" + var format = s"%-${maxGroupLen}s" + + if (includeState) { + header += " STATE" + format += " %-20s" + } + if (includeType) { + header += " TYPE" + format += " %-20s" + } Review Comment: nit: I would print the type before the state. ########## clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java: ########## @@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroup private Set<ConsumerGroupState> states = Collections.emptySet(); + private Set<ConsumerGroupType> groupTypes = Collections.emptySet(); + /** - * If states is set, only groups in these states will be returned by listConsumerGroups() + * If states is set, only groups in these states will be returned by listConsumerGroups(). * Otherwise, all groups are returned. * This operation is supported by brokers with version 2.6.0 or later. */ public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) { - this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states); + this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : states; Review Comment: Why are we changing this? Making a copy of `states` seems to be the right thing to do here. ########## clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java: ########## @@ -50,4 +62,11 @@ public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) { public Set<ConsumerGroupState> states() { return states; } + + /** + * Returns the list of types that are requested or empty if no groupTypes have been specified + */ + public Set<ConsumerGroupType> groupTypes() { Review Comment: nit: `types`? 
########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -46,16 +46,16 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListWithUnrecognizedNewConsumerOption(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: String): Unit = { Review Comment: Does it bring any value to test all the combinations in this case? ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -64,28 +64,89 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing(simpleGroup, true) + .setState(Optional.of(ConsumerGroupState.EMPTY)) + .setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()), + new ConsumerGroupListing(group, false) + .setState(Optional.of(ConsumerGroupState.STABLE)) + .setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()) + ) var foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet + foundListing = service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet, Set.empty).toSet expectedListing == foundListing }, s"Expected to show groups $expectedListing, but found $foundListing") - val expectedListingStable = Set( - new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE))) + val expectedListingStable = Set.empty[ConsumerGroupListing] foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet + foundListing = service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE), Set.empty).toSet expectedListingStable == foundListing }, s"Expected to show groups $expectedListingStable, but found $foundListing") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testConsumerGroupStatesFromString(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): Unit = { + val simpleGroup = "simple-group" + addSimpleGroupExecutor(group = simpleGroup) + addConsumerGroupExecutor(numConsumers = 1) + + val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type") + val service = getConsumerGroupService(cgcArgs) + + val expectedListingStable = Set.empty[ConsumerGroupListing] + + val expectedListing = Set( + new ConsumerGroupListing(simpleGroup, true) + .setState(Optional.of(ConsumerGroupState.EMPTY)) + .setType(if(quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()), Review Comment: Why are we doing this? The type should work in all combinations, no? 
########## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ########## @@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { - val stateValue = opts.options.valueOf(opts.stateOpt) - val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() - else - consumerGroupStatesFromString(stateValue) - val listings = listConsumerGroupsWithState(states) - printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeState = opts.options.has(opts.stateOpt) + val includeType = opts.options.has(opts.typeOpt) + + val groupInfoMap = mutable.Map[String, (String, String)]() + + if (includeType || includeState) { + val states = getStateValues() + val types = getTypeValues() + val listings = { + listConsumerGroupsWithFilters(states, types) + } + + listings.foreach { listing => + val groupId = listing.groupId + val groupType = listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString + val state = listing.state().orElse(ConsumerGroupState.UNKNOWN).toString + groupInfoMap.update(groupId, (state, groupType)) + } + + val groupInfoList = groupInfoMap.toList.map { case (groupId, (state, groupType)) => (groupId, state, groupType) } Review Comment: I wonder if this conversion is really necessary. Passing the Map to `printGroupInfo` seems reasonable too. 
########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -64,28 +64,89 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing(simpleGroup, true) + .setState(Optional.of(ConsumerGroupState.EMPTY)) + .setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()), + new ConsumerGroupListing(group, false) + .setState(Optional.of(ConsumerGroupState.STABLE)) + .setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()) + ) var foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet + foundListing = service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet, Set.empty).toSet expectedListing == foundListing }, s"Expected to show groups $expectedListing, but found $foundListing") - val expectedListingStable = Set( - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + val expectedListingStable = Set.empty[ConsumerGroupListing] foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet + foundListing = service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE), Set.empty).toSet expectedListingStable == foundListing }, s"Expected to show groups $expectedListingStable, but found $foundListing") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testConsumerGroupStatesFromString(quorum: String): Unit = { + @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): Unit = { + val simpleGroup = "simple-group" + addSimpleGroupExecutor(group = simpleGroup) + addConsumerGroupExecutor(numConsumers = 1) + + val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type") + val service = getConsumerGroupService(cgcArgs) + + val expectedListingStable = Set.empty[ConsumerGroupListing] + + val expectedListing = Set( + new ConsumerGroupListing(simpleGroup, true) + .setState(Optional.of(ConsumerGroupState.EMPTY)) + .setType(if(quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()), + new ConsumerGroupListing(group, false) + .setState(Optional.of(ConsumerGroupState.STABLE)) + .setType(if(quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()) + ) + + var foundListing = Set.empty[ConsumerGroupListing] + TestUtils.waitUntilTrue(() => { + foundListing = service.listConsumerGroupsWithFilters(Set.empty, Set.empty).toSet + expectedListing == foundListing + }, s"Expected to show groups $expectedListing, but found $foundListing") + + // When group type is mentioned: Review Comment: This test is a bit confusing. I wonder if it would be better to split it into multiple tests. For instance, we could have one testing the old coordinator in ZK or KRaft mode. Then one testing the new coordinator in KRaft mode. What do you think?
########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -18,19 +18,19 @@ package kafka.admin import joptsimple.OptionException import org.junit.jupiter.api.Assertions._ -import kafka.utils.TestUtils -import org.apache.kafka.common.ConsumerGroupState +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.ConsumerGroupListing -import java.util.Optional - +import org.apache.kafka.common.{ConsumerGroupState, ConsumerGroupType} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Optional class ListConsumerGroupTest extends ConsumerGroupCommandTest { - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroups(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: String): Unit = { Review Comment: Does `groupProtocol` have any effect on the consumer created in this test? ########## core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala: ########## @@ -116,6 +123,18 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { } object ConsumerGroupCommandTest { + // We want to test the following combinations: + // * ZooKeeper and the classic group protocol. + // * KRaft and the classic group protocol. + // * KRaft with the new group coordinator enabled and the classic group protocol. + // * KRaft with the new group coordinator enabled and the consumer group protocol. + def getTestQuorumAndGroupProtocolParametersAll: java.util.stream.Stream[Arguments] = { Review Comment: We already have the same one defined in BaseConsumerTest. I wonder if we could reuse it. Could we?
########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -46,16 +46,16 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListWithUnrecognizedNewConsumerOption(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: String): Unit = { val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--list") assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs)) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroupsWithStates(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): Unit = { Review Comment: Same question. Is it necessary? Or would using `@ValueSource(strings = Array("zk", "kraft", "kraft+kip-848"))` be enough? 
########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -104,9 +165,27 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString(" , ,")) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListGroupCommand(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumerGroupTypesFromString(quorum: String, groupProtocol: String): Unit = { + var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer") + assertEquals(Set(ConsumerGroupType.CONSUMER), result) + + result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, classic") + assertEquals(Set(ConsumerGroupType.CONSUMER, ConsumerGroupType.CLASSIC), result) + + assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong")) + + assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString("Consumer")) Review Comment: hum... I expect this to actually work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org