jolshan commented on a change in pull request #10952: URL: https://github.com/apache/kafka/pull/10952#discussion_r670562472
########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic"))); assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2); assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4); + assertTrue(cluster.topicIds().containsAll(topicIds.values())); // Perform another metadata update, but this time all topic metadata should be cleared. retainTopics.set(Collections.emptySet()); - metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300); + metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds); metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null)); cluster = metadata.fetch(); assertEquals(cluster.clusterResource().clusterId(), newClusterId); assertEquals(cluster.nodes().size(), newNodes); assertEquals(cluster.invalidTopics(), Collections.emptySet()); assertEquals(cluster.unauthorizedTopics(), Collections.emptySet()); assertEquals(cluster.topics(), Collections.emptySet()); + assertTrue(cluster.topicIds().isEmpty()); + } + + @Test + public void testMetadataMergeOnIdDowngrade() { + Time time = new MockTime(); + Map<String, Uuid> topicIds = new HashMap<>(); + + final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>()); + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) { + @Override + protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { + return retainTopics.get().contains(topic); + } + }; + + // Initialize a metadata instance with two topics. Both will be retained. + String clusterId = "clusterId"; + int nodes = 2; + Map<String, Integer> topicPartitionCounts = new HashMap<>(); + topicPartitionCounts.put("validTopic1", 2); + topicPartitionCounts.put("validTopic2", 3); + + retainTopics.set(new HashSet<>(Arrays.asList( + "validTopic1", + "validTopic2"))); + + topicIds.put("validTopic1", Uuid.randomUuid()); + topicIds.put("validTopic2", Uuid.randomUuid()); + MetadataResponse metadataResponse = + RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic))); + + // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP) + topicIds.remove("validTopic1"); + metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic))); + + Cluster cluster = metadata.fetch(); + // We still have the topic, but it just doesn't have an ID. + assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2"))); Review comment: I can switch it, but I was trying to keep the ordering consistent in the file. For example. The test above does the same thing: `assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org