jeffkbkim commented on code in PR #16898:
URL: https://github.com/apache/kafka/pull/16898#discussion_r1724218208
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -848,26 +844,32 @@ ConsumerGroup consumerGroup(
ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
String groupId,
boolean createIfNotExists
- ) throws GroupIdNotFoundException {
+ ) throws IllegalStateException {
Group group = groups.get(groupId);
if (group == null && !createIfNotExists) {
- throw new IllegalStateException(String.format("Consumer group %s not found.", groupId));
+ throw new IllegalStateException(String.format("Consumer group %s not found", groupId));
}
if (group == null) {
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
metrics.onConsumerGroupStateTransition(null, consumerGroup.state());
return consumerGroup;
+ } else if (group.type() == CONSUMER) {
+ return (ConsumerGroup) group;
+ } else if (group.type() == CLASSIC && ((ClassicGroup) group).isSimpleGroup()) {
+ // If the group is a simple classic group, it was automatically created to hold committed
+ // offsets if no group existed. Simple classic groups are not backed by any records
+ // in the __consumer_offsets topic hence we can safely replace it here. Without this,
+ // replaying consumer group records after offset commit records would not work.
Review Comment:
Could you provide an example of when we would call GMM#replay with consumer group
records for a simple group? I don't think I understand how
> GMM#replay for consumer group records

can/will be triggered.
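
For reference, the ordering that the code comment in the hunk describes (offset commit records replayed before consumer group records) can be sketched as below. This is only a minimal illustration under stated assumptions: `ReplaySketch`, `Group`, and `replayOffsetCommit` are hypothetical stand-ins, not Kafka's actual classes, and the bodies of the simple-classic-group branch and the final `else` branch are assumed, since the hunk above is cut off before them.

```java
import java.util.HashMap;
import java.util.Map;

public class ReplaySketch {
    enum GroupType { CONSUMER, CLASSIC }

    // Hypothetical, simplified stand-in for a group; not Kafka's real Group classes.
    static final class Group {
        final GroupType type;
        final boolean simple; // only meaningful for CLASSIC groups

        Group(GroupType type, boolean simple) {
            this.type = type;
            this.simple = simple;
        }
    }

    final Map<String, Group> groups = new HashMap<>();

    // Stand-in for replaying an offset commit record: if no group exists yet,
    // a simple classic group is created just to hold the committed offsets.
    void replayOffsetCommit(String groupId) {
        groups.computeIfAbsent(groupId, id -> new Group(GroupType.CLASSIC, true));
    }

    // Stand-in for the method under review, following the branch order in the hunk.
    Group getOrMaybeCreatePersistedConsumerGroup(String groupId, boolean createIfNotExists) {
        Group group = groups.get(groupId);
        if (group == null && !createIfNotExists) {
            throw new IllegalStateException(String.format("Consumer group %s not found", groupId));
        }
        if (group == null) {
            Group consumerGroup = new Group(GroupType.CONSUMER, false);
            groups.put(groupId, consumerGroup);
            return consumerGroup;
        } else if (group.type == GroupType.CONSUMER) {
            return group;
        } else if (group.type == GroupType.CLASSIC && group.simple) {
            // Assumed body: replace the simple classic group with a new consumer group.
            Group consumerGroup = new Group(GroupType.CONSUMER, false);
            groups.put(groupId, consumerGroup);
            return consumerGroup;
        } else {
            // Assumed behavior for a non-simple classic group.
            throw new IllegalStateException(String.format("Group %s is not a consumer group", groupId));
        }
    }

    public static void main(String[] args) {
        ReplaySketch state = new ReplaySketch();
        // 1. An offset commit record is replayed first, creating a simple classic group.
        state.replayOffsetCommit("g1");
        // 2. A consumer group record for the same group id is replayed afterwards.
        Group group = state.getOrMaybeCreatePersistedConsumerGroup("g1", true);
        System.out.println(group.type); // prints CONSUMER
    }
}
```

Without the simple-classic-group branch, step 2 in this sketch would fall through to the final `else` and throw, which is the failure the code comment refers to.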