dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580929744


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+        throwIfConsumerGroupIsFull(group, memberId);
+        throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            // A dynamic member (re-)joins.
+            throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, 
context);
+            newMemberCreated = !group.hasMember(memberId);
+            member = group.getOrMaybeCreateMember(memberId, true);
+            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMember(records, groupId, member.memberId());
+                    log.info("[GroupId {}] Static member with unknown member 
id and instance id {} re-joins the consumer group. " +
+                        "Created a new member {} to replace the existing 
member {}.", groupId, instanceId, memberId, member.memberId());
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+            }
+        }
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions =
+            validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+        // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(subscription.rackId())
+            
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+            .maybeUpdateServerAssignorName(Optional.empty())
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
+            .setSupportedClassicProtocols(protocols)
+            .build();
+
+        boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, 
member, updatedMember, records);
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                updatedMember,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+                metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
+        int targetAssignmentEpoch = group.assignmentEpoch();
+        Assignment targetAssignment = group.targetAssignment(memberId);
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+            String preferredServerAssignor = 
group.computePreferredServerAssignor(
+                member,
+                updatedMember
+            ).orElse(defaultAssignor.name());
+            try {
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+                        .withMembers(group.members())
+                        .withStaticMembers(group.staticMembers())
+                        .withSubscriptionMetadata(subscriptionMetadata)
+                        .withTargetAssignment(group.targetAssignment())
+                        .addOrUpdateMember(memberId, updatedMember);
+                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+                // A new static member is replacing an older one with the same 
subscriptions.
+                // We just need to remove the older member and add the newer 
one. The new member should
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced) {
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(member.memberId())
+                        .build();
+                } else {
+                    assignmentResult = assignmentResultBuilder
+                        .build();
+                }
+
+                log.info("[GroupId {}] Computed a new target assignment for 
epoch {} with '{}' assignor: {}.",
+                    groupId, groupEpoch, preferredServerAssignor, 
assignmentResult.targetAssignment());
+
+                records.addAll(assignmentResult.records());
+                targetAssignment = 
assignmentResult.targetAssignment().get(memberId);
+                targetAssignmentEpoch = groupEpoch;
+            } catch (PartitionAssignorException ex) {
+                String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",
+                    groupEpoch, ex.getMessage());
+                log.error("[GroupId {}] {}.", groupId, msg);
+                throw new UnknownServerException(msg, ex);
+            }
+        }
+
+        // 3. Reconcile the member's assignment with the target assignment if 
the member is not
+        // fully reconciled yet.
+
+        /**
+         * TODO:
+         *      joinGroup - sync timeout
+         *      syncGroup - (join timeout)
+         *      heartbeat - (join timeout)
+         *      => scheduleConsumerGroupRebalanceTimeout is not necessary
+         */
+        updatedMember = maybeReconcile(
+            groupId,
+            updatedMember,
+            group::currentPartitionEpoch,
+            targetAssignmentEpoch,
+            targetAssignment,
+            ownedTopicPartitions,
+            records
+        );
+
+        if (newMemberCreated) {
+            scheduleConsumerGroupSessionTimeout(groupId, memberId);
+        }
+        scheduleConsumerGroupSyncTimeout(groupId, memberId, 
request.rebalanceTimeoutMs());
+
+        responseFuture.complete(new JoinGroupResponseData()
+            .setMemberId(updatedMember.memberId())
+            .setGenerationId(updatedMember.memberEpoch())
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .setProtocolName(protocols.iterator().next().name())
+        );
+
+        return new CoordinatorResult<>(records);
+    }
+
+    /**
+     * Creates the member subscription record if the updatedMember is 
different from
+     * the old member. Returns true if the 
subscribedTopicNames/subscribedTopicRegex
+     * has changed.
+     *
+     * @param groupId       The group id.
+     * @param memberId      The member id.
+     * @param member        The old member.
+     * @param updatedMember The updated member.
+     * @param records       The list to accumulate any new records.
+     * @return A boolean indicating whether the updatedMember has a different
+     *         subscribedTopicNames/subscribedTopicRegex from the old member.
+     */
+    private boolean updateMemberSubscription(
+        String groupId,
+        String memberId,

Review Comment:
   nit: Could we get the member id from the updated member?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+        throwIfConsumerGroupIsFull(group, memberId);
+        throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            // A dynamic member (re-)joins.
+            throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, 
context);
+            newMemberCreated = !group.hasMember(memberId);
+            member = group.getOrMaybeCreateMember(memberId, true);
+            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMember(records, groupId, member.memberId());
+                    log.info("[GroupId {}] Static member with unknown member 
id and instance id {} re-joins the consumer group. " +
+                        "Created a new member {} to replace the existing 
member {}.", groupId, instanceId, memberId, member.memberId());
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+            }
+        }
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions =
+            validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+        // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(subscription.rackId())
+            
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+            .maybeUpdateServerAssignorName(Optional.empty())
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
+            .setSupportedClassicProtocols(protocols)
+            .build();
+
+        boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, 
member, updatedMember, records);
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                updatedMember,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+                metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
+        int targetAssignmentEpoch = group.assignmentEpoch();
+        Assignment targetAssignment = group.targetAssignment(memberId);
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+            String preferredServerAssignor = 
group.computePreferredServerAssignor(
+                member,
+                updatedMember
+            ).orElse(defaultAssignor.name());
+            try {
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+                        .withMembers(group.members())
+                        .withStaticMembers(group.staticMembers())
+                        .withSubscriptionMetadata(subscriptionMetadata)
+                        .withTargetAssignment(group.targetAssignment())
+                        .addOrUpdateMember(memberId, updatedMember);
+                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+                // A new static member is replacing an older one with the same 
subscriptions.
+                // We just need to remove the older member and add the newer 
one. The new member should
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced) {
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(member.memberId())
+                        .build();
+                } else {
+                    assignmentResult = assignmentResultBuilder
+                        .build();
+                }
+
+                log.info("[GroupId {}] Computed a new target assignment for 
epoch {} with '{}' assignor: {}.",
+                    groupId, groupEpoch, preferredServerAssignor, 
assignmentResult.targetAssignment());
+
+                records.addAll(assignmentResult.records());
+                targetAssignment = 
assignmentResult.targetAssignment().get(memberId);
+                targetAssignmentEpoch = groupEpoch;
+            } catch (PartitionAssignorException ex) {
+                String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",
+                    groupEpoch, ex.getMessage());
+                log.error("[GroupId {}] {}.", groupId, msg);
+                throw new UnknownServerException(msg, ex);
+            }
+        }
+
+        // 3. Reconcile the member's assignment with the target assignment if 
the member is not
+        // fully reconciled yet.
+
+        /**
+         * TODO:
+         *      joinGroup - sync timeout
+         *      syncGroup - (join timeout)
+         *      heartbeat - (join timeout)
+         *      => scheduleConsumerGroupRebalanceTimeout is not necessary
+         */
+        updatedMember = maybeReconcile(
+            groupId,
+            updatedMember,
+            group::currentPartitionEpoch,
+            targetAssignmentEpoch,
+            targetAssignment,
+            ownedTopicPartitions,
+            records
+        );
+
+        if (newMemberCreated) {
+            scheduleConsumerGroupSessionTimeout(groupId, memberId);
+        }

Review Comment:
   To simplify, I wonder if we should just consider the join/sync group as a 
heartbeat signal too. What do you think?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+        throwIfConsumerGroupIsFull(group, memberId);
+        throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            // A dynamic member (re-)joins.
+            throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, 
context);
+            newMemberCreated = !group.hasMember(memberId);
+            member = group.getOrMaybeCreateMember(memberId, true);
+            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMember(records, groupId, member.memberId());
+                    log.info("[GroupId {}] Static member with unknown member 
id and instance id {} re-joins the consumer group. " +
+                        "Created a new member {} to replace the existing 
member {}.", groupId, instanceId, memberId, member.memberId());
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+            }
+        }
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions =
+            validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+        // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(subscription.rackId())
+            
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+            .maybeUpdateServerAssignorName(Optional.empty())
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
+            .setSupportedClassicProtocols(protocols)
+            .build();
+
+        boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, 
member, updatedMember, records);
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                updatedMember,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+                metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
+        int targetAssignmentEpoch = group.assignmentEpoch();
+        Assignment targetAssignment = group.targetAssignment(memberId);
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+            String preferredServerAssignor = 
group.computePreferredServerAssignor(
+                member,
+                updatedMember
+            ).orElse(defaultAssignor.name());
+            try {
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+                        .withMembers(group.members())
+                        .withStaticMembers(group.staticMembers())
+                        .withSubscriptionMetadata(subscriptionMetadata)
+                        .withTargetAssignment(group.targetAssignment())
+                        .addOrUpdateMember(memberId, updatedMember);
+                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+                // A new static member is replacing an older one with the same 
subscriptions.
+                // We just need to remove the older member and add the newer 
one. The new member should
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced) {
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(member.memberId())
+                        .build();
+                } else {
+                    assignmentResult = assignmentResultBuilder
+                        .build();
+                }
+
+                log.info("[GroupId {}] Computed a new target assignment for 
epoch {} with '{}' assignor: {}.",
+                    groupId, groupEpoch, preferredServerAssignor, 
assignmentResult.targetAssignment());
+
+                records.addAll(assignmentResult.records());
+                targetAssignment = 
assignmentResult.targetAssignment().get(memberId);
+                targetAssignmentEpoch = groupEpoch;
+            } catch (PartitionAssignorException ex) {
+                String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",
+                    groupEpoch, ex.getMessage());
+                log.error("[GroupId {}] {}.", groupId, msg);
+                throw new UnknownServerException(msg, ex);
+            }
+        }
+
+        // 3. Reconcile the member's assignment with the target assignment if 
the member is not
+        // fully reconciled yet.
+
+        /**
+         * TODO:
+         *      joinGroup - sync timeout
+         *      syncGroup - (join timeout)
+         *      heartbeat - (join timeout)
+         *      => scheduleConsumerGroupRebalanceTimeout is not necessary
+         */
+        updatedMember = maybeReconcile(
+            groupId,
+            updatedMember,
+            group::currentPartitionEpoch,
+            targetAssignmentEpoch,
+            targetAssignment,
+            ownedTopicPartitions,
+            records
+        );
+
+        if (newMemberCreated) {
+            scheduleConsumerGroupSessionTimeout(groupId, memberId);
+        }
+        scheduleConsumerGroupSyncTimeout(groupId, memberId, 
request.rebalanceTimeoutMs());

Review Comment:
   nit: Should we put a small comment to explain this one?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1169,6 +1175,107 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
         }
     }
 
+    /**
+     * Validates if the received classic member protocols are supported by the 
group.
+     *
+     * @param group         The ConsumerGroup.
+     * @param memberId      The joining member id.
+     * @param protocolType  The joining member protocol type.
+     * @param protocols     The joining member protocol collection.
+     */
+    private void throwIfClassicProtocolIsNotSupported(
+        ConsumerGroup group,
+        String memberId,
+        String protocolType,
+        JoinGroupRequestProtocolCollection protocols
+    ) {
+        if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {

Review Comment:
   I am not sure about the validation here. My concern is that the first 
classic member could join without any protocols based on the current 
validation. It seems that we only require it to be non empty when the group is 
not empty too. Should we also validate that `protocolType` is always 
`ConsumerProtocol.PROTOCOL_TYPE` and `protocols` is not empty?
   
   The logic in `supportsClassicProtocols` may not be 100% correct too.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1261,8 +1368,9 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                     staticMemberReplaced = true;
                     updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
                         .setAssignedPartitions(member.assignedPartitions());
-                    removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
-                    log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+                    removeMember(records, groupId, member.memberId());

Review Comment:
   nit: Could we put a comment here or in `removeMember` explaining why we 
don't cancel the timers?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+        throwIfConsumerGroupIsFull(group, memberId);
+        throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            // A dynamic member (re-)joins.
+            throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, 
context);
+            newMemberCreated = !group.hasMember(memberId);
+            member = group.getOrMaybeCreateMember(memberId, true);
+            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMember(records, groupId, member.memberId());
+                    log.info("[GroupId {}] Static member with unknown member 
id and instance id {} re-joins the consumer group. " +
+                        "Created a new member {} to replace the existing 
member {}.", groupId, instanceId, memberId, member.memberId());
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+            }
+        }
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions =
+            validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+        // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(subscription.rackId())
+            
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+            .maybeUpdateServerAssignorName(Optional.empty())
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
+            .setSupportedClassicProtocols(protocols)
+            .build();
+
+        boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, 
member, updatedMember, records);
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                updatedMember,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+                metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
+        int targetAssignmentEpoch = group.assignmentEpoch();
+        Assignment targetAssignment = group.targetAssignment(memberId);
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+            String preferredServerAssignor = 
group.computePreferredServerAssignor(
+                member,
+                updatedMember
+            ).orElse(defaultAssignor.name());
+            try {
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+                        .withMembers(group.members())
+                        .withStaticMembers(group.staticMembers())
+                        .withSubscriptionMetadata(subscriptionMetadata)
+                        .withTargetAssignment(group.targetAssignment())
+                        .addOrUpdateMember(memberId, updatedMember);
+                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+                // A new static member is replacing an older one with the same 
subscriptions.
+                // We just need to remove the older member and add the newer 
one. The new member should
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced) {
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(member.memberId())
+                        .build();
+                } else {
+                    assignmentResult = assignmentResultBuilder
+                        .build();
+                }
+
+                log.info("[GroupId {}] Computed a new target assignment for 
epoch {} with '{}' assignor: {}.",
+                    groupId, groupEpoch, preferredServerAssignor, 
assignmentResult.targetAssignment());
+
+                records.addAll(assignmentResult.records());
+                targetAssignment = 
assignmentResult.targetAssignment().get(memberId);
+                targetAssignmentEpoch = groupEpoch;
+            } catch (PartitionAssignorException ex) {
+                String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",
+                    groupEpoch, ex.getMessage());
+                log.error("[GroupId {}] {}.", groupId, msg);
+                throw new UnknownServerException(msg, ex);
+            }
+        }
+
+        // 3. Reconcile the member's assignment with the target assignment if 
the member is not
+        // fully reconciled yet.
+
+        /**
+         * TODO:
+         *      joinGroup - sync timeout
+         *      syncGroup - (join timeout)
+         *      heartbeat - (join timeout)
+         *      => scheduleConsumerGroupRebalanceTimeout is not necessary
+         */

Review Comment:
   My understanding is that we need two timers:
   1) One when we trigger the rebalance on the heartbeat to ensure that we 
receive a join group;
   2) One when we receive the join group to ensure that we receive the sync 
group.
   Is my understanding correct?
   
   btw, we should remove the TODO from the code before we merge the PR. We 
usually don't keep TODOs.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();

Review Comment:
   nit: I would put this non final one after the final ones.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2227,81 +2611,95 @@ public CoordinatorResult<Void, Record> classicGroupJoin(
         RequestContext context,
         JoinGroupRequestData request,
         CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        throwIfClassicGroupSessionTimeoutInvalid(request.sessionTimeoutMs());
+
+        Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+        if (group != null && group.type() == CONSUMER && !group.isEmpty()) {

Review Comment:
   The `!group.isEmpty()` condition is a bit subtile to grasp at first. My 
understanding is that we want empty consumer groups to be converted to classic 
groups. Is my understanding correct? If it is, it may be good to add a small 
comment to the `if` branch to explain it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+        throwIfConsumerGroupIsFull(group, memberId);
+        throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            // A dynamic member (re-)joins.
+            throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, 
context);
+            newMemberCreated = !group.hasMember(memberId);
+            member = group.getOrMaybeCreateMember(memberId, true);
+            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMember(records, groupId, member.memberId());
+                    log.info("[GroupId {}] Static member with unknown member 
id and instance id {} re-joins the consumer group. " +
+                        "Created a new member {} to replace the existing 
member {}.", groupId, instanceId, memberId, member.memberId());
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+            }
+        }
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions =
+            validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+        // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(subscription.rackId())
+            
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+            .maybeUpdateServerAssignorName(Optional.empty())
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
+            .setSupportedClassicProtocols(protocols)
+            .build();
+
+        boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, 
member, updatedMember, records);
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                updatedMember,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+                metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
+        int targetAssignmentEpoch = group.assignmentEpoch();

Review Comment:
   I wonder if we could to the following in order to also reuse this code:
   
   ```
   int targetAssignmentEpoch = group.assignmentEpoch();
   Assignment targetAssignment = group.targetAssignment(memberId);
   if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
       targetAssignment = updateTargetAssignment(....);
       targetAssignmentEpoch = groupEpoch;
   }
   ```
   
   It seems that it should work.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10921,6 +10855,544 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
         assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
     }
 
+    @Test
+    public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .build()))
+            .withConsumerGroupMaxSize(1)
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+        assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+    }
+
+    @Test
+    public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception {
+        int minSessionTimeout = 50;
+        int maxSessionTimeout = 100;
+        String groupId = "group-id";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withClassicGroupMinSessionTimeoutMs(minSessionTimeout)
+            .withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
+            .build();
+
+        JoinGroupRequestData requestWithSmallSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withSessionTimeoutMs(minSessionTimeout - 1)
+            .build();
+        assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithSmallSessionTimeout));
+
+        JoinGroupRequestData requestWithLargeSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withSessionTimeoutMs(maxSessionTimeout + 1)
+            .build();
+        assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithLargeSessionTimeout));
+    }
+
+    @Test
+    public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() 
{
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+                    .build()))
+            .build();
+
+        JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+        assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+        JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("connect")
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+        assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+    }
+
+    @Test
+    public void testConsumerGroupJoinWithNewDynamicMember() throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .addTopic(barTopicId, barTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                    }
+                })
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol(
+                Arrays.asList(fooTopicName, barTopicName),
+                Collections.emptyList()))
+            .build();
+        assertThrows(MemberIdRequiredException.class, () -> 
context.sendClassicGroupJoin(request, true));
+
+        // Simulate getting the new member id from the error response.
+        String newMemberId = Uuid.randomUuid().toString();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0)
+                    )));
+                    put(newMemberId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(barTopicId, 0)
+                    )));
+                }
+            }
+        ));
+
+        JoinGroupRequestData secondRequest = new JoinGroupRequestData()
+            .setGroupId(request.groupId())
+            .setMemberId(newMemberId)
+            .setProtocolType(request.protocolType())
+            .setProtocols(request.protocols())
+            .setSessionTimeoutMs(request.sessionTimeoutMs())
+            .setRebalanceTimeoutMs(request.rebalanceTimeoutMs())
+            .setReason(request.reason());
+
+        GroupMetadataManagerTestContext.JoinResult secondJoinResult = 
context.sendClassicGroupJoin(
+            secondRequest,
+            true
+        );
+
+        ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(newMemberId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(0)
+            .setState(MemberState.STABLE)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName))
+            .setRebalanceTimeoutMs(500)
+            .setAssignedPartitions(assignor.targetPartitions(newMemberId))
+            .setSupportedClassicProtocols(request.protocols())
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId, 
assignor.targetPartitions(memberId)),
+            RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, 
assignor.targetPartitions(newMemberId)),
+
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+        assertRecordsEquals(expectedRecords.subList(0, 3), 
secondJoinResult.records.subList(0, 3));
+        assertUnorderedListEquals(expectedRecords.subList(3, 5), 
secondJoinResult.records.subList(3, 5));
+        assertRecordsEquals(expectedRecords.subList(5, 7), 
secondJoinResult.records.subList(5, 7));
+
+        assertTrue(secondJoinResult.joinFuture.isDone());
+        assertEquals(
+            new JoinGroupResponseData()
+                .setMemberId(newMemberId)
+                .setGenerationId(11)
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .setProtocolName("range"),
+            secondJoinResult.joinFuture.get()
+        );
+
+        context.assertSessionTimeout(groupId, newMemberId, 45000);
+        context.assertSyncTimeout(groupId, newMemberId, 
request.rebalanceTimeoutMs());
+    }
+
+    @Test
+    public void testConsumerGroupJoinWithNewStaticMember() throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        String instanceId = "instance-id";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .addTopic(barTopicId, barTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                    }
+                })
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId(instanceId)
+            .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol(
+                Arrays.asList(fooTopicName, barTopicName),
+                Collections.emptyList()))
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+        String newMemberId = joinResult.joinFuture.get().memberId();
+
+        ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(newMemberId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(0)
+            .setInstanceId(instanceId)
+            .setState(MemberState.STABLE)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName))
+            .setRebalanceTimeoutMs(500)
+            .setSupportedClassicProtocols(request.protocols())
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId, 
Collections.emptyMap()),
+            RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, 
Collections.emptyMap()),
+
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+        assertRecordsEquals(expectedRecords.subList(0, 3), 
joinResult.records.subList(0, 3));
+        assertUnorderedListEquals(expectedRecords.subList(3, 5), 
joinResult.records.subList(3, 5));
+        assertRecordsEquals(expectedRecords.subList(5, 7), 
joinResult.records.subList(5, 7));
+
+        assertTrue(joinResult.joinFuture.isDone());
+        assertEquals(
+            new JoinGroupResponseData()
+                .setMemberId(newMemberId)
+                .setGenerationId(11)
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .setProtocolName("range"),
+            joinResult.joinFuture.get()
+        );
+
+        context.assertSessionTimeout(groupId, newMemberId, 45000);
+        context.assertSyncTimeout(groupId, newMemberId, 
request.rebalanceTimeoutMs());
+    }
+
+    @Test
+    public void testConsumerGroupJoinReplacingExistingStaticMember() throws 
Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        String instanceId = "instance-id";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                    }
+                })
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setInstanceId(instanceId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1)))
+                .withAssignmentEpoch(10))
+            .build();
+        
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
 10);
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId(instanceId)
+            .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol(
+                Collections.singletonList(fooTopicName),
+                Collections.emptyList()))
+            .build();
+
+        // The static member joins with UNKNOWN_MEMBER_ID.
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(
+            request,
+            true
+        );
+        String newMemberId = joinResult.joinFuture.get().memberId();
+
+        ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(newMemberId)
+            .setMemberEpoch(0)
+            .setPreviousMemberEpoch(0)
+            .setInstanceId(instanceId)
+            .setState(MemberState.UNREVOKED_PARTITIONS)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+            .setRebalanceTimeoutMs(500)
+            .setSupportedClassicProtocols(request.protocols())
+            .setPartitionsPendingRevocation(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            // Remove the old static member.
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, 
memberId),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, 
memberId),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, 
memberId),
+
+            // Create the new static member.
+            RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
+            RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, 
Collections.emptyMap()),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+        assertRecordsEquals(expectedRecords, joinResult.records);
+
+        assertTrue(joinResult.joinFuture.isDone());
+        assertEquals(
+            new JoinGroupResponseData()
+                .setMemberId(newMemberId)
+                .setGenerationId(0)
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .setProtocolName("range"),
+            joinResult.joinFuture.get()
+        );
+
+        context.assertSessionTimeout(groupId, newMemberId, 45000);
+        context.assertSyncTimeout(groupId, newMemberId, 
request.rebalanceTimeoutMs());
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"", "instance-id"})
+    public void testConsumerGroupRejoinWithExistingMember(String instanceId) 
throws Exception {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
+            GroupMetadataManagerTestContext.toRangeProtocol(
+                Arrays.asList(fooTopicName, barTopicName),
+                Arrays.asList(new TopicPartition(fooTopicName, 0), new 
TopicPartition(fooTopicName, 1))
+            );
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .addTopic(barTopicId, barTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 2, mkMapOfPartitionRacks(1)));
+                    }
+                })
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(instanceId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setRebalanceTimeoutMs(500)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSupportedClassicProtocols(protocols)
+                    .setSubscribedTopicNames(Arrays.asList(fooTopicName, 
barTopicName))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setRebalanceTimeoutMs(500)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList(fooTopicName, 
barTopicName))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(barTopicId, 0)))
+                .withAssignmentEpoch(10))
+            .build();
+        
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
 10);
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(memberId1)
+            .withGroupInstanceId(instanceId)
+            .withProtocols(protocols)
+            .build();
+
+        // The member rejoins with the same member id and protocols.
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+        assertEquals(Collections.emptyList(), joinResult.records);
+        assertEquals(
+            new JoinGroupResponseData()
+                .setMemberId(memberId1)
+                .setGenerationId(10)
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .setProtocolName("range"),
+            joinResult.joinFuture.get()
+        );
+        context.assertSyncTimeout(groupId, memberId1, 
request.rebalanceTimeoutMs());
+    }
+
+    @Test
+    public void testConsumerGroupJoinStaticMemberWithUnknownInstanceId() 
throws Exception {
+        String groupId = "group-id";
+        String instanceId = "instance-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String fooTopicName = "foo";
+        String barTopicName = "bar";
+
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
+            GroupMetadataManagerTestContext.toRangeProtocol(
+                Arrays.asList(fooTopicName, barTopicName),
+                Arrays.asList(new TopicPartition(fooTopicName, 0), new 
TopicPartition(fooTopicName, 1))
+            );
+        // Set up a ConsumerGroup with no static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setSupportedClassicProtocols(protocols)
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .build()))
+            .build();
+
+        // The member joins with an instance id.
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(memberId1)
+            .withGroupInstanceId(instanceId)
+            .withProtocols(protocols)
+            .build();
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupJoin(request));
+    }
+
+    @Test
+    public void testConsumerGroupJoinStaticMemberWithUnmatchedMemberId() 
throws Exception {
+        String groupId = "group-id";
+        String instanceId = "instance-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String fooTopicName = "foo";
+        String barTopicName = "bar";
+
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
+            GroupMetadataManagerTestContext.toRangeProtocol(
+                Arrays.asList(fooTopicName, barTopicName),
+                Arrays.asList(new TopicPartition(fooTopicName, 0), new 
TopicPartition(fooTopicName, 1))
+            );
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(instanceId)
+                    .setSupportedClassicProtocols(protocols)
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .build()))
+            .build();
+
+        // The member joins with the same instance id and a different member 
id.
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(Uuid.randomUuid().toString())
+            .withGroupInstanceId(instanceId)
+            .withProtocols(protocols)
+            .build();
+
+        assertThrows(FencedInstanceIdException.class, () -> 
context.sendClassicGroupJoin(request));
+    }
+

Review Comment:
   I wonder if we should add a few more test cases to also validated the 
reconciliation part. Have we planned to do so? I can think of the following 
ones:
   * Test all the versions of the embedded consumer protocol;
   * Test with various owned partitions: empty list, incomplete list, etc.;
   * Test with the member is different states: Stable, Unrevoked partitions, 
Unreleased partitions, etc.
   * Test with updated subscriptions (should compute new assignment)



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handle a JoinGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The group to join.
+     * @param context        The request context.
+     * @param request        The actual JoinGroup request.
+     * @param responseFuture The join group response future.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+        throwIfConsumerGroupIsFull(group, memberId);
+        throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+        // TODO: need to throw an exception if group is dead?
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            // A dynamic member (re-)joins.
+            throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, 
context);
+            newMemberCreated = !group.hasMember(memberId);
+            member = group.getOrMaybeCreateMember(memberId, true);
+            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            // A new static member joins or the existing static member rejoins.
+            if (isUnknownMember) {
+                newMemberCreated = true;
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, true);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Replace the current static member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMember(records, groupId, member.memberId());
+                    log.info("[GroupId {}] Static member with unknown member 
id and instance id {} re-joins the consumer group. " +
+                        "Created a new member {} to replace the existing 
member {}.", groupId, instanceId, memberId, member.memberId());
+                }
+            } else {
+                // Rejoining static member. Fence the static group with 
unmatched member id.
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+            }
+        }
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+        final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+        final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions =
+            validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+        // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(subscription.rackId())
+            
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+            .maybeUpdateServerAssignorName(Optional.empty())
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
+            .setSupportedClassicProtocols(protocols)
+            .build();
+
+        boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, 
member, updatedMember, records);

Review Comment:
   nit: Could we format it as follow?
   
   ```
   boolean bumpGroupEpoch = updateMemberSubscription(
     groupId, 
     memberId, 
     member, 
     updatedMember, 
     records
   );
   
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2227,81 +2611,95 @@ public CoordinatorResult<Void, Record> classicGroupJoin(
         RequestContext context,
         JoinGroupRequestData request,
         CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        throwIfClassicGroupSessionTimeoutInvalid(request.sessionTimeoutMs());

Review Comment:
   Seeing this, I wonder if we should move all the static request validations 
to the group coordinator service. I does not make sense to occupy the 
coordinator threads for this. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to