jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r735935021



##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -285,52 +268,57 @@ public FetchRequestData build() {
             if (nextMetadata.isFull()) {
                 if (log.isDebugEnabled()) {
                     log.debug("Built full fetch {} for node {} with {}.",
-                              nextMetadata, node, partitionsToLogString(next.keySet()));
+                            nextMetadata, node, topicPartitionsToLogString(next.keySet()));
                 }
                 sessionPartitions = next;
                 next = null;
+                Map<TopicPartition, PartitionData> toSend =
+                        Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
                 // Only add topic IDs to the session if we are using topic IDs.
                 if (canUseTopicIds) {
-                    sessionTopicIds = topicIds;
-                    sessionTopicNames = new HashMap<>(topicIds.size());
-                    topicIds.forEach((name, id) -> sessionTopicNames.put(id, name));
+                    Map<Uuid, Set<String>> newTopicNames = sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry -> entry.getValue().topicId,
+                            Collectors.mapping(entry -> entry.getKey().topic(), Collectors.toSet())));
+
+                    sessionTopicNames = new HashMap<>(newTopicNames.size());
+                    // There should only be one topic name per topic ID.
+                    newTopicNames.forEach((topicId, topicNamesSet) -> topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
                 } else {
-                    sessionTopicIds = new HashMap<>();
                     sessionTopicNames = new HashMap<>();
                 }
-                topicIds = null;
-                Map<TopicPartition, PartitionData> toSend =
-                    Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
-                Map<String, Uuid> toSendTopicIds =
-                    Collections.unmodifiableMap(new HashMap<>(sessionTopicIds));
-                Map<Uuid, String> toSendTopicNames =
-                    Collections.unmodifiableMap(new HashMap<>(sessionTopicNames));
-                return new FetchRequestData(toSend, Collections.emptyList(), toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+                return new FetchRequestData(toSend, Collections.emptyList(), Collections.emptyList(), toSend, nextMetadata, canUseTopicIds);
             }
 
-            List<TopicPartition> added = new ArrayList<>();
-            List<TopicPartition> removed = new ArrayList<>();
-            List<TopicPartition> altered = new ArrayList<>();
+            List<TopicIdPartition> added = new ArrayList<>();
+            List<TopicIdPartition> removed = new ArrayList<>();
+            List<TopicIdPartition> altered = new ArrayList<>();
+            List<TopicIdPartition> replaced = new ArrayList<>();
             for (Iterator<Entry<TopicPartition, PartitionData>> iter =
-                     sessionPartitions.entrySet().iterator(); iter.hasNext(); ) {
+                 sessionPartitions.entrySet().iterator(); iter.hasNext(); ) {
                 Entry<TopicPartition, PartitionData> entry = iter.next();
                 TopicPartition topicPartition = entry.getKey();
                 PartitionData prevData = entry.getValue();
                 PartitionData nextData = next.remove(topicPartition);
                 if (nextData != null) {
-                    if (!prevData.equals(nextData)) {
+                    // We basically check if the new partition had the same topic ID. If not,
+                    // we add it to the "replaced" set.
+                    if (!prevData.topicId.equals(nextData.topicId) && !prevData.topicId.equals(Uuid.ZERO_UUID)) {

Review comment:
       Do we not care about a changed topic ID when the rest of the partition data is equal? We usually wouldn't send a request in that case, and I don't know whether it is even possible for the data to be identical when the ID has changed.
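
To make the question concrete, here is a minimal, self-contained sketch of the ordering being discussed. It is not the code in this PR: `TopicIdChangeSketch`, the simplified `PartitionData`, the `Change` enum, and `classify` are all hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and it assumes that `PartitionData.equals` compares the topic ID along with the fetch fields. The "altered" branch is also an assumption, since the hunk above ends before showing what follows the new condition.

```java
import java.util.Objects;
import java.util.UUID;

public class TopicIdChangeSketch {
    // Stand-in for Uuid.ZERO_UUID (assumption: java.util.UUID keeps the sketch self-contained).
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical, simplified stand-in for PartitionData: a topic ID plus one fetch field.
    static final class PartitionData {
        final UUID topicId;
        final long fetchOffset;

        PartitionData(UUID topicId, long fetchOffset) {
            this.topicId = topicId;
            this.fetchOffset = fetchOffset;
        }

        // Assumption: equality covers the topic ID as well as the fetch fields.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof PartitionData)) return false;
            PartitionData other = (PartitionData) o;
            return topicId.equals(other.topicId) && fetchOffset == other.fetchOffset;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topicId, fetchOffset);
        }
    }

    enum Change { REPLACED, ALTERED, UNCHANGED }

    static Change classify(PartitionData prev, PartitionData next) {
        // Same condition as the line under review: the ID differs and the previous ID was not zero.
        if (!prev.topicId.equals(next.topicId) && !prev.topicId.equals(ZERO_UUID)) {
            return Change.REPLACED;
        }
        // Hypothetical follow-up branch: any other difference counts as altered.
        if (!prev.equals(next)) {
            return Change.ALTERED;
        }
        return Change.UNCHANGED;
    }
}
```

Under that assumption about `equals`, two entries with different topic IDs can never compare equal, so the "equal data but changed ID" case would not arise. If `equals` ignored the topic ID, the ID check would have to come first, as it does here, for the change to be noticed at all.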




