dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1555022535


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup     The converted consumer group.
+     * @param records           The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's members doesn't support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {
+        consumerGroup.setGroupEpoch(generationId);
+        consumerGroup.setTargetAssignmentEpoch(generationId);
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), generationId));
+        // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat
+        records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap()));
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), generationId));
+        
+        members.forEach((memberId, member) -> {
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));

Review Comment:
   >keep a reference to those in the member's state
   
   This was planned for the downgrade conversion, but let me bring them into this PR, since we will need them when implementing the APIs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.