squah-confluent commented on code in PR #22264:
URL: https://github.com/apache/kafka/pull/22264#discussion_r3279288515
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -12172,6 +12172,140 @@ memberId2, new MemberAssignmentImpl(mkAssignment(
}
}
+ @Test
+ public void testUpgradeFailsOnMalformedClassicGroupProtocol() {
+ String groupId = "group-id";
+ String memberId1 = "member-id-1";
+ String memberId2 = "member-id-2";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Throws RuntimeException when read
+ byte[] poisonMetadata = new byte[]{0, 1, (byte) 0xFF, (byte) 0xFF,
(byte) 0xFF, (byte) 0xFF};
+
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(poisonMetadata));
+
+ Map<String, byte[]> assignments = Map.of(
+ memberId1,
+ Utils.toArray(ConsumerProtocol.serializeAssignment(
+ new ConsumerPartitionAssignor.Assignment(List.of(new
TopicPartition(fooTopicName, 0)))))
+ );
+
+ ClassicGroup group = context.createClassicGroup(groupId);
+ group.setProtocolName(Optional.of("range"));
+ group.add(
+ new ClassicGroupMember(
+ memberId1,
+ Optional.empty(),
+ "client-id",
+ "client-host",
+ 10000,
+ 5000,
+ "consumer",
+ protocols,
+ assignments.get(memberId1)
+ )
+ );
+
+ group.transitionTo(PREPARING_REBALANCE);
+ group.transitionTo(COMPLETING_REBALANCE);
+ group.transitionTo(STABLE);
+
+
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group,
assignments));
+ context.commit();
+
+ // A new member 2 with the new protocol joins the classic group, triggering the upgrade.
+ ConsumerGroupHeartbeatRequestData request = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setTopicPartitions(List.of());
+
+ Exception ex = assertThrows(GroupIdNotFoundException.class,
+ () -> context.consumerGroupHeartbeat(request));
+ assertEquals(
+ "Cannot upgrade classic group group-id to consumer group because the embedded consumer protocol is malformed.",
+ ex.getMessage()
+ );
+ }
+
+ @Test
+ public void
testClassicJoinToConsumerGroupFailsOnMalformedSubscriptionMetadata() {
+ String groupId = "group-id";
+ String existingMemberId = Uuid.randomUuid().toString();
+ String newMemberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ ConsumerGroupMember existingMember = new
ConsumerGroupMember.Builder(existingMemberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)), 10))
+ .build();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new
MockPartitionAssignor("range")))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(existingMember)
+ .withAssignment(existingMemberId,
mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))))
+ .build();
+
+ // Throws RuntimeException when read.
+ byte[] poisonMetadata = new byte[]{0, 1, (byte) 0xFF, (byte) 0xFF,
(byte) 0xFF, (byte) 0xFF};
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(poisonMetadata));
+
+ JoinGroupRequestData joinRequest = new JoinGroupRequestData()
+ .setGroupId(groupId)
+ .setMemberId(newMemberId)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setProtocols(protocols)
+ .setSessionTimeoutMs(5000)
+ .setRebalanceTimeoutMs(45000);
+
+ IllegalStateException ex = assertThrows(IllegalStateException.class,
+ () -> context.sendClassicGroupJoin(joinRequest));
+ assertEquals("Malformed embedded consumer protocol in subscription deserialization.", ex.getMessage());
Review Comment:
Thanks! I wonder if there's a better error code to return. It's fine to
leave it for this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]