dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603535727


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3931,6 +4071,74 @@ public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains the appendFuture to return the response.
+     */
+    private CoordinatorResult<Void, CoordinatorRecord> classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException,
+        InconsistentGroupProtocolException, RebalanceInProgressException, IllegalStateException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(member.memberId(), member.memberEpoch(), request.generationId());
+        throwIfClassicProtocolUnmatched(member, request.protocolType(), request.protocolName());
+        throwIfRebalanceInProgress(group, member);
+
+        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+        appendFuture.whenComplete((__, t) -> {
+            if (t == null) {
+                cancelConsumerGroupSyncTimeout(groupId, memberId);
+                scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
+
+                responseFuture.complete(new SyncGroupResponseData()
+                    .setProtocolType(request.protocolType())
+                    .setProtocolName(request.protocolName())
+                    .setAssignment(prepareAssignment(member)));
+            }
+        });
+
+        return new CoordinatorResult<>(Collections.emptyList(), appendFuture, false);
+    }
+
+    /**
+     * Serializes the member's assigned partitions with ConsumerProtocol.
+     *
+     * @param member The ConsumerGroupMember.
+     * @return The serialized assigned partitions.
+     */
+    private byte[] prepareAssignment(ConsumerGroupMember member) {

Review Comment:
   Now that we have this method, I wonder if we should inline the body of `deserializeProtocolVersion` here. I don't think we will reuse `deserializeProtocolVersion` anywhere else.


