dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1568994033


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @return  The created ConsumerGroup.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment())
+            );
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(
+                
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
+            );
+
+            // The target assignment and the assigned partitions of each 
member are set based on the last
+            // assignment of the classic group. All the members are put in the 
Stable state. If the classic
+            // group was in Preparing Rebalance or Completing Rebalance 
states, the classic members are
+            // asked to rejoin the group to re-trigger a rebalance or collect 
their assignments.
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given 
consumer group.
+     *
+     * @param records The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), 
consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), 
consumerGroupMember))
+        );
+    }
+
+    /**
+     * @return The map of topic id and partition set converted from the list 
of TopicPartition.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = 
topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Create a corresponding ClassicGroup and append the record for the 
creation for group downgrade.
+     * The member with leavingMemberId will not be converted to the new 
ClassicGroup as it's the last
+     * member using new consumer protocol that left and triggered the 
downgrade.
+     *
+     * @param leavingMemberId               The member that will not be 
converted in the ClassicGroup.
+     * @param logContext                    The logContext to create the 
ClassicGroup.
+     * @param time                          The time to create the 
ClassicGroup.
+     * @param consumerGroupSessionTimeoutMs The consumerGroupSessionTimeoutMs.
+     * @param metadataImage                 The metadataImage.
+     * @param records                       The record list.
+     * @return  The created ClassicGroup.
+     */
+    public ClassicGroup toClassicGroup(
+        String leavingMemberId,
+        LogContext logContext,
+        Time time,
+        int consumerGroupSessionTimeoutMs,
+        MetadataImage metadataImage,
+        List<Record> records
+    ) {
+        ClassicGroup classicGroup = new ClassicGroup(
+            logContext,
+            groupId(),
+            ClassicGroupState.STABLE,
+            time,
+            metrics,
+            groupEpoch(),
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.empty(),
+            members().keySet().stream().filter(member -> 
!member.equals(leavingMemberId)).findAny(),
+            Optional.of(time.milliseconds())
+        );
+
+        members().forEach((memberId, member) -> {
+            if (!memberId.equals(leavingMemberId)) {
+                classicGroup.add(
+                    new ClassicGroupMember(
+                        memberId,
+                        Optional.ofNullable(member.instanceId()),
+                        member.clientId(),
+                        member.clientHost(),
+                        member.rebalanceTimeoutMs(),
+                        consumerGroupSessionTimeoutMs,
+                        ConsumerProtocol.PROTOCOL_TYPE,
+                        member.supportedJoinGroupRequestProtocols(),

Review Comment:
   I'm not sure I follow. The stored protocols should be kept up to date: they
are updated whenever the group receives a JoinGroupRequest from the member while
in mixed mode. So the protocols used here are the ones taken from the member's
latest JoinGroupRequest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to