msn-tldr commented on code in PR #15385:
URL: https://github.com/apache/kafka/pull/15385#discussion_r1502561721


##########
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##########
@@ -1260,6 +1265,103 @@ else if (partition.equals(internalPart))
         Mockito.reset(mockListener);
     }
 
+    /**
+     * Test that concurrently updating Metadata, and fetching the corresponding MetadataSnapshot & Cluster work as expected, i.e.
+     * snapshot & cluster contain the relevant updates.
+     */
+    @Test
+    public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws 
InterruptedException {
+        Time time = new MockTime();
+        metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, 
metadataExpireMs, new LogContext(), new ClusterResourceListeners());
+
+        // Set up metadata with 10 nodes, 2 topics, topic1 & 2, both to be retained in the update. Both will have leader-epoch 100.
+        int oldNodeCount = 10;
+        String topic1 = "test_topic1";
+        String topic2 = "test_topic2";
+        TopicPartition topic1Part0 = new TopicPartition(topic1, 0);
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        int oldPartitionCount = 1;
+        topicPartitionCounts.put(topic1, oldPartitionCount);
+        topicPartitionCounts.put(topic2, oldPartitionCount);
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put(topic1, Uuid.randomUuid());
+        topicIds.put(topic2, Uuid.randomUuid());
+        int oldLeaderEpoch = 100;
+        MetadataResponse metadataResponse =
+            RequestTestUtils.metadataUpdateWithIds("cluster", oldNodeCount, 
Collections.emptyMap(), topicPartitionCounts, _tp -> oldLeaderEpoch, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, 
time.milliseconds());
+        MetadataSnapshot snapshot = metadata.fetchMetadataSnapshot();
+        Cluster cluster = metadata.fetch();
+        // Validate metadata snapshot & cluster are set up as expected.
+        assertEquals(cluster, snapshot.cluster());
+        assertEquals(oldNodeCount, snapshot.cluster().nodes().size());
+        assertEquals(oldPartitionCount, 
snapshot.cluster().partitionCountForTopic(topic1));
+        assertEquals(oldPartitionCount, 
snapshot.cluster().partitionCountForTopic(topic2));
+        assertEquals(OptionalInt.of(oldLeaderEpoch), 
snapshot.leaderEpochFor(topic1Part0));
+
+        // Set up 6 threads, where 3 are updating metadata & 3 are reading snapshot/cluster.
+        // Metadata will be updated with higher # of nodes, partition-counts, leader-epoch.
+        int numThreads = 6;
+        ExecutorService service = Executors.newFixedThreadPool(numThreads);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to