dongnuo123 commented on code in PR #15268: URL: https://github.com/apache/kafka/pull/15268#discussion_r1470177662
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, List<Record> records) { } } + private JoinGroupRequestProtocol throwIfProtocolUnmatched( + ConsumerGroupMember member, + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols + ) { + for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : protocols) { + final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata()); + ConsumerProtocol.deserializeVersion(buffer); + final Optional<Integer> generationId = ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId(); + + // If the generation id is provided, it must match the member epoch. + if (!generationId.isPresent() || generationId.get() == member.memberEpoch()) { + // TODO: need a list of all available server assignors + if (UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name()) + || RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) { + return protocol; + } + } + } + throw new FencedMemberEpochException("The JoinGroup request doesn't have a matched generation id from a " + + "protocol supported by the server assignors with the epoch of the member known by the group coordinator (" + + member.memberEpoch() + ")."); + } + + private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> transitionToConsumerGroupHeartbeatTopicPartitions( + List<TopicPartition> topicPartitions + ) { + Map<String, List<Integer>> topicMap = new HashMap<>(); + topicPartitions.forEach(tp -> + topicMap.computeIfAbsent(tp.topic(), __ -> new ArrayList<>()).add(tp.partition()) + ); + return topicMap.entrySet().stream().map(item -> { + TopicImage topicImage = metadataImage.topics().getTopic(item.getKey()); + if (topicImage == null) { + throw INVALID_TOPIC_EXCEPTION.exception("Can't find the topic id of topic " + item.getKey() + "."); + } + return new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicImage.id()) + .setPartitions(item.getValue()); + }).collect(Collectors.toList()); + } + + public CoordinatorResult<Void, Record> upgradeGroupJoin( + RequestContext context, + JoinGroupRequestData request, + CompletableFuture<JoinGroupResponseData> responseFuture + ) throws ApiException { + final long currentTimeMs = time.milliseconds(); + final List<Record> records = new ArrayList<>(); + final String groupId = request.groupId(); + String memberId = request.memberId(); + final String instanceId = request.groupInstanceId(); + final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + final int sessionTimeoutMs = request.sessionTimeoutMs(); + + if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs || + sessionTimeoutMs > classicGroupMaxSessionTimeoutMs + ) { + responseFuture.complete(new JoinGroupResponseData() + .setMemberId(memberId) + .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) + ); + return EMPTY_RESULT; + } + + // Get or create the consumer group. + final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); + throwIfConsumerGroupIsFull(group, memberId); + + // Get or create the member. + if (isUnknownMember) memberId = Uuid.randomUuid().toString(); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + boolean newMemberCreated = false; + if (instanceId == null) { + // new dynamic member. + if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { + // If member id required, send back a response to call for another join group request with allocated member id. + log.info("Dynamic member with unknown member id joins group {}. " + + "Created a new member id {} and requesting the member to rejoin with this id.", + group.groupId(), memberId); + + responseFuture.complete(new JoinGroupResponseData() + .setMemberId(memberId) + .setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) + ); + return EMPTY_RESULT; + } else { + member = group.getOrMaybeCreateMember(memberId, true); + newMemberCreated = !group.members().containsKey(memberId); + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } + } else { + member = group.staticMember(instanceId); + // A new static member joins or the existing static member rejoins. + if (isUnknownMember) { + newMemberCreated = true; + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, true); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the upgrading consumer group.", groupId, memberId, instanceId); + } else { + // Replace the current member. + staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); + removeMemberAndCancelTimers(records, group.groupId(), member.memberId()); + log.info("[GroupId {}] Static member {} with instance id {} re-joins the upgrading consumer group.", groupId, memberId, instanceId); + } + } else { + // fence the static group with unmatched member id. + throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } + } + + int groupEpoch = group.groupEpoch(); + Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata(); + + // 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue + // record is written to the __consumer_offsets partition to persist the change. If the subscriptions have + // changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue + // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have + // changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition. + final JoinGroupRequestProtocol protocol = throwIfProtocolUnmatched(member, request.protocols()); + final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata()); + ConsumerProtocol.deserializeVersion(buffer); + final ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer, (short) 0); + final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions = + transitionToConsumerGroupHeartbeatTopicPartitions(subscription.ownedPartitions()); + + ConsumerGroupMember updatedMember = updatedMemberBuilder + .maybeUpdateInstanceId(Optional.ofNullable(instanceId)) + .maybeUpdateRackId(subscription.rackId()) + .maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs())) + .maybeUpdateServerAssignorName(Optional.ofNullable(protocol.name())) + .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics())) + .setClientId(context.clientId()) + .setClientHost(context.clientAddress.toString()) + .build(); + + boolean bumpGroupEpoch = false; + if (!updatedMember.equals(member)) { + records.add(newMemberSubscriptionRecord(groupId, updatedMember)); + + if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) { + log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.", + groupId, memberId, updatedMember.subscribedTopicNames()); + bumpGroupEpoch = true; + } + + if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) { + log.info("[GroupId {}] Member {} updated its subscribed regex to: {}.", + groupId, memberId, updatedMember.subscribedTopicRegex()); + bumpGroupEpoch = true; + } + } + + if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { + // The subscription metadata is updated in two cases: + // 1) The member has updated its subscriptions; + // 2) The refresh deadline has been reached. + subscriptionMetadata = group.computeSubscriptionMetadata( + member, + updatedMember, + metadataImage.topics(), + metadataImage.cluster() + ); + + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId {}] Computed new subscription metadata: {}.", + groupId, subscriptionMetadata); + bumpGroupEpoch = true; + records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); + } + + if (bumpGroupEpoch) { + groupEpoch += 1; + records.add(newGroupEpochRecord(groupId, groupEpoch)); + log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); + } + + group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); + } + + // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member + // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition. + int targetAssignmentEpoch = group.assignmentEpoch(); + Assignment targetAssignment = group.targetAssignment(memberId); + String preferredServerAssignor = null; + if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { + preferredServerAssignor = group.computePreferredServerAssignor( + member, + updatedMember + ).orElse(defaultAssignor.name()); + try { + TargetAssignmentBuilder assignmentResultBuilder = + new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)) + .withMembers(group.members()) + .withStaticMembers(group.staticMembers()) + .withSubscriptionMetadata(subscriptionMetadata) + .withTargetAssignment(group.targetAssignment()) + .addOrUpdateMember(memberId, updatedMember); + TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; + // A new static member is replacing an older one with the same subscriptions. + // We just need to remove the older member and add the newer one. The new member should + // reuse the target assignment of the older member. + if (staticMemberReplaced) { + assignmentResult = assignmentResultBuilder + .removeMember(member.memberId()) + .build(); + } else { + assignmentResult = assignmentResultBuilder + .build(); + } + + log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.", + groupId, groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment()); + + records.addAll(assignmentResult.records()); + targetAssignment = assignmentResult.targetAssignment().get(memberId); + targetAssignmentEpoch = groupEpoch; + } catch (PartitionAssignorException ex) { + String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", + groupEpoch, ex.getMessage()); + log.error("[GroupId {}] {}.", groupId, msg); + throw new UnknownServerException(msg, ex); + } + } + + // 3. Reconcile the member's assignment with the target assignment. This is only required if + // the member is not stable or if a new target assignment has been installed. + if (updatedMember.state() != ConsumerGroupMember.MemberState.STABLE || updatedMember.targetMemberEpoch() != targetAssignmentEpoch) { + ConsumerGroupMember prevMember = updatedMember; + updatedMember = new CurrentAssignmentBuilder(updatedMember) + .withTargetAssignment(targetAssignmentEpoch, targetAssignment) + .withCurrentPartitionEpoch(group::currentPartitionEpoch) + .withOwnedTopicPartitions(ownedTopicPartitions) + .build(); + + // Checking the reference is enough here because a new instance + // is created only when the state has changed. + if (updatedMember != prevMember) { + records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + + log.info("[GroupId {}] Member {} transitioned from {} to {}.", + groupId, memberId, member.currentAssignmentSummary(), updatedMember.currentAssignmentSummary()); + + if (updatedMember.state() == ConsumerGroupMember.MemberState.REVOKING) { + scheduleConsumerGroupRevocationTimeout( + groupId, + memberId, + updatedMember.rebalanceTimeoutMs(), + updatedMember.memberEpoch() + ); + } else { + cancelConsumerGroupRevocationTimeout(groupId, memberId); + } + } + } + + if (newMemberCreated) { + scheduleConsumerGroupSessionTimeout(groupId, memberId); + } + + responseFuture.complete(new JoinGroupResponseData() + .setMemberId(updatedMember.memberId()) + .setGenerationId(updatedMember.memberEpoch()) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName(preferredServerAssignor) + ); + + return new CoordinatorResult<>(records, null); + } + + private byte[] createSyncGroupResponseAssignment(Map<Uuid, Set<Integer>> assignedPartitions) { + final List<TopicPartition> partitions = new ArrayList<>(); + assignedPartitions.entrySet().forEach(item -> { + TopicImage topicImage = metadataImage.topics().getTopic(item.getKey()); + if (topicImage == null) { + throw INVALID_TOPIC_EXCEPTION.exception("Can't find the topic id of topic " + item.getKey() + "."); + } + partitions.addAll(item.getValue().stream().map(partition -> + new TopicPartition(topicImage.name(), partition)).collect(Collectors.toList())); + }); + + final ByteBuffer byteBuffer = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(partitions)); // TODO: assignment version? + byte[] res = new byte[byteBuffer.remaining()]; + byteBuffer.get(res); + return res; + } + + public CoordinatorResult<Void, Record> upgradeGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) { + String groupId = request.groupId(); + String memberId = request.memberId(); + ConsumerGroup group; + try { + group = getOrMaybeCreateConsumerGroup(groupId, false); + } catch (Throwable t) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.forException(t).code()) + ); + return EMPTY_RESULT; + } + + // TODO: + // 1) member id exists, instance id exists and matches member id + // 2) generation id and protocol match + // 3) group state is not empty or dead? + + ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + responseFuture.complete(new SyncGroupResponseData() + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName(group.preferredServerAssignor().orElse(null)) + .setAssignment(createSyncGroupResponseAssignment(member.assignedPartitions()))); + + return EMPTY_RESULT; + } + + public HeartbeatResponseData upgradeGroupHeartbeat( + RequestContext context, + HeartbeatRequestData request + ) { + final String groupId = request.groupId(); + final String memberId = request.memberId(); + + ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); + ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + + // TODO: + // 1) member id exists, instance id exists and matches member id + // 2) generation id and protocol match + // 3) group state is not empty or dead? + + scheduleConsumerGroupSessionTimeout(groupId, memberId); + + if (group.groupEpoch() > member.memberEpoch() || + !member.partitionsPendingRevocation().isEmpty() || + !member.partitionsPendingAssignment().isEmpty()) { + new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()); Review Comment: set join group timeout -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org