squah-confluent commented on code in PR #18046: URL: https://github.com/apache/kafka/pull/18046#discussion_r1875402835
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10077,6 +10082,116 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) assertEquals(group, context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false)); } + /** + * Supplies the {@link Arguments} to {@link #testConsumerGroupHeartbeatWithCustomAssignorClassicGroup(ByteBuffer, boolean)}. + */ + private static Stream<Arguments> testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource() { + return Stream.of( + Arguments.of(null, true), + Arguments.of(ByteBuffer.allocate(0), true), + Arguments.of(ByteBuffer.allocate(1), false) + ); + } + + @ParameterizedTest + @MethodSource("testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource") + public void testConsumerGroupHeartbeatWithCustomAssignorClassicGroup(ByteBuffer userData, boolean expectUpgrade) { + String groupId = "group-id"; + String memberId1 = "member-id-1"; + String memberId2 = "member-id-2"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of( + memberId1, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + )), + memberId2, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(barTopicId, 0) + )) + ))); + + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 1) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build(); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.UPGRADE.toString()) + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1); + protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( + List.of(fooTopicName, barTopicName), + null, + List.of( + new TopicPartition(fooTopicName, 0), + new TopicPartition(barTopicName, 0) + ) + )))) + ); + + Map<String, byte[]> assignments = Map.of( + memberId1, + Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(List.of( + new TopicPartition(fooTopicName, 0), + new TopicPartition(barTopicName, 0) + ), userData))) + ); + + // Create a stable classic group with member 1. + ClassicGroup group = context.createClassicGroup(groupId); + group.setProtocolName(Optional.of("range")); + group.add( + new ClassicGroupMember( + memberId1, + Optional.empty(), + "client-id", + "client-host", + 10000, + 5000, + "consumer", + protocols, + assignments.get(memberId1) + ) + ); + + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(COMPLETING_REBALANCE); + group.transitionTo(STABLE); + + context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments, metadataImage.features().metadataVersion())); + context.commit(); + group = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false); + + // A new member 2 with new protocol joins the classic group, triggering the upgrade. + ConsumerGroupHeartbeatRequestData consumerGroupHeartbeatRequestData = + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(List.of(fooTopicName, barTopicName)) + .setTopicPartitions(Collections.emptyList()); + + if (expectUpgrade) { + context.consumerGroupHeartbeat(consumerGroupHeartbeatRequestData); + } else { + Exception ex = assertThrows(GroupIdNotFoundException.class, () -> context.consumerGroupHeartbeat(consumerGroupHeartbeatRequestData)); + assertEquals( + "Cannot upgrade the classic group group-id to consumer group because a custom assignor with userData is in use. " + + "Switch to a default assignor before re-attempting the upgrade.", ex.getMessage()); + } + } + Review Comment: I didn't know we had integration tests for group upgrades. This is definitely worth an integration test. I've gone ahead and added one. Let me know what you think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org