dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1600311785
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
return EMPTY_RESULT;
}
+ /**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+ private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ SyncGroupRequestData request,
+ CompletableFuture<SyncGroupResponseData> responseFuture
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ String groupId = request.groupId();
+ String memberId = request.memberId();
+ String instanceId = request.groupInstanceId();
+
+ ConsumerGroupMember member;
+ if (instanceId == null) {
+ member = group.getOrMaybeCreateMember(request.memberId(), false);
+ } else {
+ member = group.staticMember(instanceId);
+ if (member == null) {
+ throw new UnknownMemberIdException(
+ String.format("Member with instance id %s is not a member
of group %s.", instanceId, groupId)
+ );
+ }
+ throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+ }
+
+ throwIfMemberDoesNotUseClassicProtocol(member);
+ throwIfGenerationIdOrProtocolUnmatched(
+ group,
+ member,
+ request.generationId(),
+ request.protocolType(),
+ request.protocolName()
+ );
+
+ cancelConsumerGroupSyncTimeout(groupId, memberId);
+// scheduleConsumerGroupSessionTimeout(groupId, memberId,
member.classicMemberSessionTimeout());
Review Comment:
The commented-out call above needs https://github.com/apache/kafka/pull/15921
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]