izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2578119016
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,263 @@ public void
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testConsumerGroupAssignmentResolvesWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+ long topicHash = computeTopicHash(topicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMetadataHash(topicHash))
+ .build();
+
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+
+ // Assign partition 0 to member A
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Assign partition 0 to member B. This is allowed even though
partition 0 is already owned by member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Now assign partition 1 to member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ assertEquals(group.currentPartitionEpoch(topicId, 0), 12);
+ assertEquals(group.currentPartitionEpoch(topicId, 1), 13);
+ }
+
+ @Test
+ public void testConsumerGroupUnassignmentResolvesWithCompaction() {
Review Comment:
The initial variant of this test had all the records replayed (the
unassignment included) so it didn't actually check the compaction scenario with
a removed unassignment record. Removing that replay gave the expected test
behavior. I agree that we could perhaps remove the test though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]