dajac commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580929744
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1413,6 +1506,243 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + /** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param context The request context. + * @param request The actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ + private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup( + ConsumerGroup group, + RequestContext context, + JoinGroupRequestData request, + CompletableFuture<JoinGroupResponseData> responseFuture + ) throws ApiException { + final long currentTimeMs = time.milliseconds(); + final List<Record> records = new ArrayList<>(); + final String groupId = request.groupId(); + String memberId = request.memberId(); + final String instanceId = request.groupInstanceId(); + final JoinGroupRequestProtocolCollection protocols = request.protocols(); + final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + + throwIfConsumerGroupIsFull(group, memberId); + throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); + // TODO: need to throw an exception if group is dead? + + // Get or create the member. + if (isUnknownMember) memberId = Uuid.randomUuid().toString(); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + boolean newMemberCreated = false; + if (instanceId == null) { + // A dynamic member (re-)joins. 
+ throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, context); + newMemberCreated = !group.hasMember(memberId); + member = group.getOrMaybeCreateMember(memberId, true); + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } else { + member = group.staticMember(instanceId); + // A new static member joins or the existing static member rejoins. + if (isUnknownMember) { + newMemberCreated = true; + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, true); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); + } else { + // Replace the current static member. + staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); + removeMember(records, groupId, member.memberId()); + log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " + + "Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId()); + } + } else { + // Rejoining static member. Fence the static group with unmatched member id. 
+ throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); + } + } + + int groupEpoch = group.groupEpoch(); + Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata(); + final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); + final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions = + validateGenerationIdAndGetOwnedPartition(member, subscription); + + // 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue + // record is written to the __consumer_offsets partition to persist the change. If the subscriptions have + // changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue + // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have + // changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition. 
+ ConsumerGroupMember updatedMember = updatedMemberBuilder + .maybeUpdateInstanceId(Optional.ofNullable(instanceId)) + .maybeUpdateRackId(subscription.rackId()) + .maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs())) + .maybeUpdateServerAssignorName(Optional.empty()) + .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics())) + .setClientId(context.clientId()) + .setClientHost(context.clientAddress.toString()) + .setSupportedClassicProtocols(protocols) + .build(); + + boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, member, updatedMember, records); + if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { + // The subscription metadata is updated in two cases: + // 1) The member has updated its subscriptions; + // 2) The refresh deadline has been reached. + subscriptionMetadata = group.computeSubscriptionMetadata( + member, + updatedMember, + metadataImage.topics(), + metadataImage.cluster() + ); + + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId {}] Computed new subscription metadata: {}.", + groupId, subscriptionMetadata); + bumpGroupEpoch = true; + records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); + } + + if (bumpGroupEpoch) { + groupEpoch += 1; + records.add(newGroupEpochRecord(groupId, groupEpoch)); + log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); + } + + group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); + } + + // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member + // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition. 
+ int targetAssignmentEpoch = group.assignmentEpoch(); + Assignment targetAssignment = group.targetAssignment(memberId); + if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { + String preferredServerAssignor = group.computePreferredServerAssignor( + member, + updatedMember + ).orElse(defaultAssignor.name()); + try { + TargetAssignmentBuilder assignmentResultBuilder = + new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)) + .withMembers(group.members()) + .withStaticMembers(group.staticMembers()) + .withSubscriptionMetadata(subscriptionMetadata) + .withTargetAssignment(group.targetAssignment()) + .addOrUpdateMember(memberId, updatedMember); + TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; + // A new static member is replacing an older one with the same subscriptions. + // We just need to remove the older member and add the newer one. The new member should + // reuse the target assignment of the older member. + if (staticMemberReplaced) { + assignmentResult = assignmentResultBuilder + .removeMember(member.memberId()) + .build(); + } else { + assignmentResult = assignmentResultBuilder + .build(); + } + + log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.", + groupId, groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment()); + + records.addAll(assignmentResult.records()); + targetAssignment = assignmentResult.targetAssignment().get(memberId); + targetAssignmentEpoch = groupEpoch; + } catch (PartitionAssignorException ex) { + String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", + groupEpoch, ex.getMessage()); + log.error("[GroupId {}] {}.", groupId, msg); + throw new UnknownServerException(msg, ex); + } + } + + // 3. Reconcile the member's assignment with the target assignment if the member is not + // fully reconciled yet. 
+ + /** + * TODO: + * joinGroup - sync timeout + * syncGroup - (join timeout) + * heartbeat - (join timeout) + * => scheduleConsumerGroupRebalanceTimeout is not necessary + */ + updatedMember = maybeReconcile( + groupId, + updatedMember, + group::currentPartitionEpoch, + targetAssignmentEpoch, + targetAssignment, + ownedTopicPartitions, + records + ); + + if (newMemberCreated) { + scheduleConsumerGroupSessionTimeout(groupId, memberId); + } + scheduleConsumerGroupSyncTimeout(groupId, memberId, request.rebalanceTimeoutMs()); + + responseFuture.complete(new JoinGroupResponseData() + .setMemberId(updatedMember.memberId()) + .setGenerationId(updatedMember.memberEpoch()) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName(protocols.iterator().next().name()) + ); + + return new CoordinatorResult<>(records); + } + + /** + * Creates the member subscription record if the updatedMember is different from + * the old member. Returns true if the subscribedTopicNames/subscribedTopicRegex + * has changed. + * + * @param groupId The group id. + * @param memberId The member id. + * @param member The old member. + * @param updatedMember The updated member. + * @param records The list to accumulate any new records. + * @return A boolean indicating whether the updatedMember has a different + * subscribedTopicNames/subscribedTopicRegex from the old member. + */ + private boolean updateMemberSubscription( + String groupId, + String memberId, Review Comment: nit: Could we get the member id from the updated member? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1413,6 +1506,243 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + /** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param context The request context. + * @param request The actual JoinGroup request. 
+ * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ + private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup( + ConsumerGroup group, + RequestContext context, + JoinGroupRequestData request, + CompletableFuture<JoinGroupResponseData> responseFuture + ) throws ApiException { + final long currentTimeMs = time.milliseconds(); + final List<Record> records = new ArrayList<>(); + final String groupId = request.groupId(); + String memberId = request.memberId(); + final String instanceId = request.groupInstanceId(); + final JoinGroupRequestProtocolCollection protocols = request.protocols(); + final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + + throwIfConsumerGroupIsFull(group, memberId); + throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); + // TODO: need to throw an exception if group is dead? + + // Get or create the member. + if (isUnknownMember) memberId = Uuid.randomUuid().toString(); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + boolean newMemberCreated = false; + if (instanceId == null) { + // A dynamic member (re-)joins. + throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, context); + newMemberCreated = !group.hasMember(memberId); + member = group.getOrMaybeCreateMember(memberId, true); + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } else { + member = group.staticMember(instanceId); + // A new static member joins or the existing static member rejoins. + if (isUnknownMember) { + newMemberCreated = true; + if (member == null) { + // New static member. 
+ member = group.getOrMaybeCreateMember(memberId, true); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); + } else { + // Replace the current static member. + staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); + removeMember(records, groupId, member.memberId()); + log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " + + "Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId()); + } + } else { + // Rejoining static member. Fence the static group with unmatched member id. + throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); + } + } + + int groupEpoch = group.groupEpoch(); + Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata(); + final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); + final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions = + validateGenerationIdAndGetOwnedPartition(member, subscription); + + // 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue + // record is written to the __consumer_offsets partition to persist the change. If the subscriptions have + // changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue + // record to the __consumer_offsets partition. 
Finally, the group epoch is bumped if the subscriptions have + // changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition. + ConsumerGroupMember updatedMember = updatedMemberBuilder + .maybeUpdateInstanceId(Optional.ofNullable(instanceId)) + .maybeUpdateRackId(subscription.rackId()) + .maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs())) + .maybeUpdateServerAssignorName(Optional.empty()) + .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics())) + .setClientId(context.clientId()) + .setClientHost(context.clientAddress.toString()) + .setSupportedClassicProtocols(protocols) + .build(); + + boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, member, updatedMember, records); + if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { + // The subscription metadata is updated in two cases: + // 1) The member has updated its subscriptions; + // 2) The refresh deadline has been reached. + subscriptionMetadata = group.computeSubscriptionMetadata( + member, + updatedMember, + metadataImage.topics(), + metadataImage.cluster() + ); + + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId {}] Computed new subscription metadata: {}.", + groupId, subscriptionMetadata); + bumpGroupEpoch = true; + records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); + } + + if (bumpGroupEpoch) { + groupEpoch += 1; + records.add(newGroupEpochRecord(groupId, groupEpoch)); + log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); + } + + group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); + } + + // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member + // replaces an existing static member. 
The delta between the existing and the new target assignment is persisted to the partition. + int targetAssignmentEpoch = group.assignmentEpoch(); + Assignment targetAssignment = group.targetAssignment(memberId); + if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { + String preferredServerAssignor = group.computePreferredServerAssignor( + member, + updatedMember + ).orElse(defaultAssignor.name()); + try { + TargetAssignmentBuilder assignmentResultBuilder = + new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)) + .withMembers(group.members()) + .withStaticMembers(group.staticMembers()) + .withSubscriptionMetadata(subscriptionMetadata) + .withTargetAssignment(group.targetAssignment()) + .addOrUpdateMember(memberId, updatedMember); + TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; + // A new static member is replacing an older one with the same subscriptions. + // We just need to remove the older member and add the newer one. The new member should + // reuse the target assignment of the older member. + if (staticMemberReplaced) { + assignmentResult = assignmentResultBuilder + .removeMember(member.memberId()) + .build(); + } else { + assignmentResult = assignmentResultBuilder + .build(); + } + + log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.", + groupId, groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment()); + + records.addAll(assignmentResult.records()); + targetAssignment = assignmentResult.targetAssignment().get(memberId); + targetAssignmentEpoch = groupEpoch; + } catch (PartitionAssignorException ex) { + String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", + groupEpoch, ex.getMessage()); + log.error("[GroupId {}] {}.", groupId, msg); + throw new UnknownServerException(msg, ex); + } + } + + // 3. 
Reconcile the member's assignment with the target assignment if the member is not + // fully reconciled yet. + + /** + * TODO: + * joinGroup - sync timeout + * syncGroup - (join timeout) + * heartbeat - (join timeout) + * => scheduleConsumerGroupRebalanceTimeout is not necessary + */ + updatedMember = maybeReconcile( + groupId, + updatedMember, + group::currentPartitionEpoch, + targetAssignmentEpoch, + targetAssignment, + ownedTopicPartitions, + records + ); + + if (newMemberCreated) { + scheduleConsumerGroupSessionTimeout(groupId, memberId); + } Review Comment: To simplify, I wonder if we should just consider the join/sync group as a heartbeat signal too. What do you think? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1413,6 +1506,243 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + /** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param context The request context. + * @param request The actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. 
+ */ + private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup( + ConsumerGroup group, + RequestContext context, + JoinGroupRequestData request, + CompletableFuture<JoinGroupResponseData> responseFuture + ) throws ApiException { + final long currentTimeMs = time.milliseconds(); + final List<Record> records = new ArrayList<>(); + final String groupId = request.groupId(); + String memberId = request.memberId(); + final String instanceId = request.groupInstanceId(); + final JoinGroupRequestProtocolCollection protocols = request.protocols(); + final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + + throwIfConsumerGroupIsFull(group, memberId); + throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); + // TODO: need to throw an exception if group is dead? + + // Get or create the member. + if (isUnknownMember) memberId = Uuid.randomUuid().toString(); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + boolean newMemberCreated = false; + if (instanceId == null) { + // A dynamic member (re-)joins. + throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, context); + newMemberCreated = !group.hasMember(memberId); + member = group.getOrMaybeCreateMember(memberId, true); + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } else { + member = group.staticMember(instanceId); + // A new static member joins or the existing static member rejoins. + if (isUnknownMember) { + newMemberCreated = true; + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, true); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); + } else { + // Replace the current static member. 
+ staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); + removeMember(records, groupId, member.memberId()); + log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " + + "Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId()); + } + } else { + // Rejoining static member. Fence the static group with unmatched member id. + throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); + } + } + + int groupEpoch = group.groupEpoch(); + Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata(); + final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); + final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions = + validateGenerationIdAndGetOwnedPartition(member, subscription); + + // 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue + // record is written to the __consumer_offsets partition to persist the change. If the subscriptions have + // changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue + // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have + // changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition. 
+ ConsumerGroupMember updatedMember = updatedMemberBuilder + .maybeUpdateInstanceId(Optional.ofNullable(instanceId)) + .maybeUpdateRackId(subscription.rackId()) + .maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs())) + .maybeUpdateServerAssignorName(Optional.empty()) + .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics())) + .setClientId(context.clientId()) + .setClientHost(context.clientAddress.toString()) + .setSupportedClassicProtocols(protocols) + .build(); + + boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, member, updatedMember, records); + if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { + // The subscription metadata is updated in two cases: + // 1) The member has updated its subscriptions; + // 2) The refresh deadline has been reached. + subscriptionMetadata = group.computeSubscriptionMetadata( + member, + updatedMember, + metadataImage.topics(), + metadataImage.cluster() + ); + + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId {}] Computed new subscription metadata: {}.", + groupId, subscriptionMetadata); + bumpGroupEpoch = true; + records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); + } + + if (bumpGroupEpoch) { + groupEpoch += 1; + records.add(newGroupEpochRecord(groupId, groupEpoch)); + log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); + } + + group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); + } + + // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member + // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition. 
+ int targetAssignmentEpoch = group.assignmentEpoch(); + Assignment targetAssignment = group.targetAssignment(memberId); + if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { + String preferredServerAssignor = group.computePreferredServerAssignor( + member, + updatedMember + ).orElse(defaultAssignor.name()); + try { + TargetAssignmentBuilder assignmentResultBuilder = + new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)) + .withMembers(group.members()) + .withStaticMembers(group.staticMembers()) + .withSubscriptionMetadata(subscriptionMetadata) + .withTargetAssignment(group.targetAssignment()) + .addOrUpdateMember(memberId, updatedMember); + TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; + // A new static member is replacing an older one with the same subscriptions. + // We just need to remove the older member and add the newer one. The new member should + // reuse the target assignment of the older member. + if (staticMemberReplaced) { + assignmentResult = assignmentResultBuilder + .removeMember(member.memberId()) + .build(); + } else { + assignmentResult = assignmentResultBuilder + .build(); + } + + log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.", + groupId, groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment()); + + records.addAll(assignmentResult.records()); + targetAssignment = assignmentResult.targetAssignment().get(memberId); + targetAssignmentEpoch = groupEpoch; + } catch (PartitionAssignorException ex) { + String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", + groupEpoch, ex.getMessage()); + log.error("[GroupId {}] {}.", groupId, msg); + throw new UnknownServerException(msg, ex); + } + } + + // 3. Reconcile the member's assignment with the target assignment if the member is not + // fully reconciled yet. 
+ + /** + * TODO: + * joinGroup - sync timeout + * syncGroup - (join timeout) + * heartbeat - (join timeout) + * => scheduleConsumerGroupRebalanceTimeout is not necessary + */ + updatedMember = maybeReconcile( + groupId, + updatedMember, + group::currentPartitionEpoch, + targetAssignmentEpoch, + targetAssignment, + ownedTopicPartitions, + records + ); + + if (newMemberCreated) { + scheduleConsumerGroupSessionTimeout(groupId, memberId); + } + scheduleConsumerGroupSyncTimeout(groupId, memberId, request.rebalanceTimeoutMs()); Review Comment: nit: Should we put a small comment to explain this one? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1169,6 +1175,107 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri } } + /** + * Validates if the received classic member protocols are supported by the group. + * + * @param group The ConsumerGroup. + * @param memberId The joining member id. + * @param protocolType The joining member protocol type. + * @param protocols The joining member protocol collection. + */ + private void throwIfClassicProtocolIsNotSupported( + ConsumerGroup group, + String memberId, + String protocolType, + JoinGroupRequestProtocolCollection protocols + ) { + if (!group.supportsClassicProtocols(protocolType, ClassicGroupMember.plainProtocolSet(protocols))) { Review Comment: I am not sure about the validation here. My concern is that the first classic member could join without any protocols based on the current validation. It seems that we only require it to be non-empty when the group is also non-empty. Should we also validate that `protocolType` is always `ConsumerProtocol.PROTOCOL_TYPE` and `protocols` is not empty? The logic in `supportsClassicProtocols` may not be 100% correct either.
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1261,8 +1368,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr staticMemberReplaced = true; updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) .setAssignedPartitions(member.assignedPartitions()); - removeMemberAndCancelTimers(records, group.groupId(), member.memberId()); - log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); + removeMember(records, groupId, member.memberId()); Review Comment: nit: Could we put a comment here or in `removeMember` explaining why we don't cancel the timers? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1413,6 +1506,243 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + /** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param context The request context. + * @param request The actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. 
+ */ + private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup( + ConsumerGroup group, + RequestContext context, + JoinGroupRequestData request, + CompletableFuture<JoinGroupResponseData> responseFuture + ) throws ApiException { + final long currentTimeMs = time.milliseconds(); + final List<Record> records = new ArrayList<>(); + final String groupId = request.groupId(); + String memberId = request.memberId(); + final String instanceId = request.groupInstanceId(); + final JoinGroupRequestProtocolCollection protocols = request.protocols(); + final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + + throwIfConsumerGroupIsFull(group, memberId); + throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); + // TODO: need to throw an exception if group is dead? + + // Get or create the member. + if (isUnknownMember) memberId = Uuid.randomUuid().toString(); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + boolean newMemberCreated = false; + if (instanceId == null) { + // A dynamic member (re-)joins. + throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, context); + newMemberCreated = !group.hasMember(memberId); + member = group.getOrMaybeCreateMember(memberId, true); + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } else { + member = group.staticMember(instanceId); + // A new static member joins or the existing static member rejoins. + if (isUnknownMember) { + newMemberCreated = true; + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, true); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); + } else { + // Replace the current static member. 
+ staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); + removeMember(records, groupId, member.memberId()); + log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " + + "Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId()); + } + } else { + // Rejoining static member. Fence the static group with unmatched member id. + throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); + } + } + + int groupEpoch = group.groupEpoch(); + Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata(); + final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); + final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions = + validateGenerationIdAndGetOwnedPartition(member, subscription); + + // 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue + // record is written to the __consumer_offsets partition to persist the change. If the subscriptions have + // changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue + // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have + // changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition. 
+ ConsumerGroupMember updatedMember = updatedMemberBuilder + .maybeUpdateInstanceId(Optional.ofNullable(instanceId)) + .maybeUpdateRackId(subscription.rackId()) + .maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs())) + .maybeUpdateServerAssignorName(Optional.empty()) + .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics())) + .setClientId(context.clientId()) + .setClientHost(context.clientAddress.toString()) + .setSupportedClassicProtocols(protocols) + .build(); + + boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, member, updatedMember, records); + if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { + // The subscription metadata is updated in two cases: + // 1) The member has updated its subscriptions; + // 2) The refresh deadline has been reached. + subscriptionMetadata = group.computeSubscriptionMetadata( + member, + updatedMember, + metadataImage.topics(), + metadataImage.cluster() + ); + + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId {}] Computed new subscription metadata: {}.", + groupId, subscriptionMetadata); + bumpGroupEpoch = true; + records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); + } + + if (bumpGroupEpoch) { + groupEpoch += 1; + records.add(newGroupEpochRecord(groupId, groupEpoch)); + log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); + } + + group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); + } + + // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member + // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition. 
+ int targetAssignmentEpoch = group.assignmentEpoch(); + Assignment targetAssignment = group.targetAssignment(memberId); + if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { + String preferredServerAssignor = group.computePreferredServerAssignor( + member, + updatedMember + ).orElse(defaultAssignor.name()); + try { + TargetAssignmentBuilder assignmentResultBuilder = + new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)) + .withMembers(group.members()) + .withStaticMembers(group.staticMembers()) + .withSubscriptionMetadata(subscriptionMetadata) + .withTargetAssignment(group.targetAssignment()) + .addOrUpdateMember(memberId, updatedMember); + TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; + // A new static member is replacing an older one with the same subscriptions. + // We just need to remove the older member and add the newer one. The new member should + // reuse the target assignment of the older member. + if (staticMemberReplaced) { + assignmentResult = assignmentResultBuilder + .removeMember(member.memberId()) + .build(); + } else { + assignmentResult = assignmentResultBuilder + .build(); + } + + log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.", + groupId, groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment()); + + records.addAll(assignmentResult.records()); + targetAssignment = assignmentResult.targetAssignment().get(memberId); + targetAssignmentEpoch = groupEpoch; + } catch (PartitionAssignorException ex) { + String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", + groupEpoch, ex.getMessage()); + log.error("[GroupId {}] {}.", groupId, msg); + throw new UnknownServerException(msg, ex); + } + } + + // 3. Reconcile the member's assignment with the target assignment if the member is not + // fully reconciled yet. 
+ + /** + * TODO: + * joinGroup - sync timeout + * syncGroup - (join timeout) + * heartbeat - (join timeout) + * => scheduleConsumerGroupRebalanceTimeout is not necessary + */ Review Comment: My understanding is that we need two timers: 1) One when we trigger the rebalance on the heartbeat to ensure that we receive a join group; 2) One when we receive the join group to ensure that we receive the sync group. Is my understanding correct? btw, we should remove the TODO from the code before we merge the PR. We usually don't keep TODOs. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1413,6 +1506,243 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + /** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param context The request context. + * @param request The actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ + private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup( + ConsumerGroup group, + RequestContext context, + JoinGroupRequestData request, + CompletableFuture<JoinGroupResponseData> responseFuture + ) throws ApiException { + final long currentTimeMs = time.milliseconds(); + final List<Record> records = new ArrayList<>(); + final String groupId = request.groupId(); + String memberId = request.memberId(); Review Comment: nit: I would put this non final one after the final ones. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2227,81 +2611,95 @@ public CoordinatorResult<Void, Record> classicGroupJoin( RequestContext context, JoinGroupRequestData request, CompletableFuture<JoinGroupResponseData> responseFuture + ) { + throwIfClassicGroupSessionTimeoutInvalid(request.sessionTimeoutMs()); + + Group group = groups.get(request.groupId(), Long.MAX_VALUE); + if (group != null && group.type() == CONSUMER && !group.isEmpty()) { Review Comment: The `!group.isEmpty()` condition is a bit subtle to grasp at first. My understanding is that we want empty consumer groups to be converted to classic groups. Is my understanding correct? If it is, it may be good to add a small comment to the `if` branch to explain it. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1413,6 +1506,243 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + /** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param context The request context. + * @param request The actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. 
+ */ + private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup( + ConsumerGroup group, + RequestContext context, + JoinGroupRequestData request, + CompletableFuture<JoinGroupResponseData> responseFuture + ) throws ApiException { + final long currentTimeMs = time.milliseconds(); + final List<Record> records = new ArrayList<>(); + final String groupId = request.groupId(); + String memberId = request.memberId(); + final String instanceId = request.groupInstanceId(); + final JoinGroupRequestProtocolCollection protocols = request.protocols(); + final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + + throwIfConsumerGroupIsFull(group, memberId); + throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); + // TODO: need to throw an exception if group is dead? + + // Get or create the member. + if (isUnknownMember) memberId = Uuid.randomUuid().toString(); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + boolean newMemberCreated = false; + if (instanceId == null) { + // A dynamic member (re-)joins. + throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, context); + newMemberCreated = !group.hasMember(memberId); + member = group.getOrMaybeCreateMember(memberId, true); + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } else { + member = group.staticMember(instanceId); + // A new static member joins or the existing static member rejoins. + if (isUnknownMember) { + newMemberCreated = true; + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, true); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); + } else { + // Replace the current static member. 
+ staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); + removeMember(records, groupId, member.memberId()); + log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " + + "Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId()); + } + } else { + // Rejoining static member. Fence the static group with unmatched member id. + throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); + } + } + + int groupEpoch = group.groupEpoch(); + Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata(); + final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); + final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions = + validateGenerationIdAndGetOwnedPartition(member, subscription); + + // 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue + // record is written to the __consumer_offsets partition to persist the change. If the subscriptions have + // changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue + // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have + // changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition. 
+ ConsumerGroupMember updatedMember = updatedMemberBuilder + .maybeUpdateInstanceId(Optional.ofNullable(instanceId)) + .maybeUpdateRackId(subscription.rackId()) + .maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs())) + .maybeUpdateServerAssignorName(Optional.empty()) + .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics())) + .setClientId(context.clientId()) + .setClientHost(context.clientAddress.toString()) + .setSupportedClassicProtocols(protocols) + .build(); + + boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, member, updatedMember, records); + if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { + // The subscription metadata is updated in two cases: + // 1) The member has updated its subscriptions; + // 2) The refresh deadline has been reached. + subscriptionMetadata = group.computeSubscriptionMetadata( + member, + updatedMember, + metadataImage.topics(), + metadataImage.cluster() + ); + + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId {}] Computed new subscription metadata: {}.", + groupId, subscriptionMetadata); + bumpGroupEpoch = true; + records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); + } + + if (bumpGroupEpoch) { + groupEpoch += 1; + records.add(newGroupEpochRecord(groupId, groupEpoch)); + log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); + } + + group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); + } + + // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member + // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition. 
+ int targetAssignmentEpoch = group.assignmentEpoch(); Review Comment: I wonder if we could to the following in order to also reuse this code: ``` int targetAssignmentEpoch = group.assignmentEpoch(); Assignment targetAssignment = group.targetAssignment(memberId); if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { targetAssignment = updateTargetAssignment(....); targetAssignmentEpoch = groupEpoch; } ``` It seems that it should work. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10921,6 +10855,544 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } + @Test + public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .withConsumerGroupMaxSize(1) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); + assertEquals("The consumer group has reached its maximum capacity of 1 members.", ex.getMessage()); + } + + @Test + public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception { + int minSessionTimeout = 50; + int maxSessionTimeout = 100; + String groupId = "group-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withClassicGroupMinSessionTimeoutMs(minSessionTimeout) 
+ .withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)) + .build(); + + JoinGroupRequestData requestWithSmallSessionTimeout = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withSessionTimeoutMs(minSessionTimeout - 1) + .build(); + assertThrows(InvalidSessionTimeoutException.class, () -> context.sendClassicGroupJoin(requestWithSmallSessionTimeout)); + + JoinGroupRequestData requestWithLargeSessionTimeout = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withSessionTimeoutMs(maxSessionTimeout + 1) + .build(); + assertThrows(InvalidSessionTimeoutException.class, () -> context.sendClassicGroupJoin(requestWithLargeSessionTimeout)); + } + + @Test + public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) + .build())) + .build(); + + JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + + JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + 
.withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("connect") + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); + } + + @Test + public void testConsumerGroupJoinWithNewDynamicMember() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList())) + .build(); + assertThrows(MemberIdRequiredException.class, () -> context.sendClassicGroupJoin(request, true)); + + // Simulate getting the new member id from the error response. 
+ String newMemberId = Uuid.randomUuid().toString(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + JoinGroupRequestData secondRequest = new JoinGroupRequestData() + .setGroupId(request.groupId()) + .setMemberId(newMemberId) + .setProtocolType(request.protocolType()) + .setProtocols(request.protocols()) + .setSessionTimeoutMs(request.sessionTimeoutMs()) + .setRebalanceTimeoutMs(request.rebalanceTimeoutMs()) + .setReason(request.reason()); + + GroupMetadataManagerTestContext.JoinResult secondJoinResult = context.sendClassicGroupJoin( + secondRequest, + true + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setAssignedPartitions(assignor.targetPartitions(newMemberId)) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, assignor.targetPartitions(newMemberId)), + + 
RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), secondJoinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), secondJoinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), secondJoinResult.records.subList(5, 7)); + + assertTrue(secondJoinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + secondJoinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, 45000); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testConsumerGroupJoinWithNewStaticMember() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + 
.setPreviousMemberEpoch(10) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + String newMemberId = joinResult.joinFuture.get().memberId(); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), joinResult.records.subList(0, 3)); + 
assertUnorderedListEquals(expectedRecords.subList(3, 5), joinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), joinResult.records.subList(5, 7)); + + assertTrue(joinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, 45000); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testConsumerGroupJoinReplacingExistingStaticMember() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + 
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol( + Collections.singletonList(fooTopicName), + Collections.emptyList())) + .build(); + + // The static member joins with UNKNOWN_MEMBER_ID. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request, + true + ); + String newMemberId = joinResult.joinFuture.get().memberId(); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(0) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + // Remove the old static member. + RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId), + + // Create the new static member. 
+ RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords, joinResult.records); + + assertTrue(joinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(0) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, 45000); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @ParameterizedTest + @ValueSource(strings = {"", "instance-id"}) + public void testConsumerGroupRejoinWithExistingMember(String instanceId) throws Exception { + String groupId = "group-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new 
TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 2, mkMapOfPartitionRacks(1))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSupportedClassicProtocols(protocols) + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(barTopicId, 0))) + .withAssignmentEpoch(10)) + .build(); + context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + // The member rejoins with the same member id and protocols. 
+ GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + assertEquals(Collections.emptyList(), joinResult.records); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(10) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + context.assertSyncTimeout(groupId, memberId1, request.rebalanceTimeoutMs()); + } + + @Test + public void testConsumerGroupJoinStaticMemberWithUnknownInstanceId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + // Set up a ConsumerGroup with no static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with an instance id. 
+ JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupJoin(request)); + } + + @Test + public void testConsumerGroupJoinStaticMemberWithUnmatchedMemberId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with the same instance id and a different member id. + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupJoin(request)); + } + Review Comment: I wonder if we should add a few more test cases to also validate the reconciliation part. Have we planned to do so? 
I can think of the following ones: * Test all the versions of the embedded consumer protocol; * Test with various owned partitions: empty list, incomplete list, etc.; * Test with the member in different states: Stable, Unrevoked partitions, Unreleased partitions, etc. * Test with updated subscriptions (should compute new assignment) ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1413,6 +1506,243 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + /** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param context The request context. + * @param request The actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ + private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup( + ConsumerGroup group, + RequestContext context, + JoinGroupRequestData request, + CompletableFuture<JoinGroupResponseData> responseFuture + ) throws ApiException { + final long currentTimeMs = time.milliseconds(); + final List<Record> records = new ArrayList<>(); + final String groupId = request.groupId(); + String memberId = request.memberId(); + final String instanceId = request.groupInstanceId(); + final JoinGroupRequestProtocolCollection protocols = request.protocols(); + final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + + throwIfConsumerGroupIsFull(group, memberId); + throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); + // TODO: need to throw an exception if group is dead? + + // Get or create the member. 
+ if (isUnknownMember) memberId = Uuid.randomUuid().toString(); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + boolean newMemberCreated = false; + if (instanceId == null) { + // A dynamic member (re-)joins. + throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, context); + newMemberCreated = !group.hasMember(memberId); + member = group.getOrMaybeCreateMember(memberId, true); + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } else { + member = group.staticMember(instanceId); + // A new static member joins or the existing static member rejoins. + if (isUnknownMember) { + newMemberCreated = true; + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, true); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); + } else { + // Replace the current static member. + staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); + removeMember(records, groupId, member.memberId()); + log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " + + "Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId()); + } + } else { + // Rejoining static member. Fence the static group with unmatched member id. 
+ throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); + } + } + + int groupEpoch = group.groupEpoch(); + Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata(); + final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); + final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions = + validateGenerationIdAndGetOwnedPartition(member, subscription); + + // 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue + // record is written to the __consumer_offsets partition to persist the change. If the subscriptions have + // changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue + // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have + // changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition. + ConsumerGroupMember updatedMember = updatedMemberBuilder + .maybeUpdateInstanceId(Optional.ofNullable(instanceId)) + .maybeUpdateRackId(subscription.rackId()) + .maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs())) + .maybeUpdateServerAssignorName(Optional.empty()) + .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics())) + .setClientId(context.clientId()) + .setClientHost(context.clientAddress.toString()) + .setSupportedClassicProtocols(protocols) + .build(); + + boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, member, updatedMember, records); Review Comment: nit: Could we format it as follows? 
``` boolean bumpGroupEpoch = updateMemberSubscription( groupId, memberId, member, updatedMember, records ); ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2227,81 +2611,95 @@ public CoordinatorResult<Void, Record> classicGroupJoin( RequestContext context, JoinGroupRequestData request, CompletableFuture<JoinGroupResponseData> responseFuture + ) { + throwIfClassicGroupSessionTimeoutInvalid(request.sessionTimeoutMs()); Review Comment: Seeing this, I wonder if we should move all the static request validations to the group coordinator service. It does not make sense to occupy the coordinator threads for this. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org