dongnuo123 commented on code in PR #15662: URL: https://github.com/apache/kafka/pull/15662#discussion_r1555022535
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java: ########## @@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() { )); } + /** + * Convert the current classic group to a consumer group. + * Add the records for the conversion. + * + * @param consumerGroup The converted consumer group. + * @param records The list to which the new records are added. + * + * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol. + */ + public void convertToConsumerGroup( + ConsumerGroup consumerGroup, + List<Record> records, + TopicsImage topicsImage + ) throws GroupIdNotFoundException { + consumerGroup.setGroupEpoch(generationId); + consumerGroup.setTargetAssignmentEpoch(generationId); + + records.add(RecordHelpers.newGroupEpochRecord(groupId(), generationId)); + // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat + records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap())); + records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), generationId)); + + members.forEach((memberId, member) -> { + ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment())); + Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage); + ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata())); Review Comment: >keep a reference to those in the member's state It was planned to be added in downgrade conversion but let me bring them into this pr because we will need it in implementing the apis -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org