dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1481015399


##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##########
@@ -37,7 +39,10 @@ public class ConsumerGroupListing {
      * @param isSimpleConsumerGroup If consumer group is simple or not.
      */
     public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) 
{
-        this(groupId, isSimpleConsumerGroup, Optional.empty());
+        this.groupId = groupId;
+        this.isSimpleConsumerGroup = isSimpleConsumerGroup;
+        this.state = Optional.empty();
+        this.groupType = Optional.empty();

Review Comment:
   nit: Calling the other constructor as it was before is actually slightly 
better.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##########
@@ -21,14 +21,16 @@
 import java.util.Optional;
 
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 
 /**
  * A listing of a consumer group in the cluster.
  */
 public class ConsumerGroupListing {
     private final String groupId;
     private final boolean isSimpleConsumerGroup;
-    private final Optional<ConsumerGroupState> state;
+    private Optional<ConsumerGroupState> state;
+    private Optional<GroupType> groupType;

Review Comment:
   It is better to keep them final as this object is supposed to be immutable.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -189,16 +199,68 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeType = opts.options.has(opts.typeOpt)
+      val includeState = opts.options.has(opts.stateOpt)
+
+      val groupInfoMap = mutable.Map[String, (String, String)]()
+
+      if (includeType || includeState) {
+        val types = getTypeValues()
+        val states = getStateValues()
+        val listings = {
+          listConsumerGroupsWithFilters(types, states)
+        }
+
+        listings.foreach { listing =>
+          val groupId = listing.groupId
+          val groupType = 
listing.groupType().orElse(GroupType.UNKNOWN).toString
+          val state = 
listing.state().orElse(ConsumerGroupState.UNKNOWN).toString
+          groupInfoMap.update(groupId, (groupType, state))

Review Comment:
   I wonder if converting the listing to a map is really necessary. I 
understand that it was there before. However, it seems that we could just pass 
the listing to `printGroupInfo` and avoid it. What do you think?



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2804,8 +2804,8 @@ public void testListConsumerGroupsWithStates() throws 
Exception {
 

Review Comment:
   It would be great if we could add more unit tests here:
   * Could we test the cases where the version of the request supported does 
not support the type filter? It should throw an unsupported exception: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java#L53.
   * We need to test the basic plumbing.
   * We need to test the different variations of `ListConsumerGroupsOptions`.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeState = opts.options.has(opts.stateOpt)
+      val includeType = opts.options.has(opts.typeOpt)
+
+      val groupInfoMap = mutable.Map[String, (String, String)]()
+
+      if (includeType || includeState) {
+        val states = getStateValues()
+        val types = getTypeValues()
+        val listings = {
+          listConsumerGroupsWithFilters(states, types)
+        }
+
+        listings.foreach { listing =>
+          val groupId = listing.groupId
+          val groupType = 
listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString
+          val state = 
listing.state().orElse(ConsumerGroupState.UNKNOWN).toString
+          groupInfoMap.update(groupId, (state, groupType))
+        }
+
+        val groupInfoList = groupInfoMap.toList.map { case (groupId, (state, 
groupType)) => (groupId, state, groupType) }
+        printGroupInfo(groupInfoList, includeState, includeType)
+
+      } else {
         listConsumerGroups().foreach(println(_))
+      }
+    }
+
+    private def getStateValues(): Set[ConsumerGroupState] = {
+      val stateValue = opts.options.valueOf(opts.stateOpt)
+      if (stateValue == null || stateValue.isEmpty)
+        Set[ConsumerGroupState]()
+      else
+        consumerGroupStatesFromString(stateValue)
+    }
+
+    private def getTypeValues(): Set[ConsumerGroupType] = {
+      val typeValue = opts.options.valueOf(opts.typeOpt)
+      if (typeValue == null || typeValue.isEmpty)
+        Set[ConsumerGroupType]()
+      else
+        consumerGroupTypesFromString(typeValue)
+    }
+
+    private def printGroupInfo(groupsAndInfo: List[(String, String, String)], 
includeState: Boolean, includeType: Boolean): Unit = {
+      val maxGroupLen: Int = groupsAndInfo.foldLeft(15)((maxLen, group) => 
Math.max(maxLen, group._1.length))
+      var header = "GROUP"

Review Comment:
   Sorry. I should have explained it a bit more. You construct `header` by 
adding type and/or state if they are needed. Then, you split the `header` into 
a list. It seems that we could avoid this if we directly use a list.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +67,97 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     val expectedListing = Set(
-      new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+      new ConsumerGroupListing(simpleGroup, true)
+        .setState(Optional.of(ConsumerGroupState.EMPTY))
+        .setType(Optional.of(GroupType.CLASSIC)),
+      new ConsumerGroupListing(group, false)
+        .setState(Optional.of(ConsumerGroupState.STABLE))
+        .setType(Optional.of(GroupType.CLASSIC))
+    )
 
     var foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
ConsumerGroupState.values.toSet).toSet
       expectedListing == foundListing
     }, s"Expected to show groups $expectedListing, but found $foundListing")
 
-    val expectedListingStable = Set(
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
 
     foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet
       expectedListingStable == foundListing
     }, s"Expected to show groups $expectedListingStable, but found 
$foundListing")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testConsumerGroupStatesFromString(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): 
Unit = {
+    val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
+
+    val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
+
+    var expectedListing = Set(
+      new ConsumerGroupListing(simpleGroup, true)
+        .setState(Optional.of(ConsumerGroupState.EMPTY))
+        .setType(Optional.of(GroupType.CLASSIC)),
+      new ConsumerGroupListing(group, false)
+        .setState(Optional.of(ConsumerGroupState.STABLE))
+        .setType(Optional.of(GroupType.CLASSIC))
+    )
+
+    if (groupProtocol.equals("classic")) {
+      expectedListing = expectedListing + new 
ConsumerGroupListing(protocolGroup, false)
+        .setState(Optional.of(ConsumerGroupState.STABLE))
+        .setType(Optional.of(GroupType.CLASSIC))
+    } else {
+      expectedListing = expectedListing + new 
ConsumerGroupListing(protocolGroup, false)
+        .setState(Optional.of(ConsumerGroupState.STABLE))
+        .setType(Optional.of(GroupType.CONSUMER))
+    }

Review Comment:
   nit: Could we do something like this?
   
   ```
   new ConsumerGroupListing(protocolGroup, false)
           .setState(Optional.of(ConsumerGroupState.STABLE))
           .setType(Optional.of(GroupType.parse(groupProtocol))
   ```



##########
core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala:
##########
@@ -44,7 +44,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness 
{
   private var consumerGroupService: List[ConsumerGroupService] = List()
   private var consumerGroupExecutors: List[AbstractConsumerGroupExecutor] = 
List()
 
-  // configure the servers and clients
+  // Configure the servers and clients.

Review Comment:
   nit: Let's revert this as it is not related.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -189,16 +199,68 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeType = opts.options.has(opts.typeOpt)
+      val includeState = opts.options.has(opts.stateOpt)
+
+      val groupInfoMap = mutable.Map[String, (String, String)]()
+
+      if (includeType || includeState) {
+        val types = getTypeValues()
+        val states = getStateValues()
+        val listings = {
+          listConsumerGroupsWithFilters(types, states)
+        }

Review Comment:
   nit: I suppose that the curly braces are not needed here.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##########
@@ -68,48 +97,44 @@ public boolean isSimpleConsumerGroup() {
     }
 
     /**
-     * Consumer Group state
+     * Consumer Group state.
      */
     public Optional<ConsumerGroupState> state() {
         return state;
     }
 
+    /**
+     * The type of the consumer group.
+     *
+     * @return An Optional containing the type, if available.
+     */
+    public Optional<GroupType> groupType() {

Review Comment:
   nit: `type()`.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -189,16 +199,68 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeType = opts.options.has(opts.typeOpt)
+      val includeState = opts.options.has(opts.stateOpt)
+
+      val groupInfoMap = mutable.Map[String, (String, String)]()
+
+      if (includeType || includeState) {
+        val types = getTypeValues()
+        val states = getStateValues()
+        val listings = {
+          listConsumerGroupsWithFilters(types, states)
+        }
+
+        listings.foreach { listing =>
+          val groupId = listing.groupId
+          val groupType = 
listing.groupType().orElse(GroupType.UNKNOWN).toString
+          val state = 
listing.state().orElse(ConsumerGroupState.UNKNOWN).toString
+          groupInfoMap.update(groupId, (groupType, state))
+        }
+
+        printGroupInfo(groupInfoMap, includeType, includeState)
+
+      } else {
         listConsumerGroups().foreach(println(_))
+      }
+    }
+
+    private def getStateValues(): Set[ConsumerGroupState] = {
+      val stateValue = opts.options.valueOf(opts.stateOpt)
+      if (stateValue == null || stateValue.isEmpty)
+        Set[ConsumerGroupState]()
+      else
+        consumerGroupStatesFromString(stateValue)
+    }
+
+    private def getTypeValues(): Set[GroupType] = {
+      val typeValue = opts.options.valueOf(opts.typeOpt)
+      if (typeValue == null || typeValue.isEmpty)
+        Set[GroupType]()
+      else
+        consumerGroupTypesFromString(typeValue)
+    }
+
+    private def printGroupInfo(groupsAndInfo: Map[String, (String, String)], 
includeType: Boolean, includeState: Boolean): Unit = {
+      val maxGroupLen: Int = groupsAndInfo.keys.foldLeft(15)((maxLen, groupId) 
=> Math.max(maxLen, groupId.length))
+      var header = "GROUP"
+      var format = s"%-${maxGroupLen}s"
+
+      if (includeType) {
+        header += " TYPE"
+        format += " %-20s"
+      }
+      if (includeState) {
+        header += " STATE"
+        format += " %-20s"
+      }
+
+      println(format.format(ArraySeq.unsafeWrapArray(header.split(" ")): _*))
+
+      groupsAndInfo.foreach { case (groupId, (groupType, state)) =>
+        val info = List(groupId) ++ (if (includeType) List(groupType) else 
List()) ++ (if (includeState) List(state) else List())

Review Comment:
   I was wondering we could simplify this a bit. One option would be to do the 
following:
   
   ```
       private def printGroupInfo(groups: List[ConsumerGroupListing], 
includeType: Boolean, includeState: Boolean): Unit = {
         def groupId(groupListing: ConsumerGroupListing): String = 
groupListing.groupId
         def groupType(groupListing: ConsumerGroupListing): String = 
groupListing.groupType.orElse(GroupType.UNKNOWN).toString
         def groupState(groupListing: ConsumerGroupListing): String = 
groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString
   
         val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) => 
Math.max(maxLen, groupId(groupListing).length))
         var format = s"%-${maxGroupLen}s"
         var header = List("Group")
         var extractors: List[ConsumerGroupListing => String] = List(groupId)
   
         if (includeType) {
           header += "TYPE"
           extractors += groupType
           format += " %-20s"
         }
   
         if (includeState) {
           header += "STATE"
           extractors += groupState
           format += " %-20s"
         }
   
         println(format.format(header: _*))
   
         groups.foreach { groupListing =>
           val info = extractors.map(extractor => extractor(groupListing))
           println(format.format(info: _*))
         }
       }
   ```
   
   We could also think about making it fully generic in the future (not in this 
PR). For instance, we could have one function which takes a list of T, a list 
of header and a list of extractors. With this, you have everything to print the 
table.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##########
@@ -68,48 +97,44 @@ public boolean isSimpleConsumerGroup() {
     }
 
     /**
-     * Consumer Group state
+     * Consumer Group state.
      */
     public Optional<ConsumerGroupState> state() {
         return state;
     }
 
+    /**
+     * The type of the consumer group.
+     *
+     * @return An Optional containing the type, if available.
+     */
+    public Optional<GroupType> groupType() {
+        return groupType;
+    }
+
     @Override
     public String toString() {
         return "(" +
             "groupId='" + groupId + '\'' +
             ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
             ", state=" + state +
+            ", groupType=" + groupType +

Review Comment:
   nit: `type=`.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -46,16 +46,16 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: 
String): Unit = {

Review Comment:
   ditto.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -189,16 +199,68 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeType = opts.options.has(opts.typeOpt)
+      val includeState = opts.options.has(opts.stateOpt)
+
+      val groupInfoMap = mutable.Map[String, (String, String)]()
+
+      if (includeType || includeState) {
+        val types = getTypeValues()
+        val states = getStateValues()

Review Comment:
   nit: We usually don't prefix methods by `get`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##########
@@ -54,7 +59,31 @@ public ConsumerGroupListing(String groupId, boolean 
isSimpleConsumerGroup, Optio
     }
 
     /**
-     * Consumer Group Id
+     * Set the state of the consumer group.
+     *
+     * @param state         The state of the consumer group.
+     * @return This ConsumerGroupListing instance.
+     */
+    public ConsumerGroupListing setState(Optional<ConsumerGroupState> state) {

Review Comment:
   This class is part of our public api therefore we cannot add new methods 
without doing a KIP. In this case, I don't think that there are necessary 
because it is preferable to keep the object immutable. Instead, we can add a 
new constructor which takes the existing params and the type.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -18,44 +18,47 @@ package kafka.admin
 
 import joptsimple.OptionException
 import org.junit.jupiter.api.Assertions._
-import kafka.utils.TestUtils
-import org.apache.kafka.common.ConsumerGroupState
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.ConsumerGroupListing
-import java.util.Optional
-
+import org.apache.kafka.common.{ConsumerGroupState, GroupType}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Optional
 
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroups(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: 
String): Unit = {
     val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
 
     val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
     val service = getConsumerGroupService(cgcArgs)
 
-    val expectedGroups = Set(group, simpleGroup)
+    val expectedGroups = Set(protocolGroup, group, simpleGroup)
     var foundGroups = Set.empty[String]
     TestUtils.waitUntilTrue(() => {
       foundGroups = service.listConsumerGroups().toSet
       expectedGroups == foundGroups
     }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: 
String): Unit = {
     val cgcArgs = Array("--new-consumer", "--bootstrap-server", 
bootstrapServers(), "--list")
     assertThrows(classOf[OptionException], () => 
getConsumerGroupService(cgcArgs))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroupsWithStates(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): 
Unit = {
     val simpleGroup = "simple-group"
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)

Review Comment:
   Should we add another consumer based on `groupProtocol`?



##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##########
@@ -68,48 +97,44 @@ public boolean isSimpleConsumerGroup() {
     }
 
     /**
-     * Consumer Group state
+     * Consumer Group state.
      */
     public Optional<ConsumerGroupState> state() {
         return state;
     }
 
+    /**
+     * The type of the consumer group.
+     *
+     * @return An Optional containing the type, if available.
+     */
+    public Optional<GroupType> groupType() {
+        return groupType;
+    }
+
     @Override
     public String toString() {
         return "(" +
             "groupId='" + groupId + '\'' +
             ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
             ", state=" + state +
+            ", groupType=" + groupType +
             ')';
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(groupId, isSimpleConsumerGroup, state);
+        return Objects.hash(groupId, isSimpleConsumerGroup(), state, 
groupType);
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        ConsumerGroupListing other = (ConsumerGroupListing) obj;
-        if (groupId == null) {

Review Comment:
   This check is not there anymore with the new code. I wonder if we have cases 
where `groupId` could be `null`. We could use `Objects.equals` to be safe.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##########
@@ -21,14 +21,16 @@
 import java.util.Optional;
 
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 
 /**
  * A listing of a consumer group in the cluster.
  */
 public class ConsumerGroupListing {
     private final String groupId;
     private final boolean isSimpleConsumerGroup;
-    private final Optional<ConsumerGroupState> state;
+    private Optional<ConsumerGroupState> state;
+    private Optional<GroupType> groupType;

Review Comment:
   nit: Let's call it `type` as we called it `type` in 
`ConsumerGroupDescription`.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -46,16 +46,16 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: 
String): Unit = {

Review Comment:
   I reiterate my question. The test verifies that `--new-consumer` is not 
accepted with `--list`. This is completely unrelated to zk vs kraft, old 
coordinator vs new coordinator, old consumer vs new consumer so all the 
combinations that we test here are not relevant. Would it make sense to keep it 
as it was before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to