dajac commented on code in PR #16573:
URL: https://github.com/apache/kafka/pull/16573#discussion_r1696564483


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2388,6 +3138,82 @@ public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
         }
     }
 
+    /**
+     * Handles a ShareGroupHeartbeat request.
+     *
+     * @param context The request context.
+     * @param request The actual ShareGroupHeartbeat request.
+     *
+     * @return A Result containing the ShareGroupHeartbeat response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(
+        RequestContext context,
+        ShareGroupHeartbeatRequestData request
+    ) throws ApiException {
+        throwIfShareGroupHeartbeatRequestIsInvalid(request);
+
+        if (request.memberEpoch() == ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH) {
+            // -1 means that the member wants to leave the group.
+            return shareGroupLeave(
+                request.groupId(),
+                request.memberId(),
+                request.memberEpoch());
+        }
+        // Otherwise, it is a regular heartbeat.
+        return shareGroupHeartbeat(
+            request.groupId(),
+            request.memberId(),
+            request.memberEpoch(),
+            request.rackId(),
+            context.clientId(),
+            context.clientAddress.toString(),
+            request.subscribedTopicNames());
+    }
+
+    private void replay(CoordinatorRecord record) {

Review Comment:
   This method bothers me because the concept of replay is tied to 
persisted records, and that is not what happens here. It also duplicates some 
of the replay logic.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1128,6 +1401,26 @@ private void throwIfMemberEpochIsInvalid(
         }
     }
 
+    /**
+     * Validates the member epoch provided in the heartbeat request.
+     *
+     * @param member                The share group member.
+     * @param receivedMemberEpoch   The member epoch.
+     *
+     * @throws FencedMemberEpochException if the provided epoch is ahead or behind the epoch known
+     *                                    by this coordinator.
+     */
+    private void throwIfShareGroupMemberEpochIsInvalid(
+        ShareGroupMember member,
+        int receivedMemberEpoch
+    ) {
+        if (receivedMemberEpoch > member.memberEpoch()) {

Review Comment:
   My understanding is that the member epoch is never persisted. Is that 
correct? If so, after a failover of the group coordinator, all the members 
will be fenced because the member epoch here will be back to zero. Is that 
the case?
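
   To make the concern concrete, here is a minimal sketch of the failover 
scenario. It is not the PR's code: `EpochFencingSketch`, `Coordinator`, `bump` 
and `isFenced` are hypothetical names, and the sketch assumes that a new 
coordinator starts with every member epoch at zero because nothing was 
persisted. Only the `received > known` comparison mirrors the check above.

```java
import java.util.HashMap;
import java.util.Map;

final class EpochFencingSketch {
    /** Hypothetical stand-in for the coordinator's in-memory member state. */
    static final class Coordinator {
        private final Map<String, Integer> memberEpochs = new HashMap<>();

        // Simulates an accepted heartbeat that bumps the member epoch in memory only.
        int bump(String memberId) {
            return memberEpochs.merge(memberId, 1, Integer::sum);
        }

        // Same shape as the check under review: an epoch ahead of the known one is fenced.
        boolean isFenced(String memberId, int receivedEpoch) {
            int knownEpoch = memberEpochs.getOrDefault(memberId, 0);
            return receivedEpoch > knownEpoch;
        }
    }

    public static void main(String[] args) {
        Coordinator beforeFailover = new Coordinator();
        beforeFailover.bump("member-1");
        int epoch = beforeFailover.bump("member-1");
        System.out.println(beforeFailover.isFenced("member-1", epoch)); // false

        // Assumption: the new coordinator has no persisted epoch to reload.
        Coordinator afterFailover = new Coordinator();
        System.out.println(afterFailover.isFenced("member-1", epoch)); // true
    }
}
```

   If the epoch were reloaded from a persisted record instead, the second 
check would see the same known epoch and the member would not be fenced.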



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1702,65 +2012,218 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
     }
 
     /**
-     * Gets or subscribes a new dynamic consumer group member.
+     * Handles a ShareGroupHeartbeat request.
      *
-     * @param group                 The consumer group.
-     * @param memberId              The member id.
-     * @param memberEpoch           The member epoch.
-     * @param ownedTopicPartitions  The owned partitions reported by the member.
-     * @param createIfNotExists     Whether the member should be created or not.
-     * @param useClassicProtocol    Whether the member uses the classic protocol.
+     * @param groupId               The group id from the request.
+     * @param memberId              The member id from the request.
+     * @param memberEpoch           The member epoch from the request.
+     * @param rackId                The rack id from the request or null.
+     * @param clientId              The client id.
+     * @param clientHost            The client host.
+     * @param subscribedTopicNames  The list of subscribed topic names from the request or null.
      *
-     * @return The existing consumer group member or a new one.
+     * @return A Result containing the ShareGroupHeartbeat response and
+     *         a list of records to update the state machine.
      */
-    private ConsumerGroupMember getOrMaybeSubscribeDynamicConsumerGroupMember(
-        ConsumerGroup group,
+    private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(
+        String groupId,
         String memberId,
         int memberEpoch,
-        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
-        boolean createIfNotExists,
-        boolean useClassicProtocol
-    ) {
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-        if (!useClassicProtocol) {
-            throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-        }
-        if (createIfNotExists) {
-            log.info("[GroupId {}] Member {} joins the consumer group using the {} protocol.",
-                group.groupId(), memberId, useClassicProtocol ? "classic" : "consumer");
-        }
-        return member;
-    }
+        String rackId,
+        String clientId,
+        String clientHost,
+        List<String> subscribedTopicNames
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        // The records which persists.
+        final List<CoordinatorRecord> records = new ArrayList<>();
+        // The records which are replayed immediately.
+        final List<CoordinatorRecord> replayRecords = new ArrayList<>();
 
-    /**
-     * Gets or subscribes a static consumer group member. This method also replaces the
-     * previous static member if allowed.
-     *
-     * @param group                 The consumer group.
-     * @param memberId              The member id.
-     * @param memberEpoch           The member epoch.
-     * @param instanceId            The instance id.
-     * @param ownedTopicPartitions  The owned partitions reported by the member.
-     * @param createIfNotExists     Whether the member should be created or not.
-     * @param useClassicProtocol    Whether the member uses the classic protocol.
-     * @param records               The list to accumulate records created to replace
-     *                              the previous static member.
-     *                              
-     * @return The existing consumer group member or a new one.
-     */
-    private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember(
-        ConsumerGroup group,
-        String memberId,
-        int memberEpoch,
-        String instanceId,
-        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
-        boolean createIfNotExists,
-        boolean useClassicProtocol,
-        List<CoordinatorRecord> records
-    ) {
-        ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(instanceId);
+        // Get or create the share group.
+        boolean createIfNotExists = memberEpoch == 0;
+        final ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, createIfNotExists);
+        throwIfShareGroupIsFull(group, memberId);
 
-        if (createIfNotExists) {
+        // Get or create the member.
+        if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
+        ShareGroupMember member = getOrMaybeSubscribeShareGroupMember(
+            group,
+            memberId,
+            memberEpoch,
+            createIfNotExists
+        );
+
+        ShareGroupMember.Builder updatedMemberBuilder = new ShareGroupMember.Builder(member);
+        // 1. Create or update the member.
+        ShareGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateRackId(Optional.ofNullable(rackId))
+            .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+            .setClientId(clientId)
+            .setClientHost(clientHost)
+            .build();
+
+        boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
+            groupId,
+            member,
+            updatedMember,
+            records
+        );
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
+        SubscriptionType subscriptionType = group.subscriptionType();
+
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            Map<String, Integer> subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                subscribedTopicNamesMap,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            int numMembers = group.numMembers();
+            if (!group.hasMember(updatedMember.memberId())) {
+                numMembers++;
+            }
+
+            subscriptionType = ModernGroup.subscriptionType(
+                subscribedTopicNamesMap,
+                numMembers
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: {}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                replayRecords.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch, SHARE));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + shareGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than the target assignment epoch.
+        final int targetAssignmentEpoch;
+        final Assignment targetAssignment;
+
+        if (groupEpoch > group.assignmentEpoch()) {
+            targetAssignment = updateTargetAssignment(
+                group,
+                groupEpoch,
+                updatedMember,
+                subscriptionMetadata,
+                subscriptionType,
+                replayRecords
+            );
+            targetAssignmentEpoch = groupEpoch;

Review Comment:
   My understanding is that we don't persist the target assignment. One thing 
to consider is that it will trigger a re-computation of all the target 
assignments on coordinator failovers. This is probably costly.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2621,26 +3497,140 @@ public void replay(
     public void replay(
         ConsumerGroupCurrentMemberAssignmentKey key,
         ConsumerGroupCurrentMemberAssignmentValue value
+    ) {
+        replay(key, value, CONSUMER);
+    }
+
+    /**
+     * Replays ConsumerGroupCurrentMemberAssignmentKey/Value to update the hard state of
+     * the consumer group. It updates the assignment of a member or deletes it.
+     *
+     * @param key   A ConsumerGroupCurrentMemberAssignmentKey key.
+     * @param value A ConsumerGroupCurrentMemberAssignmentValue record.
+     */
+    public void replay(
+        ConsumerGroupCurrentMemberAssignmentKey key,
+        ConsumerGroupCurrentMemberAssignmentValue value,
+        GroupType groupType

Review Comment:
   All these group types passed to the replay methods are really confusing 
too. They give the impression that the state is actually persisted, but it is 
not. I think this will make the code hard to reason about in the future. We 
should avoid this. Those records are for consumer groups.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1702,65 +2012,218 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
     }
 
     /**
-     * Gets or subscribes a new dynamic consumer group member.
+     * Handles a ShareGroupHeartbeat request.
      *
-     * @param group                 The consumer group.
-     * @param memberId              The member id.
-     * @param memberEpoch           The member epoch.
-     * @param ownedTopicPartitions  The owned partitions reported by the member.
-     * @param createIfNotExists     Whether the member should be created or not.
-     * @param useClassicProtocol    Whether the member uses the classic protocol.
+     * @param groupId               The group id from the request.
+     * @param memberId              The member id from the request.
+     * @param memberEpoch           The member epoch from the request.
+     * @param rackId                The rack id from the request or null.
+     * @param clientId              The client id.
+     * @param clientHost            The client host.
+     * @param subscribedTopicNames  The list of subscribed topic names from the request or null.
      *
-     * @return The existing consumer group member or a new one.
+     * @return A Result containing the ShareGroupHeartbeat response and
+     *         a list of records to update the state machine.
      */
-    private ConsumerGroupMember getOrMaybeSubscribeDynamicConsumerGroupMember(
-        ConsumerGroup group,
+    private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(
+        String groupId,
         String memberId,
         int memberEpoch,
-        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
-        boolean createIfNotExists,
-        boolean useClassicProtocol
-    ) {
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-        if (!useClassicProtocol) {
-            throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-        }
-        if (createIfNotExists) {
-            log.info("[GroupId {}] Member {} joins the consumer group using the {} protocol.",
-                group.groupId(), memberId, useClassicProtocol ? "classic" : "consumer");
-        }
-        return member;
-    }
+        String rackId,
+        String clientId,
+        String clientHost,
+        List<String> subscribedTopicNames
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        // The records which persists.
+        final List<CoordinatorRecord> records = new ArrayList<>();
+        // The records which are replayed immediately.
+        final List<CoordinatorRecord> replayRecords = new ArrayList<>();
 
-    /**
-     * Gets or subscribes a static consumer group member. This method also replaces the
-     * previous static member if allowed.
-     *
-     * @param group                 The consumer group.
-     * @param memberId              The member id.
-     * @param memberEpoch           The member epoch.
-     * @param instanceId            The instance id.
-     * @param ownedTopicPartitions  The owned partitions reported by the member.
-     * @param createIfNotExists     Whether the member should be created or not.
-     * @param useClassicProtocol    Whether the member uses the classic protocol.
-     * @param records               The list to accumulate records created to replace
-     *                              the previous static member.
-     *                              
-     * @return The existing consumer group member or a new one.
-     */
-    private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember(
-        ConsumerGroup group,
-        String memberId,
-        int memberEpoch,
-        String instanceId,
-        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
-        boolean createIfNotExists,
-        boolean useClassicProtocol,
-        List<CoordinatorRecord> records
-    ) {
-        ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(instanceId);
+        // Get or create the share group.
+        boolean createIfNotExists = memberEpoch == 0;
+        final ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, createIfNotExists);
+        throwIfShareGroupIsFull(group, memberId);
 
-        if (createIfNotExists) {
+        // Get or create the member.
+        if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
+        ShareGroupMember member = getOrMaybeSubscribeShareGroupMember(
+            group,
+            memberId,
+            memberEpoch,
+            createIfNotExists
+        );
+
+        ShareGroupMember.Builder updatedMemberBuilder = new ShareGroupMember.Builder(member);
+        // 1. Create or update the member.
+        ShareGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateRackId(Optional.ofNullable(rackId))
+            .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+            .setClientId(clientId)
+            .setClientHost(clientHost)
+            .build();
+
+        boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
+            groupId,
+            member,
+            updatedMember,
+            records
+        );
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
+        SubscriptionType subscriptionType = group.subscriptionType();
+
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            Map<String, Integer> subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                subscribedTopicNamesMap,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            int numMembers = group.numMembers();
+            if (!group.hasMember(updatedMember.memberId())) {
+                numMembers++;
+            }
+
+            subscriptionType = ModernGroup.subscriptionType(
+                subscribedTopicNamesMap,
+                numMembers
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: {}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                replayRecords.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch, SHARE));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + shareGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than the target assignment epoch.
+        final int targetAssignmentEpoch;
+        final Assignment targetAssignment;
+
+        if (groupEpoch > group.assignmentEpoch()) {
+            targetAssignment = updateTargetAssignment(
+                group,
+                groupEpoch,
+                updatedMember,
+                subscriptionMetadata,
+                subscriptionType,
+                replayRecords
+            );
+            targetAssignmentEpoch = groupEpoch;
+        } else {
+            targetAssignmentEpoch = group.assignmentEpoch();
+            targetAssignment = group.targetAssignment(updatedMember.memberId());
+        }
+
+        // 3. Reconcile the member's assignment with the target assignment if the member is not
+        // fully reconciled yet.
+        updatedMember = maybeReconcile(
+            groupId,
+            updatedMember,
+            targetAssignmentEpoch,
+            targetAssignment,
+            replayRecords
+        );
+
+        scheduleShareGroupSessionTimeout(groupId, memberId);
+
+        // Prepare the response.
+        ShareGroupHeartbeatResponseData response = new ShareGroupHeartbeatResponseData()
+            .setMemberId(updatedMember.memberId())
+            .setMemberEpoch(updatedMember.memberEpoch())
+            .setHeartbeatIntervalMs(shareGroupHeartbeatIntervalMs);
+
+        // The assignment is only provided in the following cases:
+        // 1. The member just joined or rejoined to group (epoch equals to zero);
+        // 2. The member's assignment has been updated.
+        if (memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
+            response.setAssignment(createShareGroupResponseAssignment(updatedMember));
+        }
+
+        /*
+         For a consumer group, below updates would occur as a result of applying (replaying) a member
+         or group persisted record. Because a share group doesn't persist additional records, hence we
+         need to replay the generated records here.
+        */
+        replayRecords.forEach(this::replay);
+        return new CoordinatorResult<>(records, response);

Review Comment:
   I believe that this does not fully work as you expect. For instance, if you 
replay records from `replayRecords` but return no records in `records`, the 
changes may be rolled back if the next write operation executed by the 
coordinator fails. This happens because the coordinator runtime does not know 
about those changes in the timeline data structures, so it reverts to the last 
written offset, which is computed from the previous write operation. This could 
lead to tricky state inconsistencies. I am not sure whether it matters in this 
particular case, but it is certainly confusing.
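
   The rollback scenario described above can be illustrated with a toy model. 
This is only a sketch under stated assumptions: `RevertSketch` and `ToyState` 
are hypothetical, the state is a plain map copied on every successful write, 
and the key names are made up. The real runtime uses snapshot-based timeline 
data structures, which are not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

final class RevertSketch {
    /** Toy stand-in for coordinator state with a single revert point. */
    static final class ToyState {
        private Map<String, Integer> live = new HashMap<>();
        private Map<String, Integer> lastWritten = new HashMap<>();

        // In-memory mutation, as done when a record is replayed locally.
        void apply(String key, int value) {
            live.put(key, value);
        }

        // A successful write makes the current state the new revert point.
        void commitWrite() {
            lastWritten = new HashMap<>(live);
        }

        // A failed write reverts to the state as of the last successful write.
        void failWrite() {
            live = new HashMap<>(lastWritten);
        }

        Integer get(String key) {
            return live.get(key);
        }
    }

    public static void main(String[] args) {
        ToyState state = new ToyState();
        state.apply("group-epoch", 1);
        state.commitWrite();

        // A heartbeat replays a record locally but returns nothing to write,
        // so the revert point does not move.
        state.apply("target-assignment-epoch", 1);

        // A later operation mutates state and its write fails.
        state.apply("group-epoch", 2);
        state.failWrite();

        System.out.println(state.get("group-epoch"));             // 1
        System.out.println(state.get("target-assignment-epoch")); // null
    }
}
```

   The locally replayed change disappears together with the failed write, 
even though the heartbeat that produced it had already been answered.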



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1967,6 +2523,60 @@ private Assignment updateTargetAssignment(
         }
     }
 
+    /**
+     * Updates the target assignment according to the updated member and subscription metadata.
+     *
+     * @param group                 The ShareGroup.
+     * @param groupEpoch            The group epoch.
+     * @param updatedMember         The updated member.
+     * @param subscriptionMetadata  The subscription metadata.
+     * @param subscriptionType      The group subscription type.
+     * @param records               The list to accumulate any new records.
+     * @return The new target assignment.
+     */
+    private Assignment updateTargetAssignment(

Review Comment:
   This one looks very similar to the existing one. Would it be possible to 
reuse it? We could perhaps parameterize the type of the member. I am not sure 
whether it works or not.
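
   As a rough illustration of what parameterizing the member type could look 
like, here is a minimal sketch. Everything in it is hypothetical: the `Member` 
interface, the two member classes, and the placeholder "assignment" (each 
member simply maps to its subscribed topics) are assumptions made for the 
example and do not reflect the existing builder or assignor APIs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GenericTargetAssignmentSketch {
    /** Hypothetical common view of a group member. */
    interface Member {
        String memberId();
        Set<String> subscribedTopicNames();
    }

    static final class ConsumerMember implements Member {
        private final String id;
        private final Set<String> topics;
        ConsumerMember(String id, Set<String> topics) { this.id = id; this.topics = topics; }
        public String memberId() { return id; }
        public Set<String> subscribedTopicNames() { return topics; }
    }

    static final class ShareMember implements Member {
        private final String id;
        private final Set<String> topics;
        ShareMember(String id, Set<String> topics) { this.id = id; this.topics = topics; }
        public String memberId() { return id; }
        public Set<String> subscribedTopicNames() { return topics; }
    }

    // One routine for both group types: the updated member replaces its old entry.
    // The "assignment" is a placeholder, not a real assignor.
    static <T extends Member> Map<String, Set<String>> updateTargetAssignment(
        List<T> members,
        T updatedMember
    ) {
        Map<String, Set<String>> target = new LinkedHashMap<>();
        for (T member : members) {
            if (!member.memberId().equals(updatedMember.memberId())) {
                target.put(member.memberId(), member.subscribedTopicNames());
            }
        }
        target.put(updatedMember.memberId(), updatedMember.subscribedTopicNames());
        return target;
    }

    public static void main(String[] args) {
        List<ShareMember> shareMembers = Arrays.asList(
            new ShareMember("s1", Collections.singleton("a")),
            new ShareMember("s2", Collections.singleton("a")));
        ShareMember updated = new ShareMember("s1", Collections.singleton("b"));
        System.out.println(updateTargetAssignment(shareMembers, updated));

        List<ConsumerMember> consumerMembers =
            Collections.singletonList(new ConsumerMember("c1", Collections.singleton("a")));
        System.out.println(updateTargetAssignment(consumerMembers, consumerMembers.get(0)));
    }
}
```

   Whether this works in practice depends on how much of the two existing 
methods differs beyond the member type.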



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2080,6 +2715,55 @@ private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMember(
         }
     }
 
+    /**
+     * Removes a member from a share group.
+     *
+     * @param group       The group.
+     * @param member      The member.
+     *
+     * @return A list of records to be applied to the state.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> shareGroupFenceMember(
+        ShareGroup group,
+        ShareGroupMember member,
+        T response
+    ) {
+        List<CoordinatorRecord> records = new ArrayList<>();
+        List<CoordinatorRecord> replayRecords = new ArrayList<>();
+
+        replayRecords.add(newCurrentAssignmentTombstoneRecord(group.groupId(), member.memberId()));
+        replayRecords.add(newTargetAssignmentTombstoneRecord(group.groupId(), member.memberId()));

Review Comment:
   This does not seem necessary as we could rely on the persisted record to 
delete the entire member. Could we?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
