jolshan commented on code in PR #15023: URL: https://github.com/apache/kafka/pull/15023#discussion_r1432814990
########## clients/src/test/java/org/apache/kafka/clients/MetadataTest.java: ########## @@ -1067,25 +1067,102 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { assertEquals(Uuid.ZERO_UUID, cluster.topicId("validTopic1")); } + @Test + public void testTopicMetadataOnUpdatePartitionLeadership() { + String topic = "input-topic"; + Uuid topicId = Uuid.randomUuid(); + + Time time = new MockTime(); + + metadata = new Metadata( + refreshBackoffMs, + refreshBackoffMaxMs, + metadataExpireMs, + new LogContext(), + new ClusterResourceListeners()); + Node node1 = new Node(1, "localhost", 9091); + Node node2 = new Node(2, "localhost", 9091); + + TopicPartition tp0 = new TopicPartition(topic, 0); + MetadataResponse.PartitionMetadata partition0 = new MetadataResponse.PartitionMetadata( + Errors.NONE, + tp0, + Optional.of(1), + Optional.of(1), + Arrays.asList(1, 2), + Arrays.asList(1, 2), + Collections.emptyList() + ); + TopicPartition tp1 = new TopicPartition(topic, 1); + MetadataResponse.PartitionMetadata partition1 = + new MetadataResponse.PartitionMetadata( + Errors.NONE, + tp1, + Optional.of(1), + Optional.of(1), + Arrays.asList(1, 2), + Arrays.asList(1, 2), + Collections.emptyList() + ); + MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata( + Errors.NONE, + topic, + topicId, + false, + Arrays.asList(partition0, partition1), + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED + ); + + // Initialize metadata with two partitions + MetadataResponse response = RequestTestUtils.metadataResponse( + Arrays.asList(node1, node2), + "clusterId", + node1.id(), + Collections.singletonList(topicMetadata)); + metadata.updateWithCurrentRequestVersion( + response, + false, + time.milliseconds()); + assertEquals(2, metadata.fetch().partitionsForTopic(topic).size()); + assertEquals(1, metadata.fetch().partition(tp0).leader().id()); + assertEquals(1, metadata.fetch().partition(tp1).leader().id()); + + // "input-topic" partition 1 leader 
changes from node 1 to node 2 + metadata.updatePartitionLeadership( + Collections.singletonMap( + tp1, + new Metadata.LeaderIdAndEpoch( + Optional.of(2), + Optional.of(3) + )), + Arrays.asList(node1) + ); + assertEquals(2, metadata.fetch().partitionsForTopic(topic).size()); + assertEquals(1, metadata.fetch().partition(tp0).leader().id()); + assertEquals(2, metadata.fetch().partition(tp1).leader().id()); + } + @Test public void testUpdatePartitionLeadership() { Time time = new MockTime(); - // Setup metadata with initial set of 2 partitions, 1 each across topics, with 5 nodes. - // Also setup, 1 invalid topic, 1 unauthorized topic, 1 internal topic. + // Initialize metadata int numNodes = 5; metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()); ClusterResourceListener mockListener = Mockito.mock(ClusterResourceListener.class); metadata.addClusterUpdateListener(mockListener); - + // topic1 has 2 partitions: tp11, tp12 + // topic2 has 1 partition: tp21 String topic1 = "topic1"; - TopicPartition partition1 = new TopicPartition(topic1, 0); - PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, partition1, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), Arrays.asList(1, 2), Arrays.asList(3)); + TopicPartition tp11 = new TopicPartition(topic1, 0); + PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, tp11, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), Arrays.asList(1, 2), Arrays.asList(3)); Uuid topic1Id = Uuid.randomUuid(); + TopicPartition tp12 = new TopicPartition(topic1, 1); + PartitionMetadata part12Metadata = new PartitionMetadata(Errors.NONE, tp12, Optional.of(2), Optional.of(200), Arrays.asList(2, 3), Arrays.asList(2, 3), Arrays.asList(1)); Review Comment: nit: (we can address in a followup refactor) this naming is a bit confusing. 
Maybe all of these variables should follow one consistent scheme, such as part11Metadata, part12Metadata, part21Metadata, or even just tp11Metadata, tp12Metadata, etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org