jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r735935021
########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -285,52 +268,57 @@ public FetchRequestData build() { if (nextMetadata.isFull()) { if (log.isDebugEnabled()) { log.debug("Built full fetch {} for node {} with {}.", - nextMetadata, node, partitionsToLogString(next.keySet())); + nextMetadata, node, topicPartitionsToLogString(next.keySet())); } sessionPartitions = next; next = null; + Map<TopicPartition, PartitionData> toSend = + Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); // Only add topic IDs to the session if we are using topic IDs. if (canUseTopicIds) { - sessionTopicIds = topicIds; - sessionTopicNames = new HashMap<>(topicIds.size()); - topicIds.forEach((name, id) -> sessionTopicNames.put(id, name)); + Map<Uuid, Set<String>> newTopicNames = sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry -> entry.getValue().topicId, + Collectors.mapping(entry -> entry.getKey().topic(), Collectors.toSet()))); + + sessionTopicNames = new HashMap<>(newTopicNames.size()); + // There should only be one topic name per topic ID. 
+ newTopicNames.forEach((topicId, topicNamesSet) -> topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName))); } else { - sessionTopicIds = new HashMap<>(); sessionTopicNames = new HashMap<>(); } - topicIds = null; - Map<TopicPartition, PartitionData> toSend = - Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); - Map<String, Uuid> toSendTopicIds = - Collections.unmodifiableMap(new HashMap<>(sessionTopicIds)); - Map<Uuid, String> toSendTopicNames = - Collections.unmodifiableMap(new HashMap<>(sessionTopicNames)); - return new FetchRequestData(toSend, Collections.emptyList(), toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds); + return new FetchRequestData(toSend, Collections.emptyList(), Collections.emptyList(), toSend, nextMetadata, canUseTopicIds); } - List<TopicPartition> added = new ArrayList<>(); - List<TopicPartition> removed = new ArrayList<>(); - List<TopicPartition> altered = new ArrayList<>(); + List<TopicIdPartition> added = new ArrayList<>(); + List<TopicIdPartition> removed = new ArrayList<>(); + List<TopicIdPartition> altered = new ArrayList<>(); + List<TopicIdPartition> replaced = new ArrayList<>(); for (Iterator<Entry<TopicPartition, PartitionData>> iter = - sessionPartitions.entrySet().iterator(); iter.hasNext(); ) { + sessionPartitions.entrySet().iterator(); iter.hasNext(); ) { Entry<TopicPartition, PartitionData> entry = iter.next(); TopicPartition topicPartition = entry.getKey(); PartitionData prevData = entry.getValue(); PartitionData nextData = next.remove(topicPartition); if (nextData != null) { - if (!prevData.equals(nextData)) { + // We basically check if the new partition had the same topic ID. If not, + // we add it to the "replaced" set. + if (!prevData.topicId.equals(nextData.topicId) && !prevData.topicId.equals(Uuid.ZERO_UUID)) { Review comment: Do we not care to change IDs if the data is equal? 
We wouldn't usually send a request, and I don't know if it is even possible to have the same data in such a case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org