dongnuo123 commented on code in PR #22457:
URL: https://github.com/apache/kafka/pull/22457#discussion_r3351535772
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14187,6 +14187,86 @@ public void
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
assertThrows(InconsistentGroupProtocolException.class, () ->
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void
testJoiningConsumerGroupWithClassicProtocolFailsIfMigrationDisabled(boolean
isStatic) {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.DISABLED.toString())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build()))
+ .build();
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withGroupInstanceId(isStatic ? "new-instance-id" : null)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+
+ Exception ex = assertThrows(GroupIdNotFoundException.class, () ->
context.sendClassicGroupJoin(request, isStatic));
+ assertEquals(
+ String.format("Cannot join the consumer group %s with the classic
protocol because the group migration is disabled.", groupId),
+ ex.getMessage()
+ );
+ }
+
+ @Test
+ public void testRejoiningClassicMemberIsAllowedWhenMigrationDisabled()
throws Exception {
+ String groupId = "group-id";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ String memberId = Uuid.randomUuid().toString();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.DISABLED.toString())
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new
NoOpPartitionAssignor()))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setRebalanceTimeoutMs(500)
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)), 10))
+ .setClassicMemberMetadata(
+ new
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(500)
+
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(
+
GroupMetadataManagerTestContext.toProtocols("range"))))
+ .build())
+ .withAssignment(memberId,
mkAssignment(mkTopicAssignment(fooTopicId, 0, 1)))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName,
metadataImage)))))
+ .build();
+
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
10);
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId)
+
.withProtocols(GroupMetadataManagerTestContext.toProtocols("range"))
+ .build();
+
+ GroupMetadataManagerTestContext.JoinResult joinResult =
context.sendClassicGroupJoin(request);
+ joinResult.appendFuture.complete(null);
Review Comment:
No, we don't actually need it. I removed the lines.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]