jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670562472



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
         assertEquals(cluster.topics(), Collections.emptySet());
+        assertTrue(cluster.topicIds().isEmpty());
+    }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(new HashSet<>(Arrays.asList(
+                "validTopic1",
+                "validTopic2")));
+
+        topicIds.put("validTopic1", Uuid.randomUuid());
+        topicIds.put("validTopic2", Uuid.randomUuid());
+        MetadataResponse metadataResponse =
+                RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        // Try removing the topic ID from validTopic1 (simulating receiving a request from a controller with an older IBP)
+        topicIds.remove("validTopic1");
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        Cluster cluster = metadata.fetch();
+        // We still have the topic, but it just doesn't have an ID.
+        assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2")));

Review comment:
   I can switch it, but I was trying to keep the argument ordering consistent within the file. For example, the test above does the same thing:
   `assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));`
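   For context, here is a minimal, self-contained sketch of the two argument orders under discussion (the `AssertOrderSketchTest` class and the `fetchTopics()` helper standing in for `cluster.topics()` are hypothetical; JUnit 5 is assumed). JUnit documents the signature as `assertEquals(expected, actual)`, so failure messages label the first argument as the expected value; this PR keeps the actual-first order already used elsewhere in the file.

   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;

   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;

   import org.junit.jupiter.api.Test;

   public class AssertOrderSketchTest {

       // Hypothetical stand-in for cluster.topics(): the "actual" value produced by the code under test.
       private Set<String> fetchTopics() {
           return new HashSet<>(Arrays.asList("validTopic1", "validTopic2"));
       }

       @Test
       public void actualFirstOrderingUsedInTheFile() {
           // Ordering kept in this PR for consistency with the surrounding tests: actual value first.
           assertEquals(fetchTopics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2")));
       }

       @Test
       public void conventionalExpectedFirstOrdering() {
           // Conventional JUnit ordering: expected value first, actual value second.
           assertEquals(new HashSet<>(Arrays.asList("validTopic1", "validTopic2")), fetchTopics());
       }
   }
   ```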
   



