dongnuo123 commented on code in PR #22264:
URL: https://github.com/apache/kafka/pull/22264#discussion_r3276786678
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -12172,6 +12172,140 @@ memberId2, new MemberAssignmentImpl(mkAssignment(
}
}
+ @Test
+ public void testUpgradeFailsOnMalformedClassicGroupProtocol() {
+ String groupId = "group-id";
+ String memberId1 = "member-id-1";
+ String memberId2 = "member-id-2";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Throws RuntimeException when read
+ byte[] poisonMetadata = new byte[]{0, 1, (byte) 0xFF, (byte) 0xFF,
(byte) 0xFF, (byte) 0xFF};
+
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(poisonMetadata));
+
+ Map<String, byte[]> assignments = Map.of(
+ memberId1,
+ Utils.toArray(ConsumerProtocol.serializeAssignment(
+ new ConsumerPartitionAssignor.Assignment(List.of(new
TopicPartition(fooTopicName, 0)))))
+ );
+
+ ClassicGroup group = context.createClassicGroup(groupId);
+ group.setProtocolName(Optional.of("range"));
+ group.add(
+ new ClassicGroupMember(
+ memberId1,
+ Optional.empty(),
+ "client-id",
+ "client-host",
+ 10000,
+ 5000,
+ "consumer",
+ protocols,
+ assignments.get(memberId1)
+ )
+ );
+
+ group.transitionTo(PREPARING_REBALANCE);
+ group.transitionTo(COMPLETING_REBALANCE);
+ group.transitionTo(STABLE);
+
+
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group,
assignments));
+ context.commit();
+
+ // A new member 2 with the new protocol joins the classic group,
triggering the upgrade.
+ ConsumerGroupHeartbeatRequestData request = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setTopicPartitions(List.of());
+
+ Exception ex = assertThrows(GroupIdNotFoundException.class,
+ () -> context.consumerGroupHeartbeat(request));
+ assertEquals(
+ "Cannot upgrade classic group group-id to consumer group because
the embedded consumer protocol is malformed.",
+ ex.getMessage()
+ );
+ }
+
+ @Test
+ public void
testClassicJoinToConsumerGroupFailsOnMalformedSubscriptionMetadata() {
+ String groupId = "group-id";
+ String existingMemberId = Uuid.randomUuid().toString();
+ String newMemberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ ConsumerGroupMember existingMember = new
ConsumerGroupMember.Builder(existingMemberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)), 10))
+ .build();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new
MockPartitionAssignor("range")))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(existingMember)
+ .withAssignment(existingMemberId,
mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))))
+ .build();
+
+ // Throws RuntimeException when read.
+ byte[] poisonMetadata = new byte[]{0, 1, (byte) 0xFF, (byte) 0xFF,
(byte) 0xFF, (byte) 0xFF};
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(poisonMetadata));
+
+ JoinGroupRequestData joinRequest = new JoinGroupRequestData()
+ .setGroupId(groupId)
+ .setMemberId(newMemberId)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setProtocols(protocols)
+ .setSessionTimeoutMs(5000)
+ .setRebalanceTimeoutMs(45000);
+
+ IllegalStateException ex = assertThrows(IllegalStateException.class,
+ () -> context.sendClassicGroupJoin(joinRequest));
+ assertEquals("Malformed embedded consumer protocol in subscription
deserialization.", ex.getMessage());
Review Comment:
There's no corresponding error code for `IllegalStateException`, so it will be
translated to an unknown server exception. This is actually different from what
happens in the real classic group case, where the leader just throws the schema
exception out to the user app. The other consumers in the group will be stuck
waiting for the sync response.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]