jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651332118



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -186,23 +268,63 @@ public String toString() {
          * incremental fetch requests (see below).
          */
         private LinkedHashMap<TopicPartition, PartitionData> next;
+        private Map<String, Uuid> topicIds;
         private final boolean copySessionPartitions;
+        private boolean missingTopicIds;
 
         Builder() {
             this.next = new LinkedHashMap<>();
+            this.topicIds = new HashMap<>();
             this.copySessionPartitions = true;
         }
 
         Builder(int initialSize, boolean copySessionPartitions) {
             this.next = new LinkedHashMap<>(initialSize);
+            this.topicIds = new HashMap<>(initialSize);
             this.copySessionPartitions = copySessionPartitions;
         }
 
         /**
          * Mark that we want data from this partition in the upcoming fetch.
          */
-        public void add(TopicPartition topicPartition, PartitionData data) {
+        public void add(TopicPartition topicPartition, Uuid topicId, 
PartitionData data) {
             next.put(topicPartition, data);
+            // topicIds do not change between adding partitions and building, 
so we can use putIfAbsent
+            if (!topicId.equals(Uuid.ZERO_UUID)) {
+                topicIds.putIfAbsent(topicPartition.topic(), topicId);

Review comment:
       If we try to put in a new topic ID, the session should be closed. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to