dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1457193686


##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends 
AbstractOptions<ListConsumerGroup
 
     private Set<ConsumerGroupState> states = Collections.emptySet();
 
+    private Set<ConsumerGroupType> groupTypes = Collections.emptySet();

Review Comment:
   nit: `types`?



##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##########
@@ -21,95 +21,111 @@
 import java.util.Optional;

Review Comment:
   This class is part of our public API; therefore, we cannot change the existing 
API (e.g. by removing constructors). Could you please revert all the unnecessary 
changes here and only add the new field and its related changes?



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends 
AbstractOptions<ListConsumerGroup
 
     private Set<ConsumerGroupState> states = Collections.emptySet();
 
+    private Set<ConsumerGroupType> groupTypes = Collections.emptySet();
+
     /**
-     * If states is set, only groups in these states will be returned by 
listConsumerGroups()
+     * If states is set, only groups in these states will be returned by 
listConsumerGroups().
      * Otherwise, all groups are returned.
      * This operation is supported by brokers with version 2.6.0 or later.
      */
     public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
-        this.states = (states == null) ? Collections.emptySet() : new 
HashSet<>(states);
+        this.states = (states == null || states.isEmpty()) ? 
Collections.emptySet() : states;
+        return this;
+    }
+
+    /**
+     * If groupTypes is set, only groups of these groupTypes will be returned 
by listConsumerGroups().
+     * Otherwise, all groups are returned.
+     *
+     */
+    public ListConsumerGroupsOptions inTypes(Set<ConsumerGroupType> 
groupTypes) {
+        this.groupTypes = (groupTypes == null || groupTypes.isEmpty()) ? 
Collections.emptySet() : groupTypes;

Review Comment:
   Should we make a copy of the types?



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends 
AbstractOptions<ListConsumerGroup
 
     private Set<ConsumerGroupState> states = Collections.emptySet();
 
+    private Set<ConsumerGroupType> groupTypes = Collections.emptySet();
+
     /**
-     * If states is set, only groups in these states will be returned by 
listConsumerGroups()
+     * If states is set, only groups in these states will be returned by 
listConsumerGroups().
      * Otherwise, all groups are returned.
      * This operation is supported by brokers with version 2.6.0 or later.
      */
     public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
-        this.states = (states == null) ? Collections.emptySet() : new 
HashSet<>(states);
+        this.states = (states == null || states.isEmpty()) ? 
Collections.emptySet() : states;
+        return this;
+    }
+
+    /**
+     * If groupTypes is set, only groups of these groupTypes will be returned 
by listConsumerGroups().
+     * Otherwise, all groups are returned.
+     *

Review Comment:
   nit: This empty line could be removed.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     val expectedListing = Set(
-      new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+      new ConsumerGroupListing(simpleGroup, true)

Review Comment:
   Do we need all the changes in this test? It may be better to keep it as it 
was.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -46,16 +46,16 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: 
String): Unit = {

Review Comment:
   Does it bring any value to test all the combinations in this case?



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     val expectedListing = Set(
-      new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+      new ConsumerGroupListing(simpleGroup, true)
+        .setState(Optional.of(ConsumerGroupState.EMPTY))
+        .setType(if (quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
+      new ConsumerGroupListing(group, false)
+        .setState(Optional.of(ConsumerGroupState.STABLE))
+        .setType(if (quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty())
+    )
 
     var foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+      foundListing = 
service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet, 
Set.empty).toSet
       expectedListing == foundListing
     }, s"Expected to show groups $expectedListing, but found $foundListing")
 
-    val expectedListingStable = Set(
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
 
     foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+      foundListing = 
service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE),
 Set.empty).toSet
       expectedListingStable == foundListing
     }, s"Expected to show groups $expectedListingStable, but found 
$foundListing")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testConsumerGroupStatesFromString(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): 
Unit = {
+    val simpleGroup = "simple-group"
+    addSimpleGroupExecutor(group = simpleGroup)

Review Comment:
   It would be great if we could also have a new consumer in this test.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2229,7 +2229,7 @@ void handleResponse(AbstractResponse abstractResponse) {
 
                     String topicName = cluster.topicName(topicId);
                     if (topicName == null) {
-                        future.completeExceptionally(new 
UnknownTopicIdException("TopicId " + topicId + " not found."));
+                        future.completeExceptionally(new 
InvalidTopicException("TopicId " + topicId + " not found."));

Review Comment:
   Why are we changing this? It does not seem related.



##########
clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupType {

Review Comment:
   I would rather call it `GroupType`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends 
AbstractOptions<ListConsumerGroup
 
     private Set<ConsumerGroupState> states = Collections.emptySet();
 
+    private Set<ConsumerGroupType> groupTypes = Collections.emptySet();
+
     /**
-     * If states is set, only groups in these states will be returned by 
listConsumerGroups()
+     * If states is set, only groups in these states will be returned by 
listConsumerGroups().
      * Otherwise, all groups are returned.
      * This operation is supported by brokers with version 2.6.0 or later.
      */
     public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
-        this.states = (states == null) ? Collections.emptySet() : new 
HashSet<>(states);
+        this.states = (states == null || states.isEmpty()) ? 
Collections.emptySet() : states;
+        return this;
+    }
+
+    /**
+     * If groupTypes is set, only groups of these groupTypes will be returned 
by listConsumerGroups().
+     * Otherwise, all groups are returned.
+     *
+     */
+    public ListConsumerGroupsOptions inTypes(Set<ConsumerGroupType> 
groupTypes) {

Review Comment:
   The KIP says `withTypes` here.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -102,6 +103,15 @@ object ConsumerGroupCommand extends Logging {
     parsedStates
   }
 
+  def consumerGroupTypesFromString(input: String): Set[ConsumerGroupType] = {
+    val parsedStates = input.split(',').map(s => 
ConsumerGroupType.parse(s.trim)).toSet

Review Comment:
   nit: `parsedTypes`?



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends 
AbstractOptions<ListConsumerGroup
 
     private Set<ConsumerGroupState> states = Collections.emptySet();
 
+    private Set<ConsumerGroupType> groupTypes = Collections.emptySet();
+
     /**
-     * If states is set, only groups in these states will be returned by 
listConsumerGroups()
+     * If states is set, only groups in these states will be returned by 
listConsumerGroups().
      * Otherwise, all groups are returned.
      * This operation is supported by brokers with version 2.6.0 or later.
      */
     public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
-        this.states = (states == null) ? Collections.emptySet() : new 
HashSet<>(states);
+        this.states = (states == null || states.isEmpty()) ? 
Collections.emptySet() : states;
+        return this;
+    }
+
+    /**
+     * If groupTypes is set, only groups of these groupTypes will be returned 
by listConsumerGroups().

Review Comment:
   nit: `groupTypes` -> `types`? We could change it everywhere in this file.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeState = opts.options.has(opts.stateOpt)
+      val includeType = opts.options.has(opts.typeOpt)
+
+      val groupInfoMap = mutable.Map[String, (String, String)]()
+
+      if (includeType || includeState) {
+        val states = getStateValues()
+        val types = getTypeValues()
+        val listings = {
+          listConsumerGroupsWithFilters(states, types)
+        }
+
+        listings.foreach { listing =>
+          val groupId = listing.groupId
+          val groupType = 
listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString
+          val state = 
listing.state().orElse(ConsumerGroupState.UNKNOWN).toString
+          groupInfoMap.update(groupId, (state, groupType))
+        }
+
+        val groupInfoList = groupInfoMap.toList.map { case (groupId, (state, 
groupType)) => (groupId, state, groupType) }
+        printGroupInfo(groupInfoList, includeState, includeType)
+
+      } else {
         listConsumerGroups().foreach(println(_))
+      }
+    }
+
+    private def getStateValues(): Set[ConsumerGroupState] = {
+      val stateValue = opts.options.valueOf(opts.stateOpt)
+      if (stateValue == null || stateValue.isEmpty)
+        Set[ConsumerGroupState]()
+      else
+        consumerGroupStatesFromString(stateValue)
+    }
+
+    private def getTypeValues(): Set[ConsumerGroupType] = {
+      val typeValue = opts.options.valueOf(opts.typeOpt)
+      if (typeValue == null || typeValue.isEmpty)
+        Set[ConsumerGroupType]()
+      else
+        consumerGroupTypesFromString(typeValue)
+    }
+
+    private def printGroupInfo(groupsAndInfo: List[(String, String, String)], 
includeState: Boolean, includeType: Boolean): Unit = {
+      val maxGroupLen: Int = groupsAndInfo.foldLeft(15)((maxLen, group) => 
Math.max(maxLen, group._1.length))
+      var header = "GROUP"

Review Comment:
   It may be better to use a List[String].



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2500,12 +2500,12 @@ public void testDescribeTopicsByIds() {
             try {

Review Comment:
   Should we add tests with group types?



##########
clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupType {
+    UNKNOWN("unknown"),
+    CONSUMER("consumer"),
+    CLASSIC("classic");
+
+    private final static Map<String, ConsumerGroupType> NAME_TO_ENUM = 
Arrays.stream(values())
+        .collect(Collectors.toMap(type -> type.name, Function.identity()));
+
+    private final String name;
+
+    ConsumerGroupType(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Parse a string into a consumer group type.
+     */
+    public static ConsumerGroupType parse(String name) {

Review Comment:
   Should we make this case insensitive?



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeState = opts.options.has(opts.stateOpt)
+      val includeType = opts.options.has(opts.typeOpt)
+
+      val groupInfoMap = mutable.Map[String, (String, String)]()
+
+      if (includeType || includeState) {
+        val states = getStateValues()
+        val types = getTypeValues()
+        val listings = {
+          listConsumerGroupsWithFilters(states, types)
+        }
+
+        listings.foreach { listing =>
+          val groupId = listing.groupId
+          val groupType = 
listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString
+          val state = 
listing.state().orElse(ConsumerGroupState.UNKNOWN).toString
+          groupInfoMap.update(groupId, (state, groupType))
+        }
+
+        val groupInfoList = groupInfoMap.toList.map { case (groupId, (state, 
groupType)) => (groupId, state, groupType) }
+        printGroupInfo(groupInfoList, includeState, includeType)
+
+      } else {
         listConsumerGroups().foreach(println(_))
+      }
+    }
+
+    private def getStateValues(): Set[ConsumerGroupState] = {
+      val stateValue = opts.options.valueOf(opts.stateOpt)
+      if (stateValue == null || stateValue.isEmpty)
+        Set[ConsumerGroupState]()
+      else
+        consumerGroupStatesFromString(stateValue)
+    }
+
+    private def getTypeValues(): Set[ConsumerGroupType] = {
+      val typeValue = opts.options.valueOf(opts.typeOpt)
+      if (typeValue == null || typeValue.isEmpty)
+        Set[ConsumerGroupType]()
+      else
+        consumerGroupTypesFromString(typeValue)
+    }
+
+    private def printGroupInfo(groupsAndInfo: List[(String, String, String)], 
includeState: Boolean, includeType: Boolean): Unit = {
+      val maxGroupLen: Int = groupsAndInfo.foldLeft(15)((maxLen, group) => 
Math.max(maxLen, group._1.length))
+      var header = "GROUP"
+      var format = s"%-${maxGroupLen}s"
+
+      if (includeState) {
+        header += " STATE"
+        format += " %-20s"
+      }
+      if (includeType) {
+        header += " TYPE"
+        format += " %-20s"
+      }

Review Comment:
   nit: I would print the type before the state.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends 
AbstractOptions<ListConsumerGroup
 
     private Set<ConsumerGroupState> states = Collections.emptySet();
 
+    private Set<ConsumerGroupType> groupTypes = Collections.emptySet();
+
     /**
-     * If states is set, only groups in these states will be returned by 
listConsumerGroups()
+     * If states is set, only groups in these states will be returned by 
listConsumerGroups().
      * Otherwise, all groups are returned.
      * This operation is supported by brokers with version 2.6.0 or later.
      */
     public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
-        this.states = (states == null) ? Collections.emptySet() : new 
HashSet<>(states);
+        this.states = (states == null || states.isEmpty()) ? 
Collections.emptySet() : states;

Review Comment:
   Why are we changing this? Making a copy of `states` seems to be the right 
thing to do here.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -50,4 +62,11 @@ public ListConsumerGroupsOptions 
inStates(Set<ConsumerGroupState> states) {
     public Set<ConsumerGroupState> states() {
         return states;
     }
+
+    /**
+     * Returns the list of types that are requested or empty if no groupTypes 
have been specified
+     */
+    public Set<ConsumerGroupType> groupTypes() {

Review Comment:
   nit: `types`?



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -46,16 +46,16 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: 
String): Unit = {

Review Comment:
   Does it bring any value to test all the combinations in this case?



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     val expectedListing = Set(
-      new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+      new ConsumerGroupListing(simpleGroup, true)
+        .setState(Optional.of(ConsumerGroupState.EMPTY))
+        .setType(if (quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
+      new ConsumerGroupListing(group, false)
+        .setState(Optional.of(ConsumerGroupState.STABLE))
+        .setType(if (quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty())
+    )
 
     var foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+      foundListing = 
service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet, 
Set.empty).toSet
       expectedListing == foundListing
     }, s"Expected to show groups $expectedListing, but found $foundListing")
 
-    val expectedListingStable = Set(
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
 
     foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+      foundListing = 
service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE),
 Set.empty).toSet
       expectedListingStable == foundListing
     }, s"Expected to show groups $expectedListingStable, but found 
$foundListing")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testConsumerGroupStatesFromString(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): 
Unit = {
+    val simpleGroup = "simple-group"
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+
+    val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
+
+    val expectedListing = Set(
+      new ConsumerGroupListing(simpleGroup, true)
+        .setState(Optional.of(ConsumerGroupState.EMPTY))
+        .setType(if(quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),

Review Comment:
   Why are we doing this? The type should work in all combinations, no?



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeState = opts.options.has(opts.stateOpt)
+      val includeType = opts.options.has(opts.typeOpt)
+
+      val groupInfoMap = mutable.Map[String, (String, String)]()
+
+      if (includeType || includeState) {
+        val states = getStateValues()
+        val types = getTypeValues()
+        val listings = {
+          listConsumerGroupsWithFilters(states, types)
+        }
+
+        listings.foreach { listing =>
+          val groupId = listing.groupId
+          val groupType = 
listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString
+          val state = 
listing.state().orElse(ConsumerGroupState.UNKNOWN).toString
+          groupInfoMap.update(groupId, (state, groupType))
+        }
+
+        val groupInfoList = groupInfoMap.toList.map { case (groupId, (state, 
groupType)) => (groupId, state, groupType) }

Review Comment:
   I wonder if this conversion is really necessary. Passing the Map to 
`printGroupInfo` seems reasonable too.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     val expectedListing = Set(
-      new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+      new ConsumerGroupListing(simpleGroup, true)
+        .setState(Optional.of(ConsumerGroupState.EMPTY))
+        .setType(if (quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
+      new ConsumerGroupListing(group, false)
+        .setState(Optional.of(ConsumerGroupState.STABLE))
+        .setType(if (quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty())
+    )
 
     var foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+      foundListing = 
service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet, 
Set.empty).toSet
       expectedListing == foundListing
     }, s"Expected to show groups $expectedListing, but found $foundListing")
 
-    val expectedListingStable = Set(
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
 
     foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+      foundListing = 
service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE),
 Set.empty).toSet
       expectedListingStable == foundListing
     }, s"Expected to show groups $expectedListingStable, but found 
$foundListing")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testConsumerGroupStatesFromString(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): 
Unit = {
+    val simpleGroup = "simple-group"
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+
+    val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
+
+    val expectedListing = Set(
+      new ConsumerGroupListing(simpleGroup, true)
+        .setState(Optional.of(ConsumerGroupState.EMPTY))
+        .setType(if(quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
+      new ConsumerGroupListing(group, false)
+        .setState(Optional.of(ConsumerGroupState.STABLE))
+        .setType(if(quorum.contains("kip848")) 
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty())
+    )
+
+    var foundListing = Set.empty[ConsumerGroupListing]
+    TestUtils.waitUntilTrue(() => {
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
Set.empty).toSet
+      expectedListing == foundListing
+    }, s"Expected to show groups $expectedListing, but found $foundListing")
+
+    // When group type is mentioned:

Review Comment:
   This test is a bit confusing. I wonder if it would be better to split it into 
multiple tests. For instance, we could have one testing the old coordinator in 
ZK or KRaft mode, and then one testing the new coordinator in KRaft mode. What do 
you think?



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -18,19 +18,19 @@ package kafka.admin
 
 import joptsimple.OptionException
 import org.junit.jupiter.api.Assertions._
-import kafka.utils.TestUtils
-import org.apache.kafka.common.ConsumerGroupState
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.ConsumerGroupListing
-import java.util.Optional
-
+import org.apache.kafka.common.{ConsumerGroupState, ConsumerGroupType}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Optional
 
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroups(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: 
String): Unit = {

Review Comment:
   Does `groupProtocol` have any effect on the consumer created in this test?



##########
core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala:
##########
@@ -116,6 +123,18 @@ class ConsumerGroupCommandTest extends 
KafkaServerTestHarness {
 }
 
 object ConsumerGroupCommandTest {
+  // We want to test the following combinations:
+  // * ZooKeeper and the classic group protocol.
+  // * KRaft and the classic group protocol.
+  // * KRaft with the new group coordinator enabled and the classic group 
protocol.
+  // * KRaft with the new group coordinator enabled and the consumer group 
protocol.
+  def getTestQuorumAndGroupProtocolParametersAll: 
java.util.stream.Stream[Arguments] = {

Review Comment:
   We already have the same one defined in BaseConsumerTest. I wonder if we 
could reuse it. Could we?



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -46,16 +46,16 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: 
String): Unit = {
     val cgcArgs = Array("--new-consumer", "--bootstrap-server", 
bootstrapServers(), "--list")
     assertThrows(classOf[OptionException], () => 
getConsumerGroupService(cgcArgs))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroupsWithStates(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): 
Unit = {

Review Comment:
   Same question. Is it necessary? Or would using `@ValueSource(strings = 
Array("zk", "kraft", "kraft+kip-848"))` be enough?



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -104,9 +165,27 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupStatesFromString("   ,   ,"))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListGroupCommand(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerGroupTypesFromString(quorum: String, groupProtocol: String): 
Unit = {
+    var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer")
+    assertEquals(Set(ConsumerGroupType.CONSUMER), result)
+
+    result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, 
classic")
+    assertEquals(Set(ConsumerGroupType.CONSUMER, ConsumerGroupType.CLASSIC), 
result)
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"))
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("Consumer"))

Review Comment:
   Hmm... I would expect this to actually work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to