dongnuo123 commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1516453885


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9607,6 +9607,151 @@ public void 
testOnConsumerGroupStateTransitionOnLoading() {
         verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
     }
 
+    @Test
+    public void testMaybeUpgradeEmptyGroup() {
+        String classicGroupId = "classic-group-id";
+        String consumerGroupId = "consumer-group-id";
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+        context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+        // A consumer group can't be upgraded.
+        List<Record> records = new ArrayList<>();
+        context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, 
records);
+        assertEquals(Collections.emptyList(), records);
+
+        // A non-empty classic group can't be upgraded.
+        
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+        context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+        assertEquals(Collections.emptyList(), records);
+
+        // An empty classic group can be upgraded.
+        
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(EMPTY);
+        context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+        
assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)),
 records);
+    }
+
+    @Test
+    public void testMaybeDowngradeEmptyGroup() {
+        String classicGroupId = "classic-group-id";
+        String consumerGroupId = "consumer-group-id";
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+        context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+        List<Record> records = new ArrayList<>();
+        context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+        assertEquals(Collections.emptyList(), records);
+
+        // A classic group can't be downgraded.
+        context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+        assertEquals(Collections.emptyList(), records);
+
+        // An empty consumer group can be upgraded.
+        context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, 
records);
+        assertEquals(Arrays.asList(
+            
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId),
+            
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId),
+            RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), 
records);
+        records.clear();
+
+        // A non-empty consumer group can't be downgraded.
+        ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(Uuid.randomUuid().toString());
+        
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, 
memberBuilder.build()));
+        context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+        assertEquals(Collections.emptyList(), records);
+    }
+
+    @Test
+    public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(classicGroupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setServerAssignor("range")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setTopicPartitions(Collections.emptyList())));
+
+        
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(EMPTY);
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(

Review Comment:
   Got it. Let me add a type check when replaying classic group tombstone. Now 
it will be like (1) remove the classic group from the map, (2) create the 
consumer group, (3) the classic group tombstone replay fails because the group 
is no longer a classic group and (4) replay the consumer group records, which 
only updates the group epoch because the consumer group already exists.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to