dajac commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1470785323


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##########
@@ -1106,17 +1106,20 @@ private[group] class GroupCoordinator(
     }
   }
 
-  def handleListGroups(states: Set[String]): (Errors, List[GroupOverview]) = {
+  def handleListGroups(states: Set[String], groupTypes: Set[String]): (Errors, 
List[GroupOverview]) = {
     if (!isActive.get) {
       (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
     } else {
       val errorCode = if (groupManager.isLoading) 
Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
-      // if states is empty, return all groups
-      val groups = if (states.isEmpty)
-        groupManager.currentGroups
-      else {
-        val caseInsensitiveStates = states.map(_.toLowerCase)
-        groupManager.currentGroups.filter(g => 
g.isInStates(caseInsensitiveStates))
+
+      // Filter groups based on states and groupTypes. If either is empty, it 
won't filter on that criterion.
+      // While using the old group coordinator, all groups are considered 
classic groups by default.
+      // An empty list is returned for any other type filter.
+      val enumTypesFilter: Set[Group.GroupType] = 
groupTypes.map(Group.GroupType.parse)
+
+      val groups = groupManager.currentGroups.filter { g =>
+        states.isEmpty || g.isInStates(states.map(_.toLowerCase)) &&

Review Comment:
   nit: Could we bring back `caseInsensitiveStates` in order to not lower case 
the states for every group?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to