dajac commented on code in PR #16573:
URL: https://github.com/apache/kafka/pull/16573#discussion_r1696564483
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2388,6 +3138,82 @@ public
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
}
}
+ /**
+ * Handles a ShareGroupHeartbeat request.
+ *
+ * @param context The request context.
+ * @param request The actual ShareGroupHeartbeat request.
+ *
+ * @return A Result containing the ShareGroupHeartbeat response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<ShareGroupHeartbeatResponseData,
CoordinatorRecord> shareGroupHeartbeat(
+ RequestContext context,
+ ShareGroupHeartbeatRequestData request
+ ) throws ApiException {
+ throwIfShareGroupHeartbeatRequestIsInvalid(request);
+
+ if (request.memberEpoch() ==
ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH) {
+ // -1 means that the member wants to leave the group.
+ return shareGroupLeave(
+ request.groupId(),
+ request.memberId(),
+ request.memberEpoch());
+ }
+ // Otherwise, it is a regular heartbeat.
+ return shareGroupHeartbeat(
+ request.groupId(),
+ request.memberId(),
+ request.memberEpoch(),
+ request.rackId(),
+ context.clientId(),
+ context.clientAddress.toString(),
+ request.subscribedTopicNames());
+ }
+
+ private void replay(CoordinatorRecord record) {
Review Comment:
This method really annoys me because the concept of replay is linked to
persisted records, and that is not what is happening here. It also duplicates
some of the replay logic.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1128,6 +1401,26 @@ private void throwIfMemberEpochIsInvalid(
}
}
+ /**
+ * Validates the member epoch provided in the heartbeat request.
+ *
+ * @param member The share group member.
+ * @param receivedMemberEpoch The member epoch.
+ *
+ * @throws FencedMemberEpochException if the provided epoch is ahead or
behind the epoch known
+ * by this coordinator.
+ */
+ private void throwIfShareGroupMemberEpochIsInvalid(
+ ShareGroupMember member,
+ int receivedMemberEpoch
+ ) {
+ if (receivedMemberEpoch > member.memberEpoch()) {
Review Comment:
My understanding is that the member epoch is never persisted. Is that correct?
If so, it means that after a failover of the group coordinator, all the
members will be fenced because the member epoch here will be back to zero. Is
that the case?
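To make the concern concrete, here is a minimal sketch of the scenario I have in mind. It is a toy model, not the PR's code: `EpochFailoverSketch`, `isFenced`, `setEpoch` and `afterFailover` are hypothetical names, and the sketch assumes that a new coordinator starts with only what was persisted (here, nothing about member epochs).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model; none of these names come from the PR.
class EpochFailoverSketch {
    // In-memory only: member id -> member epoch. Nothing here is written to the log.
    private final Map<String, Integer> memberEpochs = new HashMap<>();

    // Same shape as the check under review: a received epoch ahead of the
    // epoch known by this coordinator is treated as fenced.
    boolean isFenced(String memberId, int receivedEpoch) {
        int knownEpoch = memberEpochs.getOrDefault(memberId, 0);
        return receivedEpoch > knownEpoch;
    }

    void setEpoch(String memberId, int epoch) {
        memberEpochs.put(memberId, epoch);
    }

    // Assumption: the new coordinator rebuilds its state from persisted records
    // only, so the in-memory epochs start out empty (effectively zero).
    static EpochFailoverSketch afterFailover() {
        return new EpochFailoverSketch();
    }
}
```

In this model, a member that was at epoch 5 before the failover is accepted by the old coordinator but fenced by the new one, which is the behaviour I am asking about.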
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1702,65 +2012,218 @@ private CoordinatorResult<Void, CoordinatorRecord>
classicGroupJoinToConsumerGro
}
/**
- * Gets or subscribes a new dynamic consumer group member.
+ * Handles a ShareGroupHeartbeat request.
*
- * @param group The consumer group.
- * @param memberId The member id.
- * @param memberEpoch The member epoch.
- * @param ownedTopicPartitions The owned partitions reported by the
member.
- * @param createIfNotExists Whether the member should be created or
not.
- * @param useClassicProtocol Whether the member uses the classic
protocol.
+ * @param groupId The group id from the request.
+ * @param memberId The member id from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param rackId The rack id from the request or null.
+ * @param clientId The client id.
+ * @param clientHost The client host.
+ * @param subscribedTopicNames The list of subscribed topic names from
the request or null.
*
- * @return The existing consumer group member or a new one.
+ * @return A Result containing the ShareGroupHeartbeat response and
+ * a list of records to update the state machine.
*/
- private ConsumerGroupMember getOrMaybeSubscribeDynamicConsumerGroupMember(
- ConsumerGroup group,
+ private CoordinatorResult<ShareGroupHeartbeatResponseData,
CoordinatorRecord> shareGroupHeartbeat(
+ String groupId,
String memberId,
int memberEpoch,
- List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions,
- boolean createIfNotExists,
- boolean useClassicProtocol
- ) {
- ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId,
createIfNotExists);
- if (!useClassicProtocol) {
- throwIfMemberEpochIsInvalid(member, memberEpoch,
ownedTopicPartitions);
- }
- if (createIfNotExists) {
- log.info("[GroupId {}] Member {} joins the consumer group using
the {} protocol.",
- group.groupId(), memberId, useClassicProtocol ? "classic" :
"consumer");
- }
- return member;
- }
+ String rackId,
+ String clientId,
+ String clientHost,
+ List<String> subscribedTopicNames
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ // The records which persists.
+ final List<CoordinatorRecord> records = new ArrayList<>();
+ // The records which are replayed immediately.
+ final List<CoordinatorRecord> replayRecords = new ArrayList<>();
- /**
- * Gets or subscribes a static consumer group member. This method also
replaces the
- * previous static member if allowed.
- *
- * @param group The consumer group.
- * @param memberId The member id.
- * @param memberEpoch The member epoch.
- * @param instanceId The instance id.
- * @param ownedTopicPartitions The owned partitions reported by the
member.
- * @param createIfNotExists Whether the member should be created or
not.
- * @param useClassicProtocol Whether the member uses the classic
protocol.
- * @param records The list to accumulate records created to
replace
- * the previous static member.
- *
- * @return The existing consumer group member or a new one.
- */
- private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember(
- ConsumerGroup group,
- String memberId,
- int memberEpoch,
- String instanceId,
- List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions,
- boolean createIfNotExists,
- boolean useClassicProtocol,
- List<CoordinatorRecord> records
- ) {
- ConsumerGroupMember existingStaticMemberOrNull =
group.staticMember(instanceId);
+ // Get or create the share group.
+ boolean createIfNotExists = memberEpoch == 0;
+ final ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId,
createIfNotExists);
+ throwIfShareGroupIsFull(group, memberId);
- if (createIfNotExists) {
+ // Get or create the member.
+ if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
+ ShareGroupMember member = getOrMaybeSubscribeShareGroupMember(
+ group,
+ memberId,
+ memberEpoch,
+ createIfNotExists
+ );
+
+ ShareGroupMember.Builder updatedMemberBuilder = new
ShareGroupMember.Builder(member);
+ // 1. Create or update the member.
+ ShareGroupMember updatedMember = updatedMemberBuilder
+ .maybeUpdateRackId(Optional.ofNullable(rackId))
+
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .build();
+
+ boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
+ groupId,
+ member,
+ updatedMember,
+ records
+ );
+
+ int groupEpoch = group.groupEpoch();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+ SubscriptionType subscriptionType = group.subscriptionType();
+
+ if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ Map<String, Integer> subscribedTopicNamesMap =
group.computeSubscribedTopicNames(member, updatedMember);
+ subscriptionMetadata = group.computeSubscriptionMetadata(
+ subscribedTopicNamesMap,
+ metadataImage.topics(),
+ metadataImage.cluster()
+ );
+
+ int numMembers = group.numMembers();
+ if (!group.hasMember(updatedMember.memberId())) {
+ numMembers++;
+ }
+
+ subscriptionType = ModernGroup.subscriptionType(
+ subscribedTopicNamesMap,
+ numMembers
+ );
+
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId {}] Computed new subscription metadata:
{}.",
+ groupId, subscriptionMetadata);
+ bumpGroupEpoch = true;
+ replayRecords.add(newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
+ }
+
+ if (bumpGroupEpoch) {
+ groupEpoch += 1;
+ records.add(newGroupEpochRecord(groupId, groupEpoch, SHARE));
+ log.info("[GroupId {}] Bumped group epoch to {}.", groupId,
groupEpoch);
+ }
+
+ group.setMetadataRefreshDeadline(currentTimeMs +
shareGroupMetadataRefreshIntervalMs, groupEpoch);
+ }
+
+ // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch.
+ final int targetAssignmentEpoch;
+ final Assignment targetAssignment;
+
+ if (groupEpoch > group.assignmentEpoch()) {
+ targetAssignment = updateTargetAssignment(
+ group,
+ groupEpoch,
+ updatedMember,
+ subscriptionMetadata,
+ subscriptionType,
+ replayRecords
+ );
+ targetAssignmentEpoch = groupEpoch;
Review Comment:
My understanding is that we don't persist the target assignment. One thing
to consider is that this will trigger a re-computation of all the target
assignments on coordinator failovers. This is probably costly.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2621,26 +3497,140 @@ public void replay(
public void replay(
ConsumerGroupCurrentMemberAssignmentKey key,
ConsumerGroupCurrentMemberAssignmentValue value
+ ) {
+ replay(key, value, CONSUMER);
+ }
+
+ /**
+ * Replays ConsumerGroupCurrentMemberAssignmentKey/Value to update the
hard state of
+ * the consumer group. It updates the assignment of a member or deletes it.
+ *
+ * @param key A ConsumerGroupCurrentMemberAssignmentKey key.
+ * @param value A ConsumerGroupCurrentMemberAssignmentValue record.
+ */
+ public void replay(
+ ConsumerGroupCurrentMemberAssignmentKey key,
+ ConsumerGroupCurrentMemberAssignmentValue value,
+ GroupType groupType
Review Comment:
All these group types passed to the replay methods are really confusing
too. They give the impression that the state is actually persisted, but it is
not. I think that this will make the code hard to reason about in the future. We
should avoid this. Those records are for consumer groups.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1702,65 +2012,218 @@ private CoordinatorResult<Void, CoordinatorRecord>
classicGroupJoinToConsumerGro
}
/**
- * Gets or subscribes a new dynamic consumer group member.
+ * Handles a ShareGroupHeartbeat request.
*
- * @param group The consumer group.
- * @param memberId The member id.
- * @param memberEpoch The member epoch.
- * @param ownedTopicPartitions The owned partitions reported by the
member.
- * @param createIfNotExists Whether the member should be created or
not.
- * @param useClassicProtocol Whether the member uses the classic
protocol.
+ * @param groupId The group id from the request.
+ * @param memberId The member id from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param rackId The rack id from the request or null.
+ * @param clientId The client id.
+ * @param clientHost The client host.
+ * @param subscribedTopicNames The list of subscribed topic names from
the request or null.
*
- * @return The existing consumer group member or a new one.
+ * @return A Result containing the ShareGroupHeartbeat response and
+ * a list of records to update the state machine.
*/
- private ConsumerGroupMember getOrMaybeSubscribeDynamicConsumerGroupMember(
- ConsumerGroup group,
+ private CoordinatorResult<ShareGroupHeartbeatResponseData,
CoordinatorRecord> shareGroupHeartbeat(
+ String groupId,
String memberId,
int memberEpoch,
- List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions,
- boolean createIfNotExists,
- boolean useClassicProtocol
- ) {
- ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId,
createIfNotExists);
- if (!useClassicProtocol) {
- throwIfMemberEpochIsInvalid(member, memberEpoch,
ownedTopicPartitions);
- }
- if (createIfNotExists) {
- log.info("[GroupId {}] Member {} joins the consumer group using
the {} protocol.",
- group.groupId(), memberId, useClassicProtocol ? "classic" :
"consumer");
- }
- return member;
- }
+ String rackId,
+ String clientId,
+ String clientHost,
+ List<String> subscribedTopicNames
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ // The records which persists.
+ final List<CoordinatorRecord> records = new ArrayList<>();
+ // The records which are replayed immediately.
+ final List<CoordinatorRecord> replayRecords = new ArrayList<>();
- /**
- * Gets or subscribes a static consumer group member. This method also
replaces the
- * previous static member if allowed.
- *
- * @param group The consumer group.
- * @param memberId The member id.
- * @param memberEpoch The member epoch.
- * @param instanceId The instance id.
- * @param ownedTopicPartitions The owned partitions reported by the
member.
- * @param createIfNotExists Whether the member should be created or
not.
- * @param useClassicProtocol Whether the member uses the classic
protocol.
- * @param records The list to accumulate records created to
replace
- * the previous static member.
- *
- * @return The existing consumer group member or a new one.
- */
- private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember(
- ConsumerGroup group,
- String memberId,
- int memberEpoch,
- String instanceId,
- List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions,
- boolean createIfNotExists,
- boolean useClassicProtocol,
- List<CoordinatorRecord> records
- ) {
- ConsumerGroupMember existingStaticMemberOrNull =
group.staticMember(instanceId);
+ // Get or create the share group.
+ boolean createIfNotExists = memberEpoch == 0;
+ final ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId,
createIfNotExists);
+ throwIfShareGroupIsFull(group, memberId);
- if (createIfNotExists) {
+ // Get or create the member.
+ if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
+ ShareGroupMember member = getOrMaybeSubscribeShareGroupMember(
+ group,
+ memberId,
+ memberEpoch,
+ createIfNotExists
+ );
+
+ ShareGroupMember.Builder updatedMemberBuilder = new
ShareGroupMember.Builder(member);
+ // 1. Create or update the member.
+ ShareGroupMember updatedMember = updatedMemberBuilder
+ .maybeUpdateRackId(Optional.ofNullable(rackId))
+
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .build();
+
+ boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
+ groupId,
+ member,
+ updatedMember,
+ records
+ );
+
+ int groupEpoch = group.groupEpoch();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+ SubscriptionType subscriptionType = group.subscriptionType();
+
+ if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ Map<String, Integer> subscribedTopicNamesMap =
group.computeSubscribedTopicNames(member, updatedMember);
+ subscriptionMetadata = group.computeSubscriptionMetadata(
+ subscribedTopicNamesMap,
+ metadataImage.topics(),
+ metadataImage.cluster()
+ );
+
+ int numMembers = group.numMembers();
+ if (!group.hasMember(updatedMember.memberId())) {
+ numMembers++;
+ }
+
+ subscriptionType = ModernGroup.subscriptionType(
+ subscribedTopicNamesMap,
+ numMembers
+ );
+
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId {}] Computed new subscription metadata:
{}.",
+ groupId, subscriptionMetadata);
+ bumpGroupEpoch = true;
+ replayRecords.add(newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
+ }
+
+ if (bumpGroupEpoch) {
+ groupEpoch += 1;
+ records.add(newGroupEpochRecord(groupId, groupEpoch, SHARE));
+ log.info("[GroupId {}] Bumped group epoch to {}.", groupId,
groupEpoch);
+ }
+
+ group.setMetadataRefreshDeadline(currentTimeMs +
shareGroupMetadataRefreshIntervalMs, groupEpoch);
+ }
+
+ // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch.
+ final int targetAssignmentEpoch;
+ final Assignment targetAssignment;
+
+ if (groupEpoch > group.assignmentEpoch()) {
+ targetAssignment = updateTargetAssignment(
+ group,
+ groupEpoch,
+ updatedMember,
+ subscriptionMetadata,
+ subscriptionType,
+ replayRecords
+ );
+ targetAssignmentEpoch = groupEpoch;
+ } else {
+ targetAssignmentEpoch = group.assignmentEpoch();
+ targetAssignment =
group.targetAssignment(updatedMember.memberId());
+ }
+
+ // 3. Reconcile the member's assignment with the target assignment if
the member is not
+ // fully reconciled yet.
+ updatedMember = maybeReconcile(
+ groupId,
+ updatedMember,
+ targetAssignmentEpoch,
+ targetAssignment,
+ replayRecords
+ );
+
+ scheduleShareGroupSessionTimeout(groupId, memberId);
+
+ // Prepare the response.
+ ShareGroupHeartbeatResponseData response = new
ShareGroupHeartbeatResponseData()
+ .setMemberId(updatedMember.memberId())
+ .setMemberEpoch(updatedMember.memberEpoch())
+ .setHeartbeatIntervalMs(shareGroupHeartbeatIntervalMs);
+
+ // The assignment is only provided in the following cases:
+ // 1. The member just joined or rejoined to group (epoch equals to
zero);
+ // 2. The member's assignment has been updated.
+ if (memberEpoch == 0 || hasAssignedPartitionsChanged(member,
updatedMember)) {
+
response.setAssignment(createShareGroupResponseAssignment(updatedMember));
+ }
+
+ /*
+ For a consumer group, below updates would occur as a result of
applying (replaying) a member
+ or group persisted record. Because a share group doesn't persist
additional records, hence we
+ need to replay the generated records here.
+ */
+ replayRecords.forEach(this::replay);
+ return new CoordinatorResult<>(records, response);
Review Comment:
I believe that this does not fully work as you expect. For instance, if you
have records in `replayRecords` that you replay but no records in
`records`, the changes may be rolled back if the next write operation executed
by the coordinator fails. This happens because the coordinator runtime does not
know about those changes in the timeline data structures, so it will revert to
the last written offset, which is computed based on the previous write
operation. This could lead to tricky state inconsistencies. I am not sure
whether it matters in this particular case, but it is certainly confusing.
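A rough illustration of the failure mode, as a toy model only. This is not the coordinator runtime's real API: `RollbackSketch`, `applyInMemoryOnly` and `write` are hypothetical names, and the snapshot is modelled as a plain map copy rather than a timeline data structure.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of snapshot-based rollback; not the real runtime.
class RollbackSketch {
    private Map<String, Integer> state = new HashMap<>();
    // Copy of the state as of the last successful write.
    private Map<String, Integer> lastWritten = new HashMap<>();

    // Mutates the state directly, bypassing the write path
    // (comparable to replaying records that are never persisted).
    void applyInMemoryOnly(String key, int value) {
        state.put(key, value);
    }

    // A write applies the change, then either commits a new snapshot
    // or reverts everything to the last committed snapshot.
    void write(String key, int value, boolean succeeds) {
        state.put(key, value);
        if (succeeds) {
            lastWritten = new HashMap<>(state);
        } else {
            state = new HashMap<>(lastWritten);
        }
    }

    Integer get(String key) {
        return state.get(key);
    }
}
```

In this model, an in-memory-only change made after the last successful write disappears as soon as a later write fails, even though that later write is unrelated to it.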
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1967,6 +2523,60 @@ private Assignment updateTargetAssignment(
}
}
+ /**
+ * Updates the target assignment according to the updated member and
subscription metadata.
+ *
+ * @param group The ShareGroup.
+ * @param groupEpoch The group epoch.
+ * @param updatedMember The updated member.
+ * @param subscriptionMetadata The subscription metadata.
+ * @param subscriptionType The group subscription type.
+ * @param records The list to accumulate any new records.
+ * @return The new target assignment.
+ */
+ private Assignment updateTargetAssignment(
Review Comment:
This one looks very similar to the existing one. Would it be possible to
reuse it? We could perhaps parameterize the type of the member. I am not sure
whether it works or not.
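Something along these lines is what I mean by parameterizing the member type. It is only a sketch of the shape: `Member`, `ConsumerMember`, `ShareMember` and `membersAfterUpdate` are hypothetical stand-ins, not the real classes, and the body is a placeholder for the shared assignment logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one generic helper shared by both member types.
class GenericMemberSketch {
    interface Member {
        String memberId();
    }

    static final class ConsumerMember implements Member {
        private final String id;
        ConsumerMember(String id) { this.id = id; }
        public String memberId() { return id; }
    }

    static final class ShareMember implements Member {
        private final String id;
        ShareMember(String id) { this.id = id; }
        public String memberId() { return id; }
    }

    // Written once for any member type T instead of one copy per group type.
    // Returns the member ids with the updated member replacing any stale entry.
    static <T extends Member> List<String> membersAfterUpdate(List<T> members, T updatedMember) {
        List<String> ids = new ArrayList<>();
        for (T member : members) {
            if (!member.memberId().equals(updatedMember.memberId())) {
                ids.add(member.memberId());
            }
        }
        ids.add(updatedMember.memberId());
        return ids;
    }
}
```

Whether the real method can be shared this way depends on how much of its body is specific to each member class, which I have not checked.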
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2080,6 +2715,55 @@ private <T> CoordinatorResult<T, CoordinatorRecord>
consumerGroupFenceMember(
}
}
+ /**
+ * Removes a member from a share group.
+ *
+ * @param group The group.
+ * @param member The member.
+ *
+ * @return A list of records to be applied to the state.
+ */
+ private <T> CoordinatorResult<T, CoordinatorRecord> shareGroupFenceMember(
+ ShareGroup group,
+ ShareGroupMember member,
+ T response
+ ) {
+ List<CoordinatorRecord> records = new ArrayList<>();
+ List<CoordinatorRecord> replayRecords = new ArrayList<>();
+
+ replayRecords.add(newCurrentAssignmentTombstoneRecord(group.groupId(),
member.memberId()));
+ replayRecords.add(newTargetAssignmentTombstoneRecord(group.groupId(),
member.memberId()));
Review Comment:
This does not seem necessary as we could rely on the persisted record to
delete the entire member. Could we?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.