dajac commented on code in PR #17286:
URL: https://github.com/apache/kafka/pull/17286#discussion_r1778129613
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2727,33 +2782,39 @@ private <T> CoordinatorResult<T, CoordinatorRecord>
consumerGroupFenceMember(
ConsumerGroupMember member,
T response
) {
- if (validateOnlineDowngrade(group, member.memberId())) {
- return convertToClassicGroup(group, member.memberId(), response);
- } else {
- List<CoordinatorRecord> records = new ArrayList<>();
- removeMember(records, group.groupId(), member.memberId());
+ List<CoordinatorRecord> records = new ArrayList<>();
+ removeMember(records, group.groupId(), member.memberId());
- // We update the subscription metadata without the leaving member.
- Map<String, TopicMetadata> subscriptionMetadata =
group.computeSubscriptionMetadata(
- group.computeSubscribedTopicNames(member, null),
- metadataImage.topics(),
- metadataImage.cluster()
- );
+ // We update the subscription metadata without the leaving member.
+ Map<String, TopicMetadata> subscriptionMetadata =
group.computeSubscriptionMetadata(
+ group.computeSubscribedTopicNames(member, null),
+ metadataImage.topics(),
+ metadataImage.cluster()
+ );
- if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
- log.info("[GroupId {}] Computed new subscription metadata:
{}.",
- group.groupId(), subscriptionMetadata);
-
records.add(newConsumerGroupSubscriptionMetadataRecord(group.groupId(),
subscriptionMetadata));
- }
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId {}] Computed new subscription metadata: {}.",
+ group.groupId(), subscriptionMetadata);
+
records.add(newConsumerGroupSubscriptionMetadataRecord(group.groupId(),
subscriptionMetadata));
+ }
- // We bump the group epoch.
+ // We bump the group epoch if the group doesn't need a downgrade,
+ // or the rebalance will be triggered after the downgrade conversion.
+ if (!validateOnlineDowngradeWithFencedMember(group,
member.memberId())) {
int groupEpoch = group.groupEpoch() + 1;
records.add(newConsumerGroupEpochRecord(group.groupId(),
groupEpoch));
+ }
- cancelTimers(group.groupId(), member.memberId());
+ cancelTimers(group.groupId(), member.memberId());
- return new CoordinatorResult<>(records, response);
- }
+ CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+ appendFuture.whenComplete((__, t) -> {
+ if (t == null) {
+ scheduleConsumerGroupDowngradeTimeout(group, true);
Review Comment:
I have a similar thought here. I wonder if we should just schedule it at
L2806 in an else branch. We already know there that we need to downgrade the
group. What do you think?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -3125,25 +3128,25 @@ public void testSessionTimeoutExpiration() {
List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
// Verify the expired timeout.
+ assertEquals(1, timeouts.size());
Review Comment:
Is there a reason why we need to change this code? Ah, it may be because of
the append future. Is that the case?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10944,15 +10947,225 @@ public void
testLastConsumerProtocolMemberLeavingConsumerGroup() {
ClassicGroup classicGroup =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
- // Simulate a failed write to the log.
- result.appendFuture().completeExceptionally(new
NotLeaderOrFollowerException());
+ // Simulate a failed conversion.
+ downgradeTimeout.result.appendFuture().completeExceptionally(new
NotLeaderOrFollowerException());
context.rollback();
// The group is reverted back to the consumer group.
assertEquals(consumerGroup,
context.groupMetadataManager.consumerGroup(groupId));
verify(context.metrics,
times(1)).onClassicGroupStateTransition(PREPARING_REBALANCE, null);
}
+ @Test
+ public void
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember() throws
ExecutionException, InterruptedException {
+ String groupId = "group-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String instanceId = "instance-id";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols =
Collections.singletonList(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Arrays.asList(fooTopicName, barTopicName),
+ null,
+ Arrays.asList(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(fooTopicName, 1),
+ new TopicPartition(fooTopicName, 2),
+ new TopicPartition(barTopicName, 0),
+ new TopicPartition(barTopicName, 1)
+ )
+ ))))
+ );
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+ .setSupportedProtocols(protocols)
+ )
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build();
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setInstanceId(instanceId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .build();
+
+ // Consumer group with two members.
+ // Member 1 uses the classic protocol and static member 2 uses the
consumer protocol.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+ .withConsumerGroupAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 2)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(member1)
+ .withMember(member2)
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .withAssignmentEpoch(10))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6));
+ put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
2));
+ }
+ }));
+
+ context.commit();
+
+ // A new member using classic protocol with the same instance id
joins, scheduling the downgrade.
+ JoinGroupRequestData joinRequest = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withGroupInstanceId(instanceId)
+ .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+ GroupMetadataManagerTestContext.JoinResult result =
context.sendClassicGroupJoin(joinRequest);
+ result.appendFuture.complete(null);
+ memberId2 = result.joinFuture.get().memberId();
+
+ ExpiredTimeout<Void, CoordinatorRecord> downgradeTimeout =
context.sleep(0).get(0);
+ assertEquals(consumerGroupDowngradeKey(groupId), downgradeTimeout.key);
+
+ byte[] assignment1 =
Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(Arrays.asList(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(fooTopicName, 1),
+ new TopicPartition(fooTopicName, 2),
+ new TopicPartition(barTopicName, 0),
+ new TopicPartition(barTopicName, 1)
+ ))));
+ byte[] assignment2 =
Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(Arrays.asList(
+ new TopicPartition(fooTopicName, 3),
+ new TopicPartition(fooTopicName, 4),
+ new TopicPartition(fooTopicName, 5)
+ ))));
+ Map<String, byte[]> assignments = new HashMap<>();
+ assignments.put(memberId1, assignment1);
+ assignments.put(memberId2, assignment2);
+
+ ClassicGroup expectedClassicGroup = new ClassicGroup(
+ new LogContext(),
+ groupId,
+ STABLE,
+ context.time,
+ context.metrics,
+ 10,
+ Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
+ Optional.of("range"),
+ Optional.of(memberId1),
+ Optional.of(context.time.milliseconds())
+ );
+ expectedClassicGroup.add(
+ new ClassicGroupMember(
+ memberId1,
+ Optional.ofNullable(member1.instanceId()),
+ member1.clientId(),
+ member1.clientHost(),
+ member1.rebalanceTimeoutMs(),
+ member1.classicProtocolSessionTimeout().get(),
+ ConsumerProtocol.PROTOCOL_TYPE,
+ member1.supportedJoinGroupRequestProtocols(),
+ assignment1
+ )
+ );
+ expectedClassicGroup.add(
+ new ClassicGroupMember(
+ memberId2,
+ Optional.ofNullable(member2.instanceId()),
+ DEFAULT_CLIENT_ID,
+ DEFAULT_CLIENT_ADDRESS.toString(),
+ joinRequest.rebalanceTimeoutMs(),
+ joinRequest.sessionTimeoutMs(),
+ joinRequest.protocolType(),
+ joinRequest.protocols(),
+ assignment2
+ )
+ );
+
+ List<CoordinatorRecord> expectedRecords = Arrays.asList(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId2),
+
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
+
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId),
+
+
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting())
+ );
+
+ assertEquals(expectedRecords.size(),
downgradeTimeout.result.records().size());
+ assertUnorderedListEquals(expectedRecords.subList(0, 2),
downgradeTimeout.result.records().subList(0, 2));
+ assertUnorderedListEquals(expectedRecords.subList(2, 4),
downgradeTimeout.result.records().subList(2, 4));
+ assertRecordEquals(expectedRecords.get(4),
downgradeTimeout.result.records().get(4));
+ assertUnorderedListEquals(expectedRecords.subList(5, 7),
downgradeTimeout.result.records().subList(5, 7));
+ assertRecordsEquals(expectedRecords.subList(7, 9),
downgradeTimeout.result.records().subList(7, 9));
+
+ // Leader can be either member 1 or member 2.
+ try {
+ assertRecordEquals(expectedRecords.get(9),
downgradeTimeout.result.records().get(9));
+ } catch (AssertionFailedError e) {
+ expectedClassicGroup.setLeaderId(Optional.of(memberId2));
+ assertRecordEquals(
+
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting()),
+ downgradeTimeout.result.records().get(9)
+ );
+ }
+
+ verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
null);
+ verify(context.metrics, times(1)).onClassicGroupStateTransition(null,
STABLE);
+
+ // The new classic member 1 has a heartbeat timeout.
+ ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
+ classicGroupHeartbeatKey(groupId, memberId1)
+ );
+ assertNotNull(heartbeatTimeout);
+
+ // No rebalance is triggered.
+ ClassicGroup classicGroup =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+ assertTrue(classicGroup.isInState(STABLE));
+ }
+
Review Comment:
I wonder if we should also test the following case:
* Last member using the consumer protocol leaves. We schedule the downgrade
task;
* A new member using the consumer protocol joins before the downgrade task
is executed;
* The downgrade task is executed and should be a no-op.
What do you think?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2049,6 +2099,11 @@ private CoordinatorResult<Void, CoordinatorRecord>
classicGroupJoinToConsumerGro
scheduleConsumerGroupSyncTimeout(groupId, response.memberId(),
request.rebalanceTimeoutMs());
responseFuture.complete(response);
+
+ // Maybe downgrade the consumer group if the last member using
the
+ // consumer protocol is replaced by the joining member. No
rebalance
+ // is needed after the replacement.
+ scheduleConsumerGroupDowngradeTimeout(group, false);
Review Comment:
I wonder if we should rather call this one outside of the `appendFuture`.
My concern is that we call `validateOnlineDowngrade` in
`scheduleConsumerGroupDowngradeTimeout` and it accesses the uncommitted state
of the group while we actually complete the write operation. This may lead to
unexpected results.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3128,6 +3189,27 @@ private void cancelConsumerGroupSyncTimeout(
timer.cancel(consumerGroupSyncKey(groupId, memberId));
}
+ /**
+ * Maybe schedules the downgrade timeout for the consumer group.
+ *
+ * @param consumerGroup The group to downgrade.
+ * @param needsRebalance The boolean indicating whether a rebalance
should be triggered after the conversion.
+ */
+ private void scheduleConsumerGroupDowngradeTimeout(
+ ConsumerGroup consumerGroup,
+ boolean needsRebalance
+ ) {
+ if (validateOnlineDowngrade(consumerGroup)) {
Review Comment:
If we do the changes that I suggested, I think that we could remove this
check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]