apoorvmittal10 commented on code in PR #16573:
URL: https://github.com/apache/kafka/pull/16573#discussion_r1696643858
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1702,65 +2012,218 @@ private CoordinatorResult<Void, CoordinatorRecord>
classicGroupJoinToConsumerGro
}
/**
- * Gets or subscribes a new dynamic consumer group member.
+ * Handles a ShareGroupHeartbeat request.
*
- * @param group The consumer group.
- * @param memberId The member id.
- * @param memberEpoch The member epoch.
- * @param ownedTopicPartitions The owned partitions reported by the
member.
- * @param createIfNotExists Whether the member should be created or
not.
- * @param useClassicProtocol Whether the member uses the classic
protocol.
+ * @param groupId The group id from the request.
+ * @param memberId The member id from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param rackId The rack id from the request or null.
+ * @param clientId The client id.
+ * @param clientHost The client host.
+ * @param subscribedTopicNames The list of subscribed topic names from
the request or null.
*
- * @return The existing consumer group member or a new one.
+ * @return A Result containing the ShareGroupHeartbeat response and
+ * a list of records to update the state machine.
*/
- private ConsumerGroupMember getOrMaybeSubscribeDynamicConsumerGroupMember(
- ConsumerGroup group,
+ private CoordinatorResult<ShareGroupHeartbeatResponseData,
CoordinatorRecord> shareGroupHeartbeat(
+ String groupId,
String memberId,
int memberEpoch,
- List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions,
- boolean createIfNotExists,
- boolean useClassicProtocol
- ) {
- ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId,
createIfNotExists);
- if (!useClassicProtocol) {
- throwIfMemberEpochIsInvalid(member, memberEpoch,
ownedTopicPartitions);
- }
- if (createIfNotExists) {
- log.info("[GroupId {}] Member {} joins the consumer group using
the {} protocol.",
- group.groupId(), memberId, useClassicProtocol ? "classic" :
"consumer");
- }
- return member;
- }
+ String rackId,
+ String clientId,
+ String clientHost,
+ List<String> subscribedTopicNames
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ // The records which persists.
+ final List<CoordinatorRecord> records = new ArrayList<>();
+ // The records which are replayed immediately.
+ final List<CoordinatorRecord> replayRecords = new ArrayList<>();
- /**
- * Gets or subscribes a static consumer group member. This method also
replaces the
- * previous static member if allowed.
- *
- * @param group The consumer group.
- * @param memberId The member id.
- * @param memberEpoch The member epoch.
- * @param instanceId The instance id.
- * @param ownedTopicPartitions The owned partitions reported by the
member.
- * @param createIfNotExists Whether the member should be created or
not.
- * @param useClassicProtocol Whether the member uses the classic
protocol.
- * @param records The list to accumulate records created to
replace
- * the previous static member.
- *
- * @return The existing consumer group member or a new one.
- */
- private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember(
- ConsumerGroup group,
- String memberId,
- int memberEpoch,
- String instanceId,
- List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions,
- boolean createIfNotExists,
- boolean useClassicProtocol,
- List<CoordinatorRecord> records
- ) {
- ConsumerGroupMember existingStaticMemberOrNull =
group.staticMember(instanceId);
+ // Get or create the share group.
+ boolean createIfNotExists = memberEpoch == 0;
+ final ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId,
createIfNotExists);
+ throwIfShareGroupIsFull(group, memberId);
- if (createIfNotExists) {
+ // Get or create the member.
+ if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
+ ShareGroupMember member = getOrMaybeSubscribeShareGroupMember(
+ group,
+ memberId,
+ memberEpoch,
+ createIfNotExists
+ );
+
+ ShareGroupMember.Builder updatedMemberBuilder = new
ShareGroupMember.Builder(member);
+ // 1. Create or update the member.
+ ShareGroupMember updatedMember = updatedMemberBuilder
+ .maybeUpdateRackId(Optional.ofNullable(rackId))
+
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .build();
+
+ boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
+ groupId,
+ member,
+ updatedMember,
+ records
+ );
+
+ int groupEpoch = group.groupEpoch();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+ SubscriptionType subscriptionType = group.subscriptionType();
+
+ if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ Map<String, Integer> subscribedTopicNamesMap =
group.computeSubscribedTopicNames(member, updatedMember);
+ subscriptionMetadata = group.computeSubscriptionMetadata(
+ subscribedTopicNamesMap,
+ metadataImage.topics(),
+ metadataImage.cluster()
+ );
+
+ int numMembers = group.numMembers();
+ if (!group.hasMember(updatedMember.memberId())) {
+ numMembers++;
+ }
+
+ subscriptionType = ModernGroup.subscriptionType(
+ subscribedTopicNamesMap,
+ numMembers
+ );
+
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId {}] Computed new subscription metadata:
{}.",
+ groupId, subscriptionMetadata);
+ bumpGroupEpoch = true;
+ replayRecords.add(newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
+ }
+
+ if (bumpGroupEpoch) {
+ groupEpoch += 1;
+ records.add(newGroupEpochRecord(groupId, groupEpoch, SHARE));
+ log.info("[GroupId {}] Bumped group epoch to {}.", groupId,
groupEpoch);
+ }
+
+ group.setMetadataRefreshDeadline(currentTimeMs +
shareGroupMetadataRefreshIntervalMs, groupEpoch);
+ }
+
+ // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch.
+ final int targetAssignmentEpoch;
+ final Assignment targetAssignment;
+
+ if (groupEpoch > group.assignmentEpoch()) {
+ targetAssignment = updateTargetAssignment(
+ group,
+ groupEpoch,
+ updatedMember,
+ subscriptionMetadata,
+ subscriptionType,
+ replayRecords
+ );
+ targetAssignmentEpoch = groupEpoch;
Review Comment:
Yeah, that's true. Though it's a simple iteration for share groups for now,
it might get more complex in the future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]