squah-confluent commented on code in PR #22264:
URL: https://github.com/apache/kafka/pull/22264#discussion_r3279288515


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -12172,6 +12172,140 @@ memberId2, new MemberAssignmentImpl(mkAssignment(
         }
     }
 
+    @Test
+    public void testUpgradeFailsOnMalformedClassicGroupProtocol() {
+        String groupId = "group-id";
+        String memberId1 = "member-id-1";
+        String memberId2 = "member-id-2";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Throws RuntimeException when read
+        byte[] poisonMetadata = new byte[]{0, 1, (byte) 0xFF, (byte) 0xFF, 
(byte) 0xFF, (byte) 0xFF};
+
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(poisonMetadata));
+
+        Map<String, byte[]> assignments = Map.of(
+            memberId1,
+            Utils.toArray(ConsumerProtocol.serializeAssignment(
+                new ConsumerPartitionAssignor.Assignment(List.of(new 
TopicPartition(fooTopicName, 0)))))
+        );
+
+        ClassicGroup group = context.createClassicGroup(groupId);
+        group.setProtocolName(Optional.of("range"));
+        group.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.empty(),
+                "client-id",
+                "client-host",
+                10000,
+                5000,
+                "consumer",
+                protocols,
+                assignments.get(memberId1)
+            )
+        );
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(COMPLETING_REBALANCE);
+        group.transitionTo(STABLE);
+
+        
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, 
assignments));
+        context.commit();
+
+        // A new member 2 with the new protocol joins the classic group, 
triggering the upgrade.
+        ConsumerGroupHeartbeatRequestData request = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setRebalanceTimeoutMs(5000)
+            .setServerAssignor("range")
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setTopicPartitions(List.of());
+
+        Exception ex = assertThrows(GroupIdNotFoundException.class,
+            () -> context.consumerGroupHeartbeat(request));
+        assertEquals(
+            "Cannot upgrade classic group group-id to consumer group because 
the embedded consumer protocol is malformed.",
+            ex.getMessage()
+        );
+    }
+
+    @Test
+    public void 
testClassicJoinToConsumerGroupFailsOnMalformedSubscriptionMetadata() {
+        String groupId = "group-id";
+        String existingMemberId = Uuid.randomUuid().toString();
+        String newMemberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        ConsumerGroupMember existingMember = new 
ConsumerGroupMember.Builder(existingMemberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0)), 10))
+            .build();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new 
MockPartitionAssignor("range")))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(existingMember)
+                .withAssignment(existingMemberId, 
mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+                ))))
+            .build();
+
+        // Throws RuntimeException when read.
+        byte[] poisonMetadata = new byte[]{0, 1, (byte) 0xFF, (byte) 0xFF, 
(byte) 0xFF, (byte) 0xFF};
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(poisonMetadata));
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestData()
+            .setGroupId(groupId)
+            .setMemberId(newMemberId)
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .setProtocols(protocols)
+            .setSessionTimeoutMs(5000)
+            .setRebalanceTimeoutMs(45000);
+
+        IllegalStateException ex = assertThrows(IllegalStateException.class,
+            () -> context.sendClassicGroupJoin(joinRequest));
+        assertEquals("Malformed embedded consumer protocol in subscription 
deserialization.", ex.getMessage());

Review Comment:
   Thanks! I wonder if there's a better error code to return. It's fine to 
leave it for this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to