izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2585592430


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,296 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+        String groupId = "fooup";
+        String memberA = "memberA";
+        String memberB = "memberB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+            .addTopic(topicId, topicName, 2)
+            .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by 
compaction].
+        // 3. Member B is assigned partition 0. 
+        // 4. Member A is assigned partition 1. 
+        // If record 2 is processed, there are no issues; however, with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as 
the epoch is larger. 
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberA = "memberA";
+        String memberB = "memberB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addTopic(barTopicId, barTopicName, 1)
+            .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-1.
+        // 2. Member A is unassigned partition foo-1 [record removed by 
compaction].
+        // 3. Member B is assigned partition foo-1.
+        // 4. Member B is unassigned partition foo-1. 
+        // 5. Member A is assigned partition bar-0. 
+        // This is a legitimate set of assignments but with compaction the 
unassignment record can be skipped.
+        // We would like to not fail in these cases and allow both the 
assignment of member B to foo-1 and 
+        // member A to bar-0 to succeed because the epochs are larger. 
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0, 1)))
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
1, 2)))
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 
0)))
+            .build()));
+
+        // Verify member A only has ownership of partition bar-0. Member B has 
no partitions.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(mkAssignment(mkTopicAssignment(barTopicId, 0)), 
group.members().get(memberA).assignedPartitions());
+        assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+        String groupId = "fooup";
+        String memberA = "memberA";
+        String memberB = "memberB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addTopic(barTopicId, barTopicName, 1)
+            .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A unsubscribes from topic bar at epoch 10: Member A { 
epoch: 10, assigned partitions: [foo], pending revocations: [bar] }
+        // 2. A new assignment is available at epoch 11 with member A 
unsubscribing from topic foo.
+        // 3. Member A yields bar. The epoch is bumped to 11: Member A { 
epoch: 11, assigned partitions: [], pending revocations: [foo] }
+        // 4. Member A yields topic foo. Member A { epoch: 11, assigned 
partitions: [], pending revocations: [] } [removed by compaction]
+        // 5. Member B is assigned topic foo. Member B { epoch: 11, assigned 
partitions: [foo], pending revocations: [] }
+        // When record 4 is dropped by compaction, we want member B's 
assignment to be accepted with the same epoch. 
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+            .build()));
+
+        // Member A yields bar at epoch 11.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        // Member A yields foo. [record removed by compaction]
+        // Member B is assigned foo at epoch 11.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        // Verify partition foo-0 is assigned to member B at epoch 11.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 0)), 
group.members().get(memberB).assignedPartitions());
+        assertEquals(11, group.currentPartitionEpoch(fooTopicId, 0));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberA = "memberA";
+        String memberB = "memberB";
+
+        String subtopology = "subtopology";
+        String topicName = "foo";
+        Uuid topicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned task 0.
+        // 2. Member A is unassigned task 0 [record removed by compaction].
+        // 3. Member B is assigned task 0. 
+        // 4. Member A is assigned task 1. 
+        // If record 2 is processed, there are no issues; however, with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned tasks as long as the 
epoch is larger. 
+
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberA)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, 
Map.of(0, 11))))
+            .build()));
+
+        // Task 0's owner is replaced by member B at epoch 12.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberB)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopology, 
Map.of(0, 12))))
+            .build()));

Review Comment:
   We're using `StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord` for the test cases in order to replay records that exercise different scenarios where compaction may drop some unassignment records. If you create and only interact with a group member using that assignment helper, the member's process-id field is simply `null`, so the member is added to the group's `members` field but the `ActiveTasks` map is never updated (which was the cause of the testing issue).
   
   I don't think it's a bug so much as a misunderstanding on my side of how StreamsGroup members are set up; I was basing it on the tests for ConsumerGroup members, where it's not necessary to take that extra step.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to