izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2593425805


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,287 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid topicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by 
compaction].
+        // 3. Member B is assigned partition 0. 
+        // 4. Member A is assigned partition 1. 
+        // If record 2 is processed, there are no issues, however with 
compaction it is possible that 
+        // unassignment records are removed. We would like to not fail in 
these cases.
+        // Therefore we will allow assignments to owned partitions as long as 
the epoch is larger. 
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupCurrentMemberAssignmentUnownedTopicWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-0.
+        // 2. Member A is unassigned partition foo-0 [record removed by 
compaction].
+        // 3. Member B is assigned partition foo-0.
+        // 4. Member B is unassigned partition foo-0. 
+        // 5. Member A is assigned partition bar-0. 
+        // This is a legitimate set of assignments but with compaction the 
unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 
3 and attempting 
+        // to remove nonexistent ownership in step 5. We want to ensure 
removing ownership from a 
+        // completely unowned partition in step 5 is allowed.  
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        // foo-0's owner is replaced by member B at epoch 12.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        // foo becomes unowned.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        // Member A is unassigned foo-0.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 
0)))
+            .build()));
+
+        // Verify foo-0 is unowned and bar-0 is owned by member A at epoch 14.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(-1, group.currentPartitionEpoch(fooTopicId, 0));
+        assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:

Review Comment:
   When you say not consistent in the format, do you mean the wording of the 
comment/test explanation or is it something else? Just want to understand 
exactly what the issue is from your side for the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to