lucasbru commented on code in PR #17981:
URL: https://github.com/apache/kafka/pull/17981#discussion_r1867399836


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupStore.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.modern.ModernGroup;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
+import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
+
+public class GroupStore {
+
+    /**
+     * The snapshot registry.
+     */
+    private SnapshotRegistry snapshotRegistry;

Review Comment:
   This field could be declared `final`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupStore.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.modern.ModernGroup;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
+import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
+
+public class GroupStore {
+
+    /**
+     * The snapshot registry.

Review Comment:
   Personally, I would avoid Javadoc comments that merely repeat the field name.
The same applies to the fields below.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupStore.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.modern.ModernGroup;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
+import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
+
+public class GroupStore {
+
+    /**
+     * The snapshot registry.
+     */
+    private SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The classic and consumer groups keyed by their name.
+     */
+    private final TimelineHashMap<String, Group> groups;
+
+    /**
+     * The group ids keyed by topic names.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<String>> 
groupsByTopics;
+
+    /**
+     * The metadata image.
+     */
+    private MetadataImage metadataImage;
+
+    public GroupStore(SnapshotRegistry snapshotRegistry, MetadataImage 
metadataImage) {
+        this.snapshotRegistry = snapshotRegistry;
+        this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.metadataImage = metadataImage;
+    }
+
+    /**
+     * @return The current metadata image used by the group metadata manager.
+     */
+    public MetadataImage image() {
+        return metadataImage;
+    }
+
+    /**
+     * Returns the snapshot registry.
+     *
+     * @return The snapshot registry.
+     */
+    public SnapshotRegistry snapshotRegistry() {
+        return snapshotRegistry;
+    }
+
+    /**
+     * Get the Group List.
+     *
+     * @param statesFilter      The states of the groups we want to list.
+     *                          If empty, all groups are returned with their 
state.
+     *                          If invalid, no groups are returned.
+     * @param typesFilter       The types of the groups we want to list.
+     *                          If empty, all groups are returned with their 
type.
+     *                          If invalid, no groups are returned.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
+     *
+     * @return A list containing the ListGroupsResponseData.ListedGroup
+     */
+    public List<ListGroupsResponseData.ListedGroup> listGroups(
+        Set<String> statesFilter,
+        Set<String> typesFilter,
+        long committedOffset
+    ) {
+        // Converts each state filter string to lower case for a 
case-insensitive comparison.
+        Set<String> caseInsensitiveFilterSet = statesFilter.stream()
+            .map(String::toLowerCase)
+            .map(String::trim)
+            .collect(Collectors.toSet());
+
+        // Converts each type filter string to a value in the GroupType enum 
while being case-insensitive.
+        Set<Group.GroupType> enumTypesFilter = typesFilter.stream()
+            .map(Group.GroupType::parse)
+            .collect(Collectors.toSet());
+
+        Predicate<Group> combinedFilter = group -> {
+            boolean stateCheck = statesFilter.isEmpty() || 
group.isInStates(caseInsensitiveFilterSet, committedOffset);
+            boolean typeCheck = enumTypesFilter.isEmpty() || 
enumTypesFilter.contains(group.type());
+
+            return stateCheck && typeCheck;
+        };
+
+        Stream<Group> groupStream = groups.values(committedOffset).stream();
+
+        return groupStream
+            .filter(combinedFilter)
+            .map(group -> group.asListedGroup(committedOffset))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @return The group corresponding to the group id or throw 
GroupIdNotFoundException.
+     */
+    public Group group(String groupId) throws GroupIdNotFoundException {
+        Group group = groups.get(groupId, Long.MAX_VALUE);
+        if (group == null) {
+            throw new GroupIdNotFoundException(String.format("Group %s not 
found.", groupId));
+        }
+        return group;
+    }
+
+    /**
+     * @return The group corresponding to the group id at the given committed 
offset
+     *         or throw GroupIdNotFoundException.
+     */
+    public Group group(String groupId, long committedOffset) throws 
GroupIdNotFoundException {
+        Group group = groups.get(groupId, committedOffset);
+        if (group == null) {
+            throw new GroupIdNotFoundException(String.format("Group %s not 
found.", groupId));
+        }
+        return group;
+    }
+
+    /**
+     * @return The set of groups subscribed to the topic.
+     */
+    public Set<String> groupsSubscribedToTopic(String topicName) {
+        Set<String> groups = groupsByTopics.get(topicName);
+        return groups != null ? groups : Collections.emptySet();
+    }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or
+        // deleted topics.
+        Optional.ofNullable(delta.topicsDelta()).ifPresent(topicsDelta -> {
+            Set<String> allGroupIds = new HashSet<>();
+            topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
+                String topicName = topicDelta.name();
+                allGroupIds.addAll(groupsSubscribedToTopic(topicName));
+            });
+            topicsDelta.deletedTopicIds().forEach(topicId -> {
+                TopicImage topicImage = 
delta.image().topics().getTopic(topicId);
+                allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
+            });
+            allGroupIds.forEach(groupId -> {
+                Group group = groups.get(groupId);
+                if (group != null && (group.type() == CONSUMER || group.type() 
== SHARE)) {
+                    ((ModernGroup<?>) group).requestMetadataRefresh();

Review Comment:
   I don't really like this. We should use proper object orientation here:
define `onMetadataUpdate` (or something like that) on `Group`, call it here
on any group type, and leave the implementation empty for classic groups.
   
   That said, I see that this is not your code; it was just copied over. So we
could fix it in a later PR.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupStore.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.modern.ModernGroup;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
+import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
+
+public class GroupStore {
+
+    /**
+     * The snapshot registry.
+     */
+    private SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The classic and consumer groups keyed by their name.
+     */
+    private final TimelineHashMap<String, Group> groups;
+
+    /**
+     * The group ids keyed by topic names.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<String>> 
groupsByTopics;
+
+    /**
+     * The metadata image.
+     */
+    private MetadataImage metadataImage;
+
+    public GroupStore(SnapshotRegistry snapshotRegistry, MetadataImage 
metadataImage) {
+        this.snapshotRegistry = snapshotRegistry;
+        this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.metadataImage = metadataImage;
+    }
+
+    /**
+     * @return The current metadata image used by the group metadata manager.
+     */
+    public MetadataImage image() {
+        return metadataImage;
+    }
+
+    /**
+     * Returns the snapshot registry.
+     *
+     * @return The snapshot registry.
+     */
+    public SnapshotRegistry snapshotRegistry() {
+        return snapshotRegistry;
+    }
+
+    /**
+     * Get the Group List.
+     *
+     * @param statesFilter      The states of the groups we want to list.
+     *                          If empty, all groups are returned with their 
state.
+     *                          If invalid, no groups are returned.
+     * @param typesFilter       The types of the groups we want to list.
+     *                          If empty, all groups are returned with their 
type.
+     *                          If invalid, no groups are returned.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
+     *
+     * @return A list containing the ListGroupsResponseData.ListedGroup
+     */
+    public List<ListGroupsResponseData.ListedGroup> listGroups(
+        Set<String> statesFilter,
+        Set<String> typesFilter,
+        long committedOffset
+    ) {
+        // Converts each state filter string to lower case for a 
case-insensitive comparison.
+        Set<String> caseInsensitiveFilterSet = statesFilter.stream()
+            .map(String::toLowerCase)
+            .map(String::trim)
+            .collect(Collectors.toSet());
+
+        // Converts each type filter string to a value in the GroupType enum 
while being case-insensitive.
+        Set<Group.GroupType> enumTypesFilter = typesFilter.stream()
+            .map(Group.GroupType::parse)
+            .collect(Collectors.toSet());
+
+        Predicate<Group> combinedFilter = group -> {
+            boolean stateCheck = statesFilter.isEmpty() || 
group.isInStates(caseInsensitiveFilterSet, committedOffset);
+            boolean typeCheck = enumTypesFilter.isEmpty() || 
enumTypesFilter.contains(group.type());
+
+            return stateCheck && typeCheck;
+        };
+
+        Stream<Group> groupStream = groups.values(committedOffset).stream();
+
+        return groupStream
+            .filter(combinedFilter)
+            .map(group -> group.asListedGroup(committedOffset))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @return The group corresponding to the group id or throw 
GroupIdNotFoundException.
+     */
+    public Group group(String groupId) throws GroupIdNotFoundException {
+        Group group = groups.get(groupId, Long.MAX_VALUE);
+        if (group == null) {
+            throw new GroupIdNotFoundException(String.format("Group %s not 
found.", groupId));
+        }
+        return group;
+    }
+
+    /**
+     * @return The group corresponding to the group id at the given committed 
offset
+     *         or throw GroupIdNotFoundException.
+     */
+    public Group group(String groupId, long committedOffset) throws 
GroupIdNotFoundException {
+        Group group = groups.get(groupId, committedOffset);
+        if (group == null) {
+            throw new GroupIdNotFoundException(String.format("Group %s not 
found.", groupId));
+        }
+        return group;
+    }
+
+    /**
+     * @return The set of groups subscribed to the topic.
+     */
+    public Set<String> groupsSubscribedToTopic(String topicName) {
+        Set<String> groups = groupsByTopics.get(topicName);
+        return groups != null ? groups : Collections.emptySet();
+    }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or
+        // deleted topics.
+        Optional.ofNullable(delta.topicsDelta()).ifPresent(topicsDelta -> {
+            Set<String> allGroupIds = new HashSet<>();
+            topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
+                String topicName = topicDelta.name();
+                allGroupIds.addAll(groupsSubscribedToTopic(topicName));
+            });
+            topicsDelta.deletedTopicIds().forEach(topicId -> {
+                TopicImage topicImage = 
delta.image().topics().getTopic(topicId);
+                allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
+            });
+            allGroupIds.forEach(groupId -> {
+                Group group = groups.get(groupId);
+                if (group != null && (group.type() == CONSUMER || group.type() 
== SHARE)) {
+                    ((ModernGroup<?>) group).requestMetadataRefresh();
+                }
+            });
+        });
+    }
+
+    /**
+     * Delete the group if it exists and is in Empty state.
+     *
+     * @param groupId The group id.
+     * @param records The list of records to append the group metadata 
tombstone records.
+     */
+    public void maybeDeleteGroup(String groupId, List<CoordinatorRecord> 
records) {
+        Group group = groups.get(groupId);
+        if (group != null && group.isEmpty()) {
+            createGroupTombstoneRecords(groupId, records);
+        }
+    }
+
+    /**
+     * Handles a DeleteGroups request.
+     * Populates the record list passed in with record to update the state 
machine.
+     * Validations are done in {@link 
GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+     * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+     *
+     * @param groupId The id of the group to be deleted. It has been checked 
in {@link GroupMetadataManager#validateDeleteGroup}.
+     * @param records The record list to populate.
+     */
+    public void createGroupTombstoneRecords(
+        String groupId,
+        List<CoordinatorRecord> records
+    ) {
+        // At this point, we have already validated the group id, so we know 
that the group exists and that no exception will be thrown.
+        createGroupTombstoneRecords(group(groupId), records);
+    }
+
+    /**
+     * Populates the record list passed in with record to update the state 
machine.
+     *
+     * @param group The group to be deleted.
+     * @param records The record list to populate.
+     */
+    public void createGroupTombstoneRecords(
+        Group group,
+        List<CoordinatorRecord> records
+    ) {
+        group.createGroupTombstoneRecords(records);
+    }
+
+    /**
+     * @return The set of all groups' ids.
+     */
+    public Set<String> groupIds() {
+        return Collections.unmodifiableSet(this.groups.keySet());
+    }
+
+    /**
+     * Adds a new group to the group store.
+     *
+     * @param groupId The groupId of the group to be added.
+     * @param group The group to be added.
+     */
+    public void addGroup(String groupId, Group group) {
+        groups.put(groupId, group);
+    }
+
+    /**
+     * Subscribes a group to a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    public void subscribeGroupToTopic(
+        String groupId,
+        String topicName
+    ) {
+        groupsByTopics
+            .computeIfAbsent(topicName, __ -> new 
TimelineHashSet<>(snapshotRegistry, 1))
+            .add(groupId);
+    }
+
+    /**
+     * Unsubscribes a group from a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    public void unsubscribeGroupFromTopic(
+        String groupId,
+        String topicName
+    ) {
+        groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> {
+            groupIds.remove(groupId);
+            return groupIds.isEmpty() ? null : groupIds;
+        });
+    }
+
+    /**
+     * Removes the group.
+     *
+     * @param groupId The group id.
+     *
+     * @return The type of the removed group.
+     */
+    public Group removeGroup(
+        String groupId
+    ) {
+        return groups.remove(groupId);
+    }
+
+    /**
+     * Validates the DeleteGroups request.
+     *
+     * @param groupId The id of the group to be deleted.
+     */
+    void validateDeleteGroup(String groupId) throws ApiException {

Review Comment:
   We could consider inlining this code.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupStore.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.modern.ModernGroup;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
+import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
+
+public class GroupStore {
+
+    /**
+     * The snapshot registry.
+     */
+    private SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The classic and consumer groups keyed by their name.

Review Comment:
   Aren't share groups part of this collection as well?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupStore.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.modern.ModernGroup;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
+import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
+
+public class GroupStore {

Review Comment:
   Yes, I like the proposal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to