jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651332908
##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -186,23 +268,63 @@ public String toString() {
          * incremental fetch requests (see below).
          */
         private LinkedHashMap<TopicPartition, PartitionData> next;
+        private Map<String, Uuid> topicIds;
         private final boolean copySessionPartitions;
+        private boolean missingTopicIds;

         Builder() {
             this.next = new LinkedHashMap<>();
+            this.topicIds = new HashMap<>();
             this.copySessionPartitions = true;
         }

         Builder(int initialSize, boolean copySessionPartitions) {
             this.next = new LinkedHashMap<>(initialSize);
+            this.topicIds = new HashMap<>(initialSize);
             this.copySessionPartitions = copySessionPartitions;
         }

         /**
          * Mark that we want data from this partition in the upcoming fetch.
          */
-        public void add(TopicPartition topicPartition, PartitionData data) {
+        public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) {
             next.put(topicPartition, data);
+            // topicIds do not change between adding partitions and building, so we can use putIfAbsent
+            if (!topicId.equals(Uuid.ZERO_UUID)) {
+                topicIds.putIfAbsent(topicPartition.topic(), topicId);

Review comment:
Ah, I see what you are saying here. I think this will still close the session when we send the request. The other option is to set a boolean similar to `missingTopicIds` (maybe just change it to `inconsistentTopicId`) that signals to close the session earlier (upon build).
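A rough sketch of that second option, to make the idea concrete. This is not the `FetchSessionHandler` code from the PR: `TopicIdTracker`, `inconsistentTopicIds`, and `shouldCloseSessionOnBuild` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and setting `missingTopicIds` on a zero ID is an assumption about the part of the hunk that is not shown. The point is only that `putIfAbsent` returns the previously stored value, so a conflicting ID for the same topic can be detected at `add` time and acted on at build time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the topic ID bookkeeping in the builder.
final class TopicIdTracker {
    // Stand-in for Uuid.ZERO_UUID (assumption: zero means "no topic ID known").
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private final Map<String, UUID> topicIds = new HashMap<>();
    private boolean missingTopicIds = false;
    private boolean inconsistentTopicIds = false;

    void add(String topic, UUID topicId) {
        if (topicId.equals(ZERO_UUID)) {
            // Assumed behavior: remember that at least one topic had no ID.
            missingTopicIds = true;
            return;
        }
        // putIfAbsent keeps the first ID and returns it if one was already stored.
        UUID previous = topicIds.putIfAbsent(topic, topicId);
        if (previous != null && !previous.equals(topicId)) {
            // Same topic name seen with two different IDs in one build cycle.
            inconsistentTopicIds = true;
        }
    }

    // Build-time check: signal that the session should be closed early.
    boolean shouldCloseSessionOnBuild() {
        return inconsistentTopicIds;
    }

    boolean hasMissingTopicIds() {
        return missingTopicIds;
    }

    UUID topicId(String topic) {
        return topicIds.get(topic);
    }
}
```

With something like this, build could check the flag and start a new session right away instead of waiting for the request to fail. Whether that is worth the extra state compared to letting the request close the session is the open question here.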