squah-confluent commented on code in PR #21558:
URL: https://github.com/apache/kafka/pull/21558#discussion_r2895484608
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -201,17 +236,75 @@ public static
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> toTopicPar
}
/**
- * Creates a map of topic id and partition set from a list of consumer
group TopicPartitions.
+ * Creates a map of topic id and partition with assignment epochs from a
list of consumer group TopicPartitions.
*
- * @param topicPartitionsList The list of TopicPartitions.
- * @return a map of topic id and partition set.
+ * @param topicPartitions The list of TopicPartitions.
+ * @param defaultEpoch The default epoch to use when the epoch information
is not available for a partition.
+ * @return a map of topic id and partitions with assignment epochs.
*/
- public static Map<Uuid, Set<Integer>> assignmentFromTopicPartitions(
- List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
topicPartitionsList
+ public static Map<Uuid, Map<Integer, Integer>>
assignmentFromTopicPartitions(
+ List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
topicPartitions,
+ int defaultEpoch
) {
- return topicPartitionsList.stream().collect(Collectors.toMap(
- ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId,
- topicPartitions -> Collections.unmodifiableSet(new
HashSet<>(topicPartitions.partitions()))));
+ // For legacy static member, the defaultEpoch could be -2
(LEAVE_GROUP_STATIC_MEMBER_EPOCH).
+ // But we want to ensure the default memberEpoch assigned is
non-negative.
+ int adjustedDefaultEpoch = Math.max(defaultEpoch, 0);
+ Map<Uuid, Map<Integer, Integer>> assignmentWithEpochs = new
HashMap<>();
+
+ for (ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions tp :
topicPartitions) {
+ Map<Integer, Integer> partitionEpochs = new HashMap<>();
+ List<Integer> partitions = tp.partitions();
+ List<Integer> epochs = tp.assignmentEpochs();
+
+ if (epochs != null && epochs.size() == partitions.size()) {
+ for (int i = 0; i < partitions.size(); i++) {
+ partitionEpochs.put(partitions.get(i), epochs.get(i));
+ }
+ } else {
+ if (epochs != null) {
+ log.error("Size of assignment epochs {} is not equal to
partitions {} for topic {}. " +
Review Comment:
Since we're using a static logger, the log messages won't include the
`[GroupCoordinator id=0] ` prefix. Also the group id is missing from the log.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]