dimitarndimitrov commented on code in PR #18657: URL: https://github.com/apache/kafka/pull/18657#discussion_r1925019540
########## core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala: ########## @@ -645,6 +646,114 @@ class MetadataCacheTest { assertEquals(Seq(expectedNode1), partitionInfo.offlineReplicas.toSeq) } + @ParameterizedTest + @MethodSource(Array("cacheProvider")) + def testGetPartitionReplicaEndpoints(cache: MetadataCache): Unit = { + val securityProtocol = SecurityProtocol.PLAINTEXT + val listenerName = ListenerName.forSecurityProtocol(securityProtocol) + + // Set up broker data for the metadata cache + val numBrokers = 10 + // Set only the last broker in the list to be offline in order to allow easy + // indexing of brokers in the brokerStates list - the index in the list will + // be the same as the brokerId of the broker at that position. + val offlineBrokerId = numBrokers - 1 + val brokerStates = (0 until numBrokers - 1).map { brokerId => + new UpdateMetadataBroker() + .setId(brokerId) + .setRack("rack" + (brokerId % 3)) + .setEndpoints( + Seq(new UpdateMetadataEndpoint() + .setHost("foo" + brokerId) + .setPort(9092) + .setSecurityProtocol(securityProtocol.id) + .setListener(listenerName.value) + ).asJava) + } + + val topic = "many-partitions-topic" + val topicId = Uuid.randomUuid() + + // Set up a number of partitions such that each different combination of + // $replicationFactor brokers is made a replica set for exactly one partition + val replicationFactor = 3 + val replicaSets = getAllReplicaSets(numBrokers, replicationFactor) + val numPartitions = replicaSets.length + val partitionStates = (0 until numPartitions).map { partitionId => + val replicas = replicaSets(partitionId) + val onlineReplicas = replicas.stream().filter(id => id != offlineBrokerId).collect(Collectors.toList()) + new UpdateMetadataPartitionState() + .setTopicName(topic) + .setPartitionIndex(partitionId) + .setReplicas(replicas) + .setLeader(onlineReplicas.get(0)) + .setIsr(onlineReplicas) + .setOfflineReplicas(Collections.singletonList(offlineBrokerId)) + } + + // Load the prepared data in the metadata cache + val version = ApiKeys.UPDATE_METADATA.latestVersion + val controllerId = 0 + val controllerEpoch = 123 + val updateMetadataRequest = new UpdateMetadataRequest.Builder( + version, + controllerId, + controllerEpoch, + brokerEpoch, + partitionStates.asJava, + brokerStates.asJava, + Collections.singletonMap(topic, topicId)).build() + MetadataCacheTest.updateCache(cache, updateMetadataRequest) + + (0 until numPartitions).foreach { partitionId => + val tp = new TopicPartition(topic, partitionId) + val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, listenerName) + val replicaSet = brokerIdToNodeMap.keySet + val expectedReplicaSet = partitionStates(partitionId).replicas().asScala.toSet + // Verify that we have endpoints for exactly the non-fenced brokers of the replica set + if (expectedReplicaSet.contains(offlineBrokerId)) { + assertEquals(expectedReplicaSet, + replicaSet + offlineBrokerId, + s"Unexpected partial replica set for partition $partitionId") Review Comment: nit: This condition should also have asserted that `replicaSet` doesn't contain `offlineBrokerId` - right now the existing `assertEquals` will pass even if that's not the case. Also applies to the original `trunk` change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org