dajac commented on code in PR #22457:
URL: https://github.com/apache/kafka/pull/22457#discussion_r3348606657


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2498,6 +2518,13 @@ private CoordinatorResult<Void, CoordinatorRecord> 
classicGroupJoinToConsumerGro
         throwIfConsumerGroupIsFull(group, memberId);
         throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
 
+        // Under the disabled migration policy, a new classic member is not 
allowed.
+        // Members that are already in the group may still rejoin.
+        boolean isNewMemberJoining = instanceId == null
+            ? !group.hasMember(memberId)
+            : group.staticMember(instanceId) == null;
+        throwIfClassicMemberCannotJoinConsumerGroup(group, isNewMemberJoining);

Review Comment:
   As we pass the group to `throwIfClassicMemberCannotJoinConsumerGroup`, have 
you considered passing the `memberId` and the `instanceId` too so we can fully 
delegate all the logic to the method?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14187,6 +14187,86 @@ public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
         assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void 
testJoiningConsumerGroupWithClassicProtocolFailsIfMigrationDisabled(boolean 
isStatic) {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.DISABLED.toString())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .build()))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId(isStatic ? "new-instance-id" : null)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        Exception ex = assertThrows(GroupIdNotFoundException.class, () -> 
context.sendClassicGroupJoin(request, isStatic));
+        assertEquals(
+            String.format("Cannot join the consumer group %s with the classic 
protocol because the group migration is disabled.", groupId),
+            ex.getMessage()
+        );
+    }
+
+    @Test
+    public void testRejoiningClassicMemberIsAllowedWhenMigrationDisabled() 
throws Exception {
+        String groupId = "group-id";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 2)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.DISABLED.toString())
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new 
NoOpPartitionAssignor()))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setRebalanceTimeoutMs(500)
+                    .setSubscribedTopicNames(List.of(fooTopicName))
+                    .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)), 10))
+                    .setClassicMemberMetadata(
+                        new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                            .setSessionTimeoutMs(500)
+                            
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(
+                                
GroupMetadataManagerTestContext.toProtocols("range"))))
+                    .build())
+                .withAssignment(memberId, 
mkAssignment(mkTopicAssignment(fooTopicId, 0, 1)))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, 
metadataImage)))))
+            .build();
+        
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
 10);
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(memberId)
+            
.withProtocols(GroupMetadataManagerTestContext.toProtocols("range"))
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+        joinResult.appendFuture.complete(null);

Review Comment:
   Completing `joinResult.appendFuture` manually looks odd here. Do we really need it?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14187,6 +14187,86 @@ public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
         assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));

Review Comment:
   Should we also extend the migration integration tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to