dongnuo123 commented on code in PR #16800:
URL: https://github.com/apache/kafka/pull/16800#discussion_r1707996818


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10123,6 +10123,172 @@ public void 
testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw
         assertEquals(group, 
context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false));
     }
 
+    @Test
+    public void 
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
+        String groupId = "group-id";
+        String memberId = "member-id";
+        String instanceId = "instance-id";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addRacks()
+            .build();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.UPGRADE)
+            .withConsumerGroupAssignors(Collections.singletonList(new 
NoOpPartitionAssignor()))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName(NoOpPartitionAssignor.NAME)
+            
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList(fooTopicName),
+                null,
+                Collections.singletonList(new TopicPartition(fooTopicName, 0))
+            ))))
+        );
+
+        Map<String, byte[]> assignments = new HashMap<String, byte[]>() {
+            {
+                put(
+                    memberId,
+                    Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(
+                        Collections.singletonList(new 
TopicPartition(fooTopicName, 0))
+                    )))
+                );
+            }
+        };
+
+        // Create a stable classic group with a static member.
+        ClassicGroup group = context.createClassicGroup(groupId);
+        group.setProtocolName(Optional.ofNullable(NoOpPartitionAssignor.NAME));
+        group.add(
+            new ClassicGroupMember(
+                memberId,
+                Optional.ofNullable(instanceId),
+                "client",
+                "localhost/127.0.0.1",
+                10000,
+                5000,
+                "consumer",
+                protocols,
+                assignments.get(memberId)
+            )
+        );
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(COMPLETING_REBALANCE);
+        group.transitionTo(STABLE);
+
+        context.replay(CoordinatorRecordHelpers.newGroupMetadataRecord(group, 
assignments, metadataImage.features().metadataVersion()));
+        context.commit();
+        group = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+
+        // The static member rejoins with new protocol, triggering the upgrade.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setInstanceId(instanceId)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor(NoOpPartitionAssignor.NAME)
+                
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+                .setTopicPartitions(Collections.emptyList()));
+
+        ConsumerGroupMember expectedClassicMember = new 
ConsumerGroupMember.Builder(memberId)
+            .setInstanceId(instanceId)
+            .setMemberEpoch(0)
+            .setPreviousMemberEpoch(0)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+            .setRebalanceTimeoutMs(10000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols))
+            )
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0)))
+            .build();
+
+        String newMemberId = result.response().memberId();
+        ConsumerGroupMember expectedReplacingConsumerMember = new 
ConsumerGroupMember.Builder(newMemberId)
+            .setInstanceId(instanceId)
+            .setMemberEpoch(0)
+            .setPreviousMemberEpoch(0)
+            .setState(MemberState.STABLE)
+            .setClientId(expectedClassicMember.clientId())
+            .setClientHost(expectedClassicMember.clientHost())
+            .setSubscribedTopicNames(new 
ArrayList<>(expectedClassicMember.subscribedTopicNames()))
+            .setRebalanceTimeoutMs(expectedClassicMember.rebalanceTimeoutMs())
+            .setAssignedPartitions(expectedClassicMember.assignedPartitions())
+            
.setClassicMemberMetadata(expectedClassicMember.classicMemberMetadata().get())
+            .build();
+
+        ConsumerGroupMember expectedFinalConsumerMember = new 
ConsumerGroupMember.Builder(expectedReplacingConsumerMember)
+            .setMemberEpoch(1)
+            .setServerAssignorName(NoOpPartitionAssignor.NAME)
+            .setRebalanceTimeoutMs(5000)
+            .setClassicMemberMetadata(null)
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = Arrays.asList(
+            // The existing classic group tombstone.
+            CoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId),
+
+            // Create the new consumer group with the static member.
+            CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedClassicMember),
+            CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 0),
+            CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, 
memberId, expectedClassicMember.assignedPartitions()),
+            CoordinatorRecordHelpers.newTargetAssignmentEpochRecord(groupId, 
0),
+            CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, 
expectedClassicMember),
+
+            // Remove the static member because the rejoining member replaces 
it.
+            
CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId),
+            
CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId),
+            
CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, 
memberId),
+
+            // Create the new static member.
+            CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedReplacingConsumerMember),
+            CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, 
newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
+            CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, 
expectedReplacingConsumerMember),
+
+            // The static member rejoins the new consumer group.
+            CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedFinalConsumerMember),
+
+            // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
+            
CoordinatorRecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 1, new HashMap<Integer, Set<String>>() {
+                        {
+                            put(0, new HashSet<>(Arrays.asList("rack0", 
"rack1")));
+                        }
+                    }));
+                }
+            }),

Review Comment:
   Thanks for the comment! I just realized we have `mkMapOfPartitionRacks` for 
quick initialization of such maps. Let me also replace similar cases with it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to