dajac commented on code in PR #22457:
URL: https://github.com/apache/kafka/pull/22457#discussion_r3348606657
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2498,6 +2518,13 @@ private CoordinatorResult<Void, CoordinatorRecord>
classicGroupJoinToConsumerGro
throwIfConsumerGroupIsFull(group, memberId);
throwIfClassicProtocolIsNotSupported(group, memberId,
request.protocolType(), protocols);
+ // Under the disabled migration policy, a new classic member is not
allowed.
+ // Members that are already in the group may still rejoin.
+ boolean isNewMemberJoining = instanceId == null
+ ? !group.hasMember(memberId)
+ : group.staticMember(instanceId) == null;
+ throwIfClassicMemberCannotJoinConsumerGroup(group, isNewMemberJoining);
Review Comment:
As we pass the group to `throwIfClassicMemberCannotJoinConsumerGroup`, have
you considered passing the `memberId` and the `instanceId` too so we can fully
delegate all the logic to the method?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14187,6 +14187,86 @@ public void
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
assertThrows(InconsistentGroupProtocolException.class, () ->
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void
testJoiningConsumerGroupWithClassicProtocolFailsIfMigrationDisabled(boolean
isStatic) {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.DISABLED.toString())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build()))
+ .build();
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withGroupInstanceId(isStatic ? "new-instance-id" : null)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+
+ Exception ex = assertThrows(GroupIdNotFoundException.class, () ->
context.sendClassicGroupJoin(request, isStatic));
+ assertEquals(
+ String.format("Cannot join the consumer group %s with the classic
protocol because the group migration is disabled.", groupId),
+ ex.getMessage()
+ );
+ }
+
+ @Test
+ public void testRejoiningClassicMemberIsAllowedWhenMigrationDisabled()
throws Exception {
+ String groupId = "group-id";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ String memberId = Uuid.randomUuid().toString();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.DISABLED.toString())
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new
NoOpPartitionAssignor()))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setRebalanceTimeoutMs(500)
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)), 10))
+ .setClassicMemberMetadata(
+ new
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(500)
+
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(
+
GroupMetadataManagerTestContext.toProtocols("range"))))
+ .build())
+ .withAssignment(memberId,
mkAssignment(mkTopicAssignment(fooTopicId, 0, 1)))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName,
metadataImage)))))
+ .build();
+
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
10);
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId)
+
.withProtocols(GroupMetadataManagerTestContext.toProtocols("range"))
+ .build();
+
+ GroupMetadataManagerTestContext.JoinResult joinResult =
context.sendClassicGroupJoin(request);
+ joinResult.appendFuture.complete(null);
Review Comment:
This looks weird here. Do we really need it?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14187,6 +14187,86 @@ public void
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
assertThrows(InconsistentGroupProtocolException.class, () ->
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
Review Comment:
Should we also extend migration integration tests?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]