dajac commented on code in PR #18635: URL: https://github.com/apache/kafka/pull/18635#discussion_r1922324922
########## core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala: ########## @@ -511,6 +511,80 @@ class MetadataCacheTest { assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet) } + @ParameterizedTest + @MethodSource(Array("cacheProvider")) + def testGetPartitionReplicaEndpoints(cache: MetadataCache): Unit = { + val securityProtocol = SecurityProtocol.PLAINTEXT + val listenerName = ListenerName.forSecurityProtocol(securityProtocol) + + // Set up broker data for the metadata cache + val numBrokers = 10 + val brokerRecords = (0 until numBrokers).map { brokerId => + new RegisterBrokerRecord() + .setBrokerId(brokerId) + .setFenced(false) + .setRack("rack" + (brokerId % 3)) + .setEndPoints(new BrokerEndpointCollection( + Seq(new BrokerEndpoint() + .setHost("foo" + brokerId) + .setPort(9092) + .setSecurityProtocol(securityProtocol.id) + .setName(listenerName.value) + ).iterator.asJava)) + } + + // Set up a single topic (with many partitions) for the metadata cache + val topic = "many-partitions-topic" + val topicId = Uuid.randomUuid() + val topicRecords = Seq[ApiMessage](new TopicRecord().setName(topic).setTopicId(topicId)) + + // Set up a number of partitions such that each different combination of + // $replicationFactor brokers is made a replica set for exactly one partition + val replicationFactor = 3 + val replicaSets = getAllReplicaSets(numBrokers, replicationFactor) + val numPartitions = replicaSets.length + val partitionRecords = (0 until numPartitions).map { partitionId => + val replicas = replicaSets(partitionId) + new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(partitionId) + .setReplicas(replicas) + .setLeader(replicas.get(0)) + .setIsr(replicas) + .setEligibleLeaderReplicas(replicas) + } + + // Load the prepared data in the metadata cache + MetadataCacheTest.updateCache(cache, brokerRecords ++ topicRecords ++ partitionRecords) + + var seenReplicaSets: Set[scala.collection.Set[Int]] = Set.empty + (0 until numPartitions).foreach { partitionId => + val tp = new TopicPartition(topic, partitionId) + val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, listenerName) + val replicaSet = brokerIdToNodeMap.keySet + // Verify that we haven't seen before the replica set for this partition + assertFalse(seenReplicaSets.contains(replicaSet), + s"Already seen replica set $replicaSet before partition $partitionId") + seenReplicaSets += replicaSet + // Verify that for each partition we have exactly $replicationFactor endpoints + assertEquals(replicationFactor, + replicaSet.size, + s"Unexpected replica set $replicaSet for partition $partitionId") Review Comment: Would it be possible to also verify that the method returns the replicas that we expect (replica ids + nodes)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org