dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1600493799


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+
+        cancelConsumerGroupSyncTimeout(groupId, memberId);
+//        scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicMemberSessionTimeout());

Review Comment:
   I just merged it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to