dajac commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1481015399
########## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ########## @@ -37,7 +39,10 @@ public class ConsumerGroupListing { * @param isSimpleConsumerGroup If consumer group is simple or not. */ public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) { - this(groupId, isSimpleConsumerGroup, Optional.empty()); + this.groupId = groupId; + this.isSimpleConsumerGroup = isSimpleConsumerGroup; + this.state = Optional.empty(); + this.groupType = Optional.empty(); Review Comment: nit: Calling the other constructor as it was before is actually slightly better. ########## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ########## @@ -21,14 +21,16 @@ import java.util.Optional; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; /** * A listing of a consumer group in the cluster. */ public class ConsumerGroupListing { private final String groupId; private final boolean isSimpleConsumerGroup; - private final Optional<ConsumerGroupState> state; + private Optional<ConsumerGroupState> state; + private Optional<GroupType> groupType; Review Comment: It is better to keep them final as this object is supposed to be immutable. 
########## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ########## @@ -189,16 +199,68 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { - val stateValue = opts.options.valueOf(opts.stateOpt) - val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() - else - consumerGroupStatesFromString(stateValue) - val listings = listConsumerGroupsWithState(states) - printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeType = opts.options.has(opts.typeOpt) + val includeState = opts.options.has(opts.stateOpt) + + val groupInfoMap = mutable.Map[String, (String, String)]() + + if (includeType || includeState) { + val types = getTypeValues() + val states = getStateValues() + val listings = { + listConsumerGroupsWithFilters(types, states) + } + + listings.foreach { listing => + val groupId = listing.groupId + val groupType = listing.groupType().orElse(GroupType.UNKNOWN).toString + val state = listing.state().orElse(ConsumerGroupState.UNKNOWN).toString + groupInfoMap.update(groupId, (groupType, state)) Review Comment: I wonder if converting the listing to a map is really necessary. I understand that it was there before. However, it seems that we could just pass the listing to `printGroupInfo` and avoid it. What do you think? ########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -2804,8 +2804,8 @@ public void testListConsumerGroupsWithStates() throws Exception { Review Comment: It would be great if we could add more unit tests here: * Could we test the case where the supported version of the request does not support the type filter? It should throw an unsupported version exception: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java#L53. * We need to test the basic plumbing. 
* We need to test the different variations of `ListConsumerGroupsOptions`. ########## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ########## @@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { - val stateValue = opts.options.valueOf(opts.stateOpt) - val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() - else - consumerGroupStatesFromString(stateValue) - val listings = listConsumerGroupsWithState(states) - printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeState = opts.options.has(opts.stateOpt) + val includeType = opts.options.has(opts.typeOpt) + + val groupInfoMap = mutable.Map[String, (String, String)]() + + if (includeType || includeState) { + val states = getStateValues() + val types = getTypeValues() + val listings = { + listConsumerGroupsWithFilters(states, types) + } + + listings.foreach { listing => + val groupId = listing.groupId + val groupType = listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString + val state = listing.state().orElse(ConsumerGroupState.UNKNOWN).toString + groupInfoMap.update(groupId, (state, groupType)) + } + + val groupInfoList = groupInfoMap.toList.map { case (groupId, (state, groupType)) => (groupId, state, groupType) } + printGroupInfo(groupInfoList, includeState, includeType) + + } else { listConsumerGroups().foreach(println(_)) + } + } + + private def getStateValues(): Set[ConsumerGroupState] = { + val stateValue = opts.options.valueOf(opts.stateOpt) + if (stateValue == null || stateValue.isEmpty) + Set[ConsumerGroupState]() + else + consumerGroupStatesFromString(stateValue) + } + + private def getTypeValues(): Set[ConsumerGroupType] = { + val typeValue = opts.options.valueOf(opts.typeOpt) + if (typeValue == null || typeValue.isEmpty) + Set[ConsumerGroupType]() + else + consumerGroupTypesFromString(typeValue) + } + + private def 
printGroupInfo(groupsAndInfo: List[(String, String, String)], includeState: Boolean, includeType: Boolean): Unit = { + val maxGroupLen: Int = groupsAndInfo.foldLeft(15)((maxLen, group) => Math.max(maxLen, group._1.length)) + var header = "GROUP" Review Comment: Sorry. I should have explained it a bit more. You construct `header` by adding type and/or state if they are needed. Then, you split the `header` into a list. It seems that we could avoid this if we directly use a list. ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -64,28 +67,97 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing(simpleGroup, true) + .setState(Optional.of(ConsumerGroupState.EMPTY)) + .setType(Optional.of(GroupType.CLASSIC)), + new ConsumerGroupListing(group, false) + .setState(Optional.of(ConsumerGroupState.STABLE)) + .setType(Optional.of(GroupType.CLASSIC)) + ) var foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet + foundListing = service.listConsumerGroupsWithFilters(Set.empty, ConsumerGroupState.values.toSet).toSet expectedListing == foundListing }, s"Expected to show groups $expectedListing, but found $foundListing") - val expectedListingStable = Set( - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + val expectedListingStable = Set.empty[ConsumerGroupListing] foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet + foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet expectedListingStable == foundListing }, s"Expected to show groups $expectedListingStable, but found $foundListing") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testConsumerGroupStatesFromString(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): Unit = { + val simpleGroup = "simple-group" + val protocolGroup = "protocol-group" + + addSimpleGroupExecutor(group = simpleGroup) + addConsumerGroupExecutor(numConsumers = 1) + addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) + + val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type") + val service = getConsumerGroupService(cgcArgs) + + val expectedListingStable = Set.empty[ConsumerGroupListing] + + var expectedListing = Set( + new ConsumerGroupListing(simpleGroup, true) + .setState(Optional.of(ConsumerGroupState.EMPTY)) + .setType(Optional.of(GroupType.CLASSIC)), + new ConsumerGroupListing(group, false) + .setState(Optional.of(ConsumerGroupState.STABLE)) + .setType(Optional.of(GroupType.CLASSIC)) + ) + + if (groupProtocol.equals("classic")) { + expectedListing = expectedListing + new ConsumerGroupListing(protocolGroup, false) + .setState(Optional.of(ConsumerGroupState.STABLE)) + .setType(Optional.of(GroupType.CLASSIC)) + } else { + expectedListing = expectedListing + new ConsumerGroupListing(protocolGroup, false) + .setState(Optional.of(ConsumerGroupState.STABLE)) + .setType(Optional.of(GroupType.CONSUMER)) + } Review Comment: nit: Could we do something like this? 
``` new ConsumerGroupListing(protocolGroup, false) .setState(Optional.of(ConsumerGroupState.STABLE)) .setType(Optional.of(GroupType.parse(groupProtocol)) ``` ########## core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala: ########## @@ -44,7 +44,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { private var consumerGroupService: List[ConsumerGroupService] = List() private var consumerGroupExecutors: List[AbstractConsumerGroupExecutor] = List() - // configure the servers and clients + // Configure the servers and clients. Review Comment: nit: Let's revert this as it is not related. ########## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ########## @@ -189,16 +199,68 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { - val stateValue = opts.options.valueOf(opts.stateOpt) - val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() - else - consumerGroupStatesFromString(stateValue) - val listings = listConsumerGroupsWithState(states) - printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeType = opts.options.has(opts.typeOpt) + val includeState = opts.options.has(opts.stateOpt) + + val groupInfoMap = mutable.Map[String, (String, String)]() + + if (includeType || includeState) { + val types = getTypeValues() + val states = getStateValues() + val listings = { + listConsumerGroupsWithFilters(types, states) + } Review Comment: nit: I suppose that the curly braces are not needed here. ########## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ########## @@ -68,48 +97,44 @@ public boolean isSimpleConsumerGroup() { } /** - * Consumer Group state + * Consumer Group state. */ public Optional<ConsumerGroupState> state() { return state; } + /** + * The type of the consumer group. + * + * @return An Optional containing the type, if available. 
+ */ + public Optional<GroupType> groupType() { Review Comment: nit: `type()`. ########## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ########## @@ -189,16 +199,68 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { - val stateValue = opts.options.valueOf(opts.stateOpt) - val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() - else - consumerGroupStatesFromString(stateValue) - val listings = listConsumerGroupsWithState(states) - printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeType = opts.options.has(opts.typeOpt) + val includeState = opts.options.has(opts.stateOpt) + + val groupInfoMap = mutable.Map[String, (String, String)]() + + if (includeType || includeState) { + val types = getTypeValues() + val states = getStateValues() + val listings = { + listConsumerGroupsWithFilters(types, states) + } + + listings.foreach { listing => + val groupId = listing.groupId + val groupType = listing.groupType().orElse(GroupType.UNKNOWN).toString + val state = listing.state().orElse(ConsumerGroupState.UNKNOWN).toString + groupInfoMap.update(groupId, (groupType, state)) + } + + printGroupInfo(groupInfoMap, includeType, includeState) + + } else { listConsumerGroups().foreach(println(_)) + } + } + + private def getStateValues(): Set[ConsumerGroupState] = { + val stateValue = opts.options.valueOf(opts.stateOpt) + if (stateValue == null || stateValue.isEmpty) + Set[ConsumerGroupState]() + else + consumerGroupStatesFromString(stateValue) + } + + private def getTypeValues(): Set[GroupType] = { + val typeValue = opts.options.valueOf(opts.typeOpt) + if (typeValue == null || typeValue.isEmpty) + Set[GroupType]() + else + consumerGroupTypesFromString(typeValue) + } + + private def printGroupInfo(groupsAndInfo: Map[String, (String, String)], includeType: Boolean, includeState: Boolean): Unit = { + val maxGroupLen: Int = 
groupsAndInfo.keys.foldLeft(15)((maxLen, groupId) => Math.max(maxLen, groupId.length)) + var header = "GROUP" + var format = s"%-${maxGroupLen}s" + + if (includeType) { + header += " TYPE" + format += " %-20s" + } + if (includeState) { + header += " STATE" + format += " %-20s" + } + + println(format.format(ArraySeq.unsafeWrapArray(header.split(" ")): _*)) + + groupsAndInfo.foreach { case (groupId, (groupType, state)) => + val info = List(groupId) ++ (if (includeType) List(groupType) else List()) ++ (if (includeState) List(state) else List()) Review Comment: I was wondering whether we could simplify this a bit. One option would be to do the following: ``` private def printGroupInfo(groups: List[ConsumerGroupListing], includeType: Boolean, includeState: Boolean): Unit = { def groupId(groupListing: ConsumerGroupListing): String = groupListing.groupId def groupType(groupListing: ConsumerGroupListing): String = groupListing.groupType.orElse(GroupType.UNKNOWN).toString def groupState(groupListing: ConsumerGroupListing): String = groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) => Math.max(maxLen, groupId(groupListing).length)) var format = s"%-${maxGroupLen}s" var header = List("Group") var extractors: List[ConsumerGroupListing => String] = List(groupId) if (includeType) { header += "TYPE" extractors += groupType format += " %-20s" } if (includeState) { header += "STATE" extractors += groupState format += " %-20s" } println(format.format(header: _*)) groups.foreach { groupListing => val info = extractors.map(extractor => extractor(groupListing)) println(format.format(info: _*)) } } ``` We could also think about making it fully generic in the future (not in this PR). For instance, we could have one function which takes a list of T, a list of headers and a list of extractors. With this, you have everything to print the table. 
########## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ########## @@ -68,48 +97,44 @@ public boolean isSimpleConsumerGroup() { } /** - * Consumer Group state + * Consumer Group state. */ public Optional<ConsumerGroupState> state() { return state; } + /** + * The type of the consumer group. + * + * @return An Optional containing the type, if available. + */ + public Optional<GroupType> groupType() { + return groupType; + } + @Override public String toString() { return "(" + "groupId='" + groupId + '\'' + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + ", state=" + state + + ", groupType=" + groupType + Review Comment: nit: `type=`. ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -46,16 +46,16 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListWithUnrecognizedNewConsumerOption(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: String): Unit = { Review Comment: ditto. 
########## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ########## @@ -189,16 +199,68 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { - val stateValue = opts.options.valueOf(opts.stateOpt) - val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() - else - consumerGroupStatesFromString(stateValue) - val listings = listConsumerGroupsWithState(states) - printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeType = opts.options.has(opts.typeOpt) + val includeState = opts.options.has(opts.stateOpt) + + val groupInfoMap = mutable.Map[String, (String, String)]() + + if (includeType || includeState) { + val types = getTypeValues() + val states = getStateValues() Review Comment: nit: We usually don't prefix methods with `get`. ########## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ########## @@ -54,7 +59,31 @@ public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, Optio } /** - * Consumer Group Id + * Set the state of the consumer group. + * + * @param state The state of the consumer group. + * @return This ConsumerGroupListing instance. + */ + public ConsumerGroupListing setState(Optional<ConsumerGroupState> state) { Review Comment: This class is part of our public API; therefore, we cannot add new methods without doing a KIP. In this case, I don't think that they are necessary because it is preferable to keep the object immutable. Instead, we can add a new constructor which takes the existing params and the type. 
########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -18,44 +18,47 @@ package kafka.admin import joptsimple.OptionException import org.junit.jupiter.api.Assertions._ -import kafka.utils.TestUtils -import org.apache.kafka.common.ConsumerGroupState +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.ConsumerGroupListing -import java.util.Optional - +import org.apache.kafka.common.{ConsumerGroupState, GroupType} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Optional class ListConsumerGroupTest extends ConsumerGroupCommandTest { - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroups(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" + val protocolGroup = "protocol-group" + addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) + addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list") val service = getConsumerGroupService(cgcArgs) - val expectedGroups = Set(group, simpleGroup) + val expectedGroups = Set(protocolGroup, group, simpleGroup) var foundGroups = Set.empty[String] TestUtils.waitUntilTrue(() => { foundGroups = service.listConsumerGroups().toSet expectedGroups == foundGroups }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListWithUnrecognizedNewConsumerOption(): Unit = { + @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: String): Unit = { val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--list") assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs)) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroupsWithStates(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) Review Comment: Should we add another consumer based on `groupProtocol`? ########## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ########## @@ -68,48 +97,44 @@ public boolean isSimpleConsumerGroup() { } /** - * Consumer Group state + * Consumer Group state. */ public Optional<ConsumerGroupState> state() { return state; } + /** + * The type of the consumer group. + * + * @return An Optional containing the type, if available. 
+ */ + public Optional<GroupType> groupType() { + return groupType; + } + @Override public String toString() { return "(" + "groupId='" + groupId + '\'' + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + ", state=" + state + + ", groupType=" + groupType + ')'; } @Override public int hashCode() { - return Objects.hash(groupId, isSimpleConsumerGroup, state); + return Objects.hash(groupId, isSimpleConsumerGroup(), state, groupType); } @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - ConsumerGroupListing other = (ConsumerGroupListing) obj; - if (groupId == null) { Review Comment: This check is not there anymore with the new code. I wonder if we have cases where `groupId` could be `null`. We could use `Objects.equals` to be safe. ########## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ########## @@ -21,14 +21,16 @@ import java.util.Optional; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; /** * A listing of a consumer group in the cluster. */ public class ConsumerGroupListing { private final String groupId; private final boolean isSimpleConsumerGroup; - private final Optional<ConsumerGroupState> state; + private Optional<ConsumerGroupState> state; + private Optional<GroupType> groupType; Review Comment: nit: Let's call it `type` as we called it `type` in `ConsumerGroupDescription`. 
########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -46,16 +46,16 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListWithUnrecognizedNewConsumerOption(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: String): Unit = { Review Comment: I reiterate my question. The test verifies that `--new-consumer` is not accepted with `--list`. This is completely unrelated to zk vs kraft, old coordinator vs new coordinator, old consumer vs new consumer so all the combinations that we test here are not relevant. Would it make sense to keep it as it was before? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org