dajac commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2598538427
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,287 @@ public void
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction()
{
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ Uuid topicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder().build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition 0.
+ // 2. Member A is unassigned partition 0 [record removed by
compaction].
+ // 3. Member B is assigned partition 0.
+ // 4. Member A is assigned partition 1.
+ // If record 2 is processed, there are no issues; however, with
compaction it is possible that
+ // unassignment records are removed. We would like to not fail in
these cases.
+ // Therefore we will allow assignments to owned partitions as long as
the epoch is larger.
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0's owner is replaced by member B at epoch 12.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0 must remain with member B at epoch 12 even though
member A has just been unassigned partition 0.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+ assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+ }
+
+ @Test
+ public void
testReplayConsumerGroupCurrentMemberAssignmentUnownedTopicWithCompaction() {
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Uuid barTopicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder().build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition foo-0.
+ // 2. Member A is unassigned partition foo-0 [record removed by
compaction].
+ // 3. Member B is assigned partition foo-0.
+ // 4. Member B is unassigned partition foo-0.
+ // 5. Member A is assigned partition bar-0.
+ // This is a legitimate set of assignments but with compaction the
unassignment record can be skipped.
+ // This can lead to conflicts from updating an owned partition in step
3 and attempting
+ // to remove nonexistent ownership in step 5. We want to ensure
removing ownership from a
+ // completely unowned partition in step 5 is allowed.
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0)))
+ .build()));
+
+ // foo-0's owner is replaced by member B at epoch 12.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0)))
+ .build()));
+
+ // foo-0 becomes unowned: member B's assignment is empty at epoch 13.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .build()));
+
+ // Member A is assigned bar-0 at epoch 14; its old foo-0 assignment is dropped even though foo-0 is already unowned.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(14)
+ .setPreviousMemberEpoch(13)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId,
0)))
+ .build()));
+
+ // Verify foo-0 is unowned and bar-0 is owned by member A at epoch 14.
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(-1, group.currentPartitionEpoch(fooTopicId, 0));
+ assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+ }
+
+ @Test
+ public void
testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Uuid barTopicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder().build();
+
+ // This test enacts the following scenario:
+ // 1. Member A unsubscribes from topic bar at epoch 10: Member A {
epoch: 10, assigned partitions: [foo], pending revocations: [bar] }
+ // 2. A new assignment is available at epoch 11 with member A
unsubscribing from topic foo.
+ // 3. Member A yields bar. The epoch is bumped to 11: Member A {
epoch: 11, assigned partitions: [], pending revocations: [foo] }
+ // 4. Member A yields topic foo. Member A { epoch: 11, assigned
partitions: [], pending revocations: [] } [removed by compaction]
+ // 5. Member B is assigned topic foo. Member B { epoch: 11, assigned
partitions: [foo], pending revocations: [] }
Review Comment:
This is addressed as part of https://github.com/apache/kafka/pull/21107.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]