dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1601000794
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
}
}
+ /**
+ * Validates if the consumer group member uses the classic protocol.
+ *
+ * @param member The ConsumerGroupMember.
+ */
+ private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember
member) {
+ if (!member.useClassicProtocol()) {
+ throw new UnknownMemberIdException(
+ String.format("Member %s does not use the classic protocol.",
member.memberId())
+ );
+ }
+ }
+
+ /**
+ * Validates if the generation id and the protocol type from the request
match those of the consumer group.
+ *
+ * @param group The ConsumerGroup.
+ * @param member The ConsumerGroupMember.
+ * @param requestGenerationId The generation id from the request.
+ * @param requestProtocolType The protocol type from the request.
+ * @param requestProtocolName The protocol name from the request.
+ */
+ private void throwIfGenerationIdOrProtocolUnmatched(
Review Comment:
nit: It may be better to split this one into two methods. One to validate
the generation. Another one to validate the protocol type and name.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
}
}
+ /**
+ * Validates if the consumer group member uses the classic protocol.
+ *
+ * @param member The ConsumerGroupMember.
+ */
+ private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember
member) {
+ if (!member.useClassicProtocol()) {
+ throw new UnknownMemberIdException(
+ String.format("Member %s does not use the classic protocol.",
member.memberId())
+ );
+ }
+ }
+
+ /**
+ * Validates if the generation id and the protocol type from the request
match those of the consumer group.
+ *
+ * @param group The ConsumerGroup.
+ * @param member The ConsumerGroupMember.
+ * @param requestGenerationId The generation id from the request.
+ * @param requestProtocolType The protocol type from the request.
+ * @param requestProtocolName The protocol name from the request.
+ */
+ private void throwIfGenerationIdOrProtocolUnmatched(
+ ConsumerGroup group,
+ ConsumerGroupMember member,
+ int requestGenerationId,
+ String requestProtocolType,
+ String requestProtocolName
+ ) {
+ if (member.memberEpoch() != requestGenerationId) {
+ throw Errors.ILLEGAL_GENERATION.exception(
+ String.format("The request generation id %s is not equal to
the group epoch %d from the consumer group %s.",
Review Comment:
nit: `group epoch` -> `member epoch`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
}
}
+ /**
+ * Validates if the consumer group member uses the classic protocol.
+ *
+ * @param member The ConsumerGroupMember.
+ */
+ private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember
member) {
+ if (!member.useClassicProtocol()) {
+ throw new UnknownMemberIdException(
+ String.format("Member %s does not use the classic protocol.",
member.memberId())
+ );
+ }
+ }
+
+ /**
+ * Validates if the generation id and the protocol type from the request
match those of the consumer group.
+ *
+ * @param group The ConsumerGroup.
+ * @param member The ConsumerGroupMember.
+ * @param requestGenerationId The generation id from the request.
+ * @param requestProtocolType The protocol type from the request.
+ * @param requestProtocolName The protocol name from the request.
+ */
+ private void throwIfGenerationIdOrProtocolUnmatched(
+ ConsumerGroup group,
+ ConsumerGroupMember member,
+ int requestGenerationId,
+ String requestProtocolType,
+ String requestProtocolName
+ ) {
+ if (member.memberEpoch() != requestGenerationId) {
+ throw Errors.ILLEGAL_GENERATION.exception(
+ String.format("The request generation id %s is not equal to
the group epoch %d from the consumer group %s.",
+ requestGenerationId, group.groupEpoch(), group.groupId())
Review Comment:
nit: `group.groupEpoch()` -> `member.memberEpoch()`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
return EMPTY_RESULT;
}
+ /**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+ private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ SyncGroupRequestData request,
+ CompletableFuture<SyncGroupResponseData> responseFuture
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ String groupId = request.groupId();
+ String memberId = request.memberId();
+ String instanceId = request.groupInstanceId();
+
+ ConsumerGroupMember member;
+ if (instanceId == null) {
+ member = group.getOrMaybeCreateMember(request.memberId(), false);
+ } else {
+ member = group.staticMember(instanceId);
+ if (member == null) {
+ throw new UnknownMemberIdException(
+ String.format("Member with instance id %s is not a member
of group %s.", instanceId, groupId)
+ );
+ }
+ throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+ }
+
+ throwIfMemberDoesNotUseClassicProtocol(member);
+ throwIfGenerationIdOrProtocolUnmatched(
+ group,
+ member,
+ request.generationId(),
+ request.protocolType(),
+ request.protocolName()
+ );
+
+ cancelConsumerGroupSyncTimeout(groupId, memberId);
+// scheduleConsumerGroupSessionTimeout(groupId, memberId,
member.classicMemberSessionTimeout());
+
+ byte[] assignment = ConsumerProtocol.serializeAssignment(
+ new
ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(),
metadataImage.topics())),
+ deserializeProtocolVersion(member.classicMemberMetadata().get())
+ ).array();
+
+ responseFuture.complete(new SyncGroupResponseData()
+ .setProtocolType(request.protocolType())
+ .setProtocolName(request.protocolName())
+ .setAssignment(assignment)
+ .setErrorCode(Errors.NONE.code()));
Review Comment:
This is incorrect. We also need to send the response only when the "append
future" is completed. The reason is that we potentially access uncommitted
state here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
return EMPTY_RESULT;
}
+ /**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+ private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ SyncGroupRequestData request,
+ CompletableFuture<SyncGroupResponseData> responseFuture
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ String groupId = request.groupId();
+ String memberId = request.memberId();
+ String instanceId = request.groupInstanceId();
+
+ ConsumerGroupMember member;
+ if (instanceId == null) {
+ member = group.getOrMaybeCreateMember(request.memberId(), false);
+ } else {
+ member = group.staticMember(instanceId);
+ if (member == null) {
+ throw new UnknownMemberIdException(
+ String.format("Member with instance id %s is not a member
of group %s.", instanceId, groupId)
+ );
+ }
+ throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+ }
+
+ throwIfMemberDoesNotUseClassicProtocol(member);
+ throwIfGenerationIdOrProtocolUnmatched(
+ group,
+ member,
+ request.generationId(),
+ request.protocolType(),
+ request.protocolName()
+ );
+
Review Comment:
Hum... I wonder if we could rely on the member epoch to do this. If the
member epoch is smaller than the group epoch and the member is not in unrevoked
partitions state, we could return rebalance in progress. If the member epoch is
smaller than the group epoch, it means that the member must rebalance to catch
up. However, if the member is already in unrevoked partitions state, it means
that it has already started a rebalance and it must complete it to revoke
partitions. It will automatically start another one after revoking the
partitions. Would something like this work?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
return EMPTY_RESULT;
}
+ /**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+ private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ SyncGroupRequestData request,
+ CompletableFuture<SyncGroupResponseData> responseFuture
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
Review Comment:
nit: Do we ever throw `GroupIdNotFoundException`? It would be great if we
could add the remaining ones too.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
}
}
+ /**
+ * Validates if the consumer group member uses the classic protocol.
+ *
+ * @param member The ConsumerGroupMember.
+ */
+ private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember
member) {
+ if (!member.useClassicProtocol()) {
+ throw new UnknownMemberIdException(
+ String.format("Member %s does not use the classic protocol.",
member.memberId())
+ );
+ }
+ }
+
+ /**
+ * Validates if the generation id and the protocol type from the request
match those of the consumer group.
+ *
+ * @param group The ConsumerGroup.
+ * @param member The ConsumerGroupMember.
+ * @param requestGenerationId The generation id from the request.
+ * @param requestProtocolType The protocol type from the request.
+ * @param requestProtocolName The protocol name from the request.
+ */
+ private void throwIfGenerationIdOrProtocolUnmatched(
+ ConsumerGroup group,
+ ConsumerGroupMember member,
+ int requestGenerationId,
+ String requestProtocolType,
+ String requestProtocolName
+ ) {
+ if (member.memberEpoch() != requestGenerationId) {
+ throw Errors.ILLEGAL_GENERATION.exception(
+ String.format("The request generation id %s is not equal to
the group epoch %d from the consumer group %s.",
+ requestGenerationId, group.groupEpoch(), group.groupId())
+ );
+ } else if (!group.supportsClassicProtocols(requestProtocolType,
Collections.singleton(requestProtocolName))) {
+ throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("The member
protocol is not supported.");
+ }
Review Comment:
This is incorrect. In the JoinGroup response, we set the protocol name to
`protocols.iterator().next().name()`. Here we should validate that we get it
back.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
return EMPTY_RESULT;
}
+ /**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+ private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ SyncGroupRequestData request,
+ CompletableFuture<SyncGroupResponseData> responseFuture
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ String groupId = request.groupId();
+ String memberId = request.memberId();
+ String instanceId = request.groupInstanceId();
+
+ ConsumerGroupMember member;
+ if (instanceId == null) {
+ member = group.getOrMaybeCreateMember(request.memberId(), false);
+ } else {
+ member = group.staticMember(instanceId);
+ if (member == null) {
+ throw new UnknownMemberIdException(
+ String.format("Member with instance id %s is not a member
of group %s.", instanceId, groupId)
+ );
+ }
+ throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+ }
+
+ throwIfMemberDoesNotUseClassicProtocol(member);
+ throwIfGenerationIdOrProtocolUnmatched(
+ group,
+ member,
+ request.generationId(),
+ request.protocolType(),
+ request.protocolName()
+ );
+
+ cancelConsumerGroupSyncTimeout(groupId, memberId);
+// scheduleConsumerGroupSessionTimeout(groupId, memberId,
member.classicMemberSessionTimeout());
+
+ byte[] assignment = ConsumerProtocol.serializeAssignment(
+ new
ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(),
metadataImage.topics())),
Review Comment:
Like in the join group api, I think that we could avoid the intermediate
data structures here too.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
return EMPTY_RESULT;
}
+ /**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+ private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ SyncGroupRequestData request,
+ CompletableFuture<SyncGroupResponseData> responseFuture
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ String groupId = request.groupId();
+ String memberId = request.memberId();
+ String instanceId = request.groupInstanceId();
+
+ ConsumerGroupMember member;
+ if (instanceId == null) {
+ member = group.getOrMaybeCreateMember(request.memberId(), false);
+ } else {
+ member = group.staticMember(instanceId);
+ if (member == null) {
+ throw new UnknownMemberIdException(
+ String.format("Member with instance id %s is not a member
of group %s.", instanceId, groupId)
+ );
+ }
+ throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+ }
+
+ throwIfMemberDoesNotUseClassicProtocol(member);
+ throwIfGenerationIdOrProtocolUnmatched(
+ group,
+ member,
+ request.generationId(),
+ request.protocolType(),
+ request.protocolName()
+ );
+
+ cancelConsumerGroupSyncTimeout(groupId, memberId);
+// scheduleConsumerGroupSessionTimeout(groupId, memberId,
member.classicMemberSessionTimeout());
+
+ byte[] assignment = ConsumerProtocol.serializeAssignment(
+ new
ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(),
metadataImage.topics())),
+ deserializeProtocolVersion(member.classicMemberMetadata().get())
+ ).array();
+
+ responseFuture.complete(new SyncGroupResponseData()
+ .setProtocolType(request.protocolType())
+ .setProtocolName(request.protocolName())
+ .setAssignment(assignment)
+ .setErrorCode(Errors.NONE.code()));
Review Comment:
nit: The error code is 0 by default so we don't need to set it.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3797,32 +3874,51 @@ private CoordinatorResult<Void, Record>
updateStaticMemberThenRebalanceOrComplet
* @param request The actual SyncGroup request.
* @param responseFuture The sync group response future.
*
- * @return The result that contains records to append if the group
metadata manager received assignments.
+ * @return The result that contains records to append.
*/
public CoordinatorResult<Void, Record> classicGroupSync(
RequestContext context,
SyncGroupRequestData request,
CompletableFuture<SyncGroupResponseData> responseFuture
) throws UnknownMemberIdException, GroupIdNotFoundException {
- String groupId = request.groupId();
- String memberId = request.memberId();
- ClassicGroup group;
- try {
- group = getOrMaybeCreateClassicGroup(groupId, false);
- } catch (Throwable t) {
+ Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+ if (group == null || group.isEmpty()) {
responseFuture.complete(new SyncGroupResponseData()
- .setErrorCode(Errors.forException(t).code())
- );
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
return EMPTY_RESULT;
}
+ if (group.type() == CLASSIC) {
+ return classicGroupSyncToClassicGroup((ClassicGroup) group,
context, request, responseFuture);
+ } else {
+ return classicGroupSyncToConsumerGroup((ConsumerGroup) group,
context, request, responseFuture);
+ }
+ }
+
+ /**
+ * Handle a SyncGroupRequest to a ClassicGroup.
+ *
+ * @param group The ClassicGroup.
+ * @param context The request context.
+ * @param request The actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append if the group
metadata manager received assignments.
+ */
+ public CoordinatorResult<Void, Record> classicGroupSyncToClassicGroup(
Review Comment:
nit: Could it be private?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -11885,16 +11945,352 @@ public void
testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th
assertEquals(expectedMember3.state(),
group.getOrMaybeCreateMember(memberId1, false).state());
joinResult3.appendFuture.complete(null);
+ JoinGroupResponseData joinResponse3 = joinResult3.joinFuture.get();
assertEquals(
new JoinGroupResponseData()
.setMemberId(memberId1)
.setGenerationId(11)
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setProtocolName("range"),
- joinResult3.joinFuture.get()
+ joinResponse3
);
context.assertSessionTimeout(groupId, memberId1,
request3.sessionTimeoutMs());
context.assertSyncTimeout(groupId, memberId1,
request3.rebalanceTimeoutMs());
+
+ // Member 1 sends sync request to get the assigned partitions.
+ testClassicGroupSyncToConsumerGroup(
+ context,
+ groupId,
+ joinResponse3.memberId(),
+ joinResponse3.generationId(),
+ joinResponse3.protocolName(),
+ joinResponse3.protocolType(),
+ Arrays.asList(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(fooTopicName, 1),
+ new TopicPartition(zarTopicName, 0)
+ )
+ );
+ }
+
+ @Test
+ public void
testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() throws
Exception {
+ String groupId = "group-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ for (short version =
ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <=
ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) {
+ List<TopicPartition> topicPartitions = Arrays.asList(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(fooTopicName, 1),
+ new TopicPartition(fooTopicName, 2),
+ new TopicPartition(barTopicName, 0),
+ new TopicPartition(barTopicName, 1)
+ );
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols =
Collections.singletonList(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+ new ConsumerPartitionAssignor.Subscription(
+ Arrays.asList(fooTopicName, barTopicName),
+ null,
+ topicPartitions
+ ),
+ version
+ )))
+ );
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setClassicMemberMetadata(
+ new
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+// .setClassicMemberSessionTimeout(5000)
Review Comment:
Don't forget this one.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -11885,16 +11945,352 @@ public void
testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th
assertEquals(expectedMember3.state(),
group.getOrMaybeCreateMember(memberId1, false).state());
joinResult3.appendFuture.complete(null);
+ JoinGroupResponseData joinResponse3 = joinResult3.joinFuture.get();
assertEquals(
new JoinGroupResponseData()
.setMemberId(memberId1)
.setGenerationId(11)
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setProtocolName("range"),
- joinResult3.joinFuture.get()
+ joinResponse3
);
context.assertSessionTimeout(groupId, memberId1,
request3.sessionTimeoutMs());
context.assertSyncTimeout(groupId, memberId1,
request3.rebalanceTimeoutMs());
+
+ // Member 1 sends sync request to get the assigned partitions.
+ testClassicGroupSyncToConsumerGroup(
+ context,
+ groupId,
+ joinResponse3.memberId(),
+ joinResponse3.generationId(),
+ joinResponse3.protocolName(),
+ joinResponse3.protocolType(),
+ Arrays.asList(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(fooTopicName, 1),
+ new TopicPartition(zarTopicName, 0)
+ )
+ );
+ }
+
+ @Test
+ public void
testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() throws
Exception {
+ String groupId = "group-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ for (short version =
ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <=
ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) {
+ List<TopicPartition> topicPartitions = Arrays.asList(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(fooTopicName, 1),
+ new TopicPartition(fooTopicName, 2),
+ new TopicPartition(barTopicName, 0),
+ new TopicPartition(barTopicName, 1)
+ );
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols =
Collections.singletonList(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+ new ConsumerPartitionAssignor.Subscription(
+ Arrays.asList(fooTopicName, barTopicName),
+ null,
+ topicPartitions
+ ),
+ version
+ )))
+ );
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setClassicMemberMetadata(
+ new
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+// .setClassicMemberSessionTimeout(5000)
+ .setSupportedProtocols(protocols)
+ )
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build();
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build();
+
+ // Consumer group with two members.
+ // Member 1 uses the classic protocol and member 2 uses the
consumer protocol.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+ .withAssignors(Collections.singletonList(new
MockPartitionAssignor("range")))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(member1)
+ .withMember(member2)
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ testClassicGroupSyncToConsumerGroup(
+ context,
+ groupId,
+ memberId1,
+ 10,
+ "range",
+ ConsumerProtocol.PROTOCOL_TYPE,
+ topicPartitions,
+ version
+ );
+ }
+ }
+
+ @Test
+ public void testClassicGroupSyncToConsumerGroupWithUnknownMemberId()
throws Exception {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+
+ // Consumer group with a member that doesn't use the classic protocol.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+ .withAssignors(Collections.singletonList(new
MockPartitionAssignor("range")))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .build()))
+ .build();
+
+ // Request with unknown member id.
+ assertThrows(UnknownMemberIdException.class, () ->
context.sendClassicGroupSync(
+ new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(Uuid.randomUuid().toString())
+ .withGenerationId(10)
+ .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+ .withProtocolType("range")
+ .build())
+ );
+
+ // Request with unknown instance id.
+ assertThrows(UnknownMemberIdException.class, () ->
context.sendClassicGroupSync(
+ new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId)
+ .withGroupInstanceId("unknown-instance-id")
+ .withGenerationId(10)
+ .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+ .withProtocolType("range")
+ .build())
+ );
+
+ // Request with member id that doesn't use the classic protocol.
+ assertThrows(UnknownMemberIdException.class, () ->
context.sendClassicGroupSync(
+ new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId)
+ .withGenerationId(10)
+ .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+ .withProtocolType("range")
+ .build())
+ );
+ }
+
+ @Test
+ public void testClassicGroupSyncToConsumerGroupWithFencedInstanceId()
throws Exception {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+ String instanceId = "instance-id";
+
+ // Consumer group with a static member.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+ .withAssignors(Collections.singletonList(new
MockPartitionAssignor("range")))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setInstanceId(instanceId)
+ .build()))
+ .build();
+
+ assertThrows(FencedInstanceIdException.class, () ->
context.sendClassicGroupSync(
+ new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(Uuid.randomUuid().toString())
+ .withGroupInstanceId(instanceId)
+ .withGenerationId(10)
+ .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+ .withProtocolType("range")
+ .build())
+ );
+ }
+
+ @Test
+ public void
testClassicGroupSyncToConsumerGroupWithInconsistentGroupProtocol() throws
Exception {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols =
Collections.singletonList(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+ new ConsumerPartitionAssignor.Subscription(
+ Arrays.asList("foo"),
+ null,
+ Collections.emptyList()
+ )
+ )))
+ );
+
+ // Consumer group with a static member.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+ .withAssignors(Collections.singletonList(new
MockPartitionAssignor("range")))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setClassicMemberMetadata(
+ new
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+// .setClassicMemberSessionTimeout(5000)
+ .setSupportedProtocols(protocols)
+ )
+ .setMemberEpoch(10)
+ .build()))
+ .build();
+
+ assertThrows(InconsistentGroupProtocolException.class, () ->
context.sendClassicGroupSync(
+ new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId)
+ .withGenerationId(10)
+ .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+ .withProtocolType("roundrobin")
+ .build())
+ );
+
+ assertThrows(InconsistentGroupProtocolException.class, () ->
context.sendClassicGroupSync(
+ new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId)
+ .withGenerationId(10)
+ .withProtocolName("connect")
+ .withProtocolType("range")
+ .build())
+ );
+ }
+
+ @Test
+ public void testClassicGroupSyncToConsumerGroupWithIllegalGeneration()
throws Exception {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols =
Collections.singletonList(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+ new ConsumerPartitionAssignor.Subscription(
+ Arrays.asList("foo"),
+ null,
+ Collections.emptyList()
+ )
+ )))
+ );
+
+ // Consumer group with a static member.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+ .withAssignors(Collections.singletonList(new
MockPartitionAssignor("range")))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setClassicMemberMetadata(
+ new
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+// .setClassicMemberSessionTimeout(5000)
+ .setSupportedProtocols(protocols)
+ )
+ .setMemberEpoch(10)
+ .build()))
+ .build();
+
+ assertThrows(IllegalGenerationException.class, () ->
context.sendClassicGroupSync(
+ new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId)
+ .withGenerationId(9)
+ .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+ .withProtocolType("range")
+ .build())
+ );
+ }
+
+ private void testClassicGroupSyncToConsumerGroup(
+ GroupMetadataManagerTestContext context,
Review Comment:
Should we move this method and the next one to the
`GroupMetadataManagerTestContext`? Having the context as the first argument
usually signals that the methods should be in the context itself.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]