dongnuo123 commented on code in PR #22264:
URL: https://github.com/apache/kafka/pull/22264#discussion_r3276786678


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -12172,6 +12172,140 @@ memberId2, new MemberAssignmentImpl(mkAssignment(
         }
     }
 
+    @Test
+    public void testUpgradeFailsOnMalformedClassicGroupProtocol() {
+        String groupId = "group-id";
+        String memberId1 = "member-id-1";
+        String memberId2 = "member-id-2";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Throws RuntimeException when read
+        byte[] poisonMetadata = new byte[]{0, 1, (byte) 0xFF, (byte) 0xFF, 
(byte) 0xFF, (byte) 0xFF};
+
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(poisonMetadata));
+
+        Map<String, byte[]> assignments = Map.of(
+            memberId1,
+            Utils.toArray(ConsumerProtocol.serializeAssignment(
+                new ConsumerPartitionAssignor.Assignment(List.of(new 
TopicPartition(fooTopicName, 0)))))
+        );
+
+        ClassicGroup group = context.createClassicGroup(groupId);
+        group.setProtocolName(Optional.of("range"));
+        group.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.empty(),
+                "client-id",
+                "client-host",
+                10000,
+                5000,
+                "consumer",
+                protocols,
+                assignments.get(memberId1)
+            )
+        );
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(COMPLETING_REBALANCE);
+        group.transitionTo(STABLE);
+
+        
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, 
assignments));
+        context.commit();
+
+        // A new member 2 with the new protocol joins the classic group, 
triggering the upgrade.
+        ConsumerGroupHeartbeatRequestData request = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setRebalanceTimeoutMs(5000)
+            .setServerAssignor("range")
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setTopicPartitions(List.of());
+
+        Exception ex = assertThrows(GroupIdNotFoundException.class,
+            () -> context.consumerGroupHeartbeat(request));
+        assertEquals(
+            "Cannot upgrade classic group group-id to consumer group because 
the embedded consumer protocol is malformed.",
+            ex.getMessage()
+        );
+    }
+
+    @Test
+    public void 
testClassicJoinToConsumerGroupFailsOnMalformedSubscriptionMetadata() {
+        String groupId = "group-id";
+        String existingMemberId = Uuid.randomUuid().toString();
+        String newMemberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        ConsumerGroupMember existingMember = new 
ConsumerGroupMember.Builder(existingMemberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0)), 10))
+            .build();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new 
MockPartitionAssignor("range")))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(existingMember)
+                .withAssignment(existingMemberId, 
mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+                ))))
+            .build();
+
+        // Throws RuntimeException when read.
+        byte[] poisonMetadata = new byte[]{0, 1, (byte) 0xFF, (byte) 0xFF, 
(byte) 0xFF, (byte) 0xFF};
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(poisonMetadata));
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestData()
+            .setGroupId(groupId)
+            .setMemberId(newMemberId)
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .setProtocols(protocols)
+            .setSessionTimeoutMs(5000)
+            .setRebalanceTimeoutMs(45000);
+
+        IllegalStateException ex = assertThrows(IllegalStateException.class,
+            () -> context.sendClassicGroupJoin(joinRequest));
+        assertEquals("Malformed embedded consumer protocol in subscription 
deserialization.", ex.getMessage());

Review Comment:
   There's no corresponding error code for illegal state so it will be 
translated to an unknown server exception. It's actually different from what 
happens in the real classic group case where the leader just throws the schema 
exception out to the user app. For the other consumers in the group, they will 
be stuck waiting for sync response



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to