dajac commented on code in PR #18046:
URL: https://github.com/apache/kafka/pull/18046#discussion_r1871426569


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10077,6 +10082,116 @@ barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1)
         assertEquals(group, 
context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false));
     }
 
+    /**
+     * Supplies the {@link Arguments} to {@link 
#testConsumerGroupHeartbeatWithCustomAssignorClassicGroup(ByteBuffer, boolean)}.
+     */
+    private static Stream<Arguments> 
testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource() {
+        return Stream.of(
+            Arguments.of(null, true),
+            Arguments.of(ByteBuffer.allocate(0), true),
+            Arguments.of(ByteBuffer.allocate(1), false)
+        );
+    }
+
+    @ParameterizedTest
+    
@MethodSource("testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource")
+    public void 
testConsumerGroupHeartbeatWithCustomAssignorClassicGroup(ByteBuffer userData, 
boolean expectUpgrade) {
+        String groupId = "group-id";
+        String memberId1 = "member-id-1";
+        String memberId2 = "member-id-2";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+            memberId1, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0)
+            )),
+            memberId2, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(barTopicId, 0)
+            ))
+        )));
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addTopic(barTopicId, barTopicName, 1)
+            .addRacks()
+            .build();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName("range")
+            
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                List.of(fooTopicName, barTopicName),
+                null,
+                List.of(
+                    new TopicPartition(fooTopicName, 0),
+                    new TopicPartition(barTopicName, 0)
+                )
+            ))))
+        );
+
+        Map<String, byte[]> assignments = Map.of(
+            memberId1,
+            Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(List.of(
+                new TopicPartition(fooTopicName, 0),
+                new TopicPartition(barTopicName, 0)
+            ), userData)))
+        );
+
+        // Create a stable classic group with member 1.
+        ClassicGroup group = context.createClassicGroup(groupId);
+        group.setProtocolName(Optional.of("range"));
+        group.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.empty(),
+                "client-id",
+                "client-host",
+                10000,
+                5000,
+                "consumer",
+                protocols,
+                assignments.get(memberId1)
+            )
+        );
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(COMPLETING_REBALANCE);
+        group.transitionTo(STABLE);
+
+        
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, 
assignments, metadataImage.features().metadataVersion()));
+        context.commit();
+        group = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+
+        // A new member 2 with new protocol joins the classic group, 
triggering the upgrade.
+        ConsumerGroupHeartbeatRequestData consumerGroupHeartbeatRequestData =
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+                .setTopicPartitions(Collections.emptyList());
+
+        if (expectUpgrade) {
+            context.consumerGroupHeartbeat(consumerGroupHeartbeatRequestData);
+        } else {
+            Exception ex = assertThrows(GroupIdNotFoundException.class, () -> 
context.consumerGroupHeartbeat(consumerGroupHeartbeatRequestData));
+            assertEquals(
+                "Cannot upgrade the classic group group-id to consumer group 
because a custom assignor with userData is in use. " +
+                "Switch to a default assignor before re-attempting the 
upgrade.", ex.getMessage());
+        }
+    }
+

Review Comment:
   I wonder whether we could add an integration test in 
ConsumerProtocolMigrationTest. Is it worth it?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1053,6 +1055,13 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup 
classicGroup, List<Coordinator
 
             throw new GroupIdNotFoundException("Cannot upgrade the classic 
group " + classicGroup.groupId() +
                 " to consumer group because the embedded consumer protocol is 
malformed.");
+        } catch (UnsupportedVersionException e) {
+            log.warn("Cannot upgrade the classic group " + 
classicGroup.groupId() +
+                " to consumer group: " + e.getMessage() + ".", e);
+
+            throw new GroupIdNotFoundException("Cannot upgrade the classic 
group " + classicGroup.groupId() +
+                " to consumer group because a custom assignor with userData is 
in use. " +

Review Comment:
   I wonder if we could make `userData` clearer. Most users won't 
understand it. Should we perhaps say `... because an unsupported custom 
assignor is in use. Please refer to the documentation or switch to ...`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to