dongnuo123 commented on code in PR #22457:
URL: https://github.com/apache/kafka/pull/22457#discussion_r3351535772


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14187,6 +14187,86 @@ public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
         assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void 
testJoiningConsumerGroupWithClassicProtocolFailsIfMigrationDisabled(boolean 
isStatic) {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.DISABLED.toString())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .build()))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId(isStatic ? "new-instance-id" : null)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        Exception ex = assertThrows(GroupIdNotFoundException.class, () -> 
context.sendClassicGroupJoin(request, isStatic));
+        assertEquals(
+            String.format("Cannot join the consumer group %s with the classic 
protocol because the group migration is disabled.", groupId),
+            ex.getMessage()
+        );
+    }
+
+    @Test
+    public void testRejoiningClassicMemberIsAllowedWhenMigrationDisabled() 
throws Exception {
+        String groupId = "group-id";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 2)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.DISABLED.toString())
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new 
NoOpPartitionAssignor()))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setRebalanceTimeoutMs(500)
+                    .setSubscribedTopicNames(List.of(fooTopicName))
+                    .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)), 10))
+                    .setClassicMemberMetadata(
+                        new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                            .setSessionTimeoutMs(500)
+                            
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(
+                                
GroupMetadataManagerTestContext.toProtocols("range"))))
+                    .build())
+                .withAssignment(memberId, 
mkAssignment(mkTopicAssignment(fooTopicId, 0, 1)))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, 
metadataImage)))))
+            .build();
+        
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
 10);
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(memberId)
+            
.withProtocols(GroupMetadataManagerTestContext.toProtocols("range"))
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+        joinResult.appendFuture.complete(null);

Review Comment:
   No, we don't need it actually. Removed the lines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to