lucasbru commented on code in PR #18809: URL: https://github.com/apache/kafka/pull/18809#discussion_r1952919339
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -15135,6 +15284,229 @@ public void testReplayConsumerGroupCurrentMemberAssignmentTombstone() { assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("bar")); } + @Test + public void testReplayStreamsGroupMemberMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setClientId("clientid") + .setClientHost("clienthost") + .setRackId("rackid") + .setInstanceId("instanceid") + .setRebalanceTimeoutMs(1000) + .setTopologyEpoch(10) + .setProcessId("processid") + .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999)) + .setClientTags(Collections.singletonMap("key", "value")) + .build(); + + // The group and the member are created if they do not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo", member)); + assertEquals(member, context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayStreamsGroupMemberMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group still exists but the member is already gone. Replaying the + // StreamsGroupMemberMetadata tombstone should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1")); + assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false)); + + // The group may not exist at all. Replaying the StreamsGroupMemberMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar")); + } + + @Test + public void testReplayStreamsGroupMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch()); + } + + @Test + public void testReplayStreamsGroupMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupPartitionMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> metadata = Map.of( + "bar", + new org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), "bar", 10) + ); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo", metadata)); + assertEquals(metadata, context.groupMetadataManager.streamsGroup("foo").partitionMetadata()); + } + + @Test + public void testReplayStreamsGroupPartitionMetadataTombstone() { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org