apalan60 commented on code in PR #20861:
URL: https://github.com/apache/kafka/pull/20861#discussion_r2519336228


##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -262,6 +285,374 @@ private void assertFoobarValue(KafkaClusterTestKit 
cluster, int expected) throws
         });
     }
 
+    @Test
+    public void testCreateClusterAndCreateListDeleteTopic() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+
+            String testTopic = "test-topic";
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                // Create a test topic
+                List<NewTopic> newTopic = List.of(new NewTopic(testTopic, 1, 
(short) 3));
+                CreateTopicsResult createTopicResult = 
admin.createTopics(newTopic);
+                createTopicResult.all().get();
+                waitForTopicListing(admin, List.of(testTopic), List.of());
+
+                // Delete topic
+                DeleteTopicsResult deleteResult = 
admin.deleteTopics(List.of(testTopic));
+                deleteResult.all().get();
+
+                // List again
+                waitForTopicListing(admin, List.of(), List.of(testTopic));
+            }
+        }
+    }
+
+    @Test
+    public void testCreateClusterAndCreateAndManyTopics() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                // Create many topics
+                List<NewTopic> newTopics = List.of(
+                    new NewTopic("test-topic-1", 2, (short) 3),
+                    new NewTopic("test-topic-2", 2, (short) 3),
+                    new NewTopic("test-topic-3", 2, (short) 3)
+                );
+                CreateTopicsResult createTopicResult = 
admin.createTopics(newTopics);
+                createTopicResult.all().get();
+
+                // List created topics
+                waitForTopicListing(admin, List.of("test-topic-1", 
"test-topic-2", "test-topic-3"), List.of());
+            }
+        }
+    }
+
+    private Map<ClientQuotaEntity, Map<String, Double>> alterThenDescribe(
+        Admin admin,
+        ClientQuotaEntity entity,
+        List<ClientQuotaAlteration.Op> quotas,
+        ClientQuotaFilter filter,
+        int expectCount
+    ) throws Exception {
+        AlterClientQuotasResult alterResult = 
admin.alterClientQuotas(List.of(new ClientQuotaAlteration(entity, quotas)));
+        alterResult.all().get();
+
+        TestUtils.waitForCondition(() -> {
+            Map<ClientQuotaEntity, Map<String, Double>> results = 
admin.describeClientQuotas(filter).entities().get();
+            return results.getOrDefault(entity, Map.of()).size() == 
expectCount;
+        }, "Broker never saw new client quotas");
+
+        return admin.describeClientQuotas(filter).entities().get();
+    }
+
+    @Test
+    public void testClientQuotas() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                ClientQuotaEntity entity = new 
ClientQuotaEntity(Map.of("user", "testkit"));
+                ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
+                    List.of(ClientQuotaFilterComponent.ofEntity("user", 
"testkit")));
+
+                Map<ClientQuotaEntity, Map<String, Double>> describeResult = 
alterThenDescribe(admin, entity,
+                    List.of(new ClientQuotaAlteration.Op("request_percentage", 
0.99)), filter, 1);
+                assertEquals(0.99, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+                describeResult = alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", 0.97),
+                    new ClientQuotaAlteration.Op("producer_byte_rate", 
10000.0),
+                    new ClientQuotaAlteration.Op("consumer_byte_rate", 10001.0)
+                ), filter, 3);
+                assertEquals(0.97, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+                assertEquals(10000.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+                assertEquals(10001.0, 
describeResult.get(entity).get("consumer_byte_rate"), 1e-6);
+
+                describeResult = alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", 0.95),
+                    new ClientQuotaAlteration.Op("producer_byte_rate", null),
+                    new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+                ), filter, 1);
+                assertEquals(0.95, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+                alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", null)), 
filter, 0);
+
+                describeResult = alterThenDescribe(admin, entity,
+                    List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 
9999.0)), filter, 1);
+                assertEquals(9999.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+
+                ClientQuotaEntity entity2 = new 
ClientQuotaEntity(Map.of("user", "testkit", "client-id", "some-client"));
+                filter = ClientQuotaFilter.containsOnly(
+                    List.of(
+                        ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+                        ClientQuotaFilterComponent.ofEntity("client-id", 
"some-client")
+                    ));
+                describeResult = alterThenDescribe(admin, entity2,
+                    List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 
9998.0)), filter, 1);
+                assertEquals(9998.0, 
describeResult.get(entity2).get("producer_byte_rate"), 1e-6);
+
+                final ClientQuotaFilter finalFilter = 
ClientQuotaFilter.contains(
+                    List.of(ClientQuotaFilterComponent.ofEntity("user", 
"testkit")));
+
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Map<String, Double>> results = 
admin.describeClientQuotas(finalFilter).entities().get();
+                    if (results.size() != 2) {
+                        return false;
+                    }
+                    assertEquals(9999.0, 
results.get(entity).get("producer_byte_rate"), 1e-6);
+                    assertEquals(9998.0, 
results.get(entity2).get("producer_byte_rate"), 1e-6);
+                    return true;
+                }, "Broker did not see two client quotas");
+            }
+        }
+    }
+
+    private void setConsumerByteRate(Admin admin, ClientQuotaEntity entity, 
Long value) throws Exception {
+        admin.alterClientQuotas(List.of(
+            new ClientQuotaAlteration(entity, List.of(
+                new ClientQuotaAlteration.Op("consumer_byte_rate", 
value.doubleValue())))
+        )).all().get();
+    }
+
+    private Map<ClientQuotaEntity, Long> getConsumerByteRates(Admin admin) 
throws Exception {
+        return 
admin.describeClientQuotas(ClientQuotaFilter.contains(List.of()))
+            .entities().get()
+            .entrySet().stream()
+            .filter(entry -> 
entry.getValue().containsKey("consumer_byte_rate"))
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().get("consumer_byte_rate").longValue()
+            ));
+    }
+
+    @Test
+    public void testDefaultClientQuotas() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                ClientQuotaEntity defaultUser = new 
ClientQuotaEntity(Collections.singletonMap("user", null));
+                ClientQuotaEntity bobUser = new 
ClientQuotaEntity(Map.of("user", "bob"));
+
+                TestUtils.waitForCondition(
+                    () -> getConsumerByteRates(admin).isEmpty(),
+                    "Initial consumer byte rates should be empty");
+
+                setConsumerByteRate(admin, defaultUser, 100L);
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Long> rates = 
getConsumerByteRates(admin);
+                    return rates.size() == 1 &&
+                        rates.get(defaultUser) == 100L;
+                }, "Default user rate should be 100");
+
+                setConsumerByteRate(admin, bobUser, 1000L);
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Long> rates = 
getConsumerByteRates(admin);
+                    return rates.size() == 2 &&
+                        rates.get(defaultUser) == 100L &&
+                        rates.get(bobUser) == 1000L;
+                }, "Should have both default and bob user rates");
+            }
+        }
+    }
+
+    @Test
+    public void testCreateClusterWithAdvertisedPortZero() throws Exception {
+        Map<Integer, Map<String, String>> brokerPropertyOverrides = new 
HashMap<>();
+        for (int brokerId = 0; brokerId < 3; brokerId++) {
+            Map<String, String> props = new HashMap<>();
+            props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            brokerPropertyOverrides.put(brokerId, props);
+        }
+
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumControllerNodes(1)
+            .setNumBrokerNodes(3)
+            .setPerServerProperties(brokerPropertyOverrides)
+            .build();
+
+        doOnStartedKafkaCluster(nodes, cluster ->
+            
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
 Duration.ofSeconds(15), cluster)
+                .nodes().values().forEach(broker -> {
+                    assertEquals("localhost", broker.host(),
+                        "Did not advertise configured advertised host");
+                    
assertEquals(cluster.brokers().get(broker.id()).socketServer().boundPort(cluster.nodes().brokerListenerName()),
 broker.port(),
+                        "Did not advertise bound socket port");
+                }));
+    }
+
+    @Test
+    public void 
testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer() throws 
Exception {
+        Map<Integer, Map<String, String>> brokerPropertyOverrides = new 
HashMap<>();
+        for (int brokerId = 0; brokerId < 3; brokerId++) {
+            Map<String, String> props = new HashMap<>();
+            props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
+                "EXTERNAL://advertised-host-" + brokerId + ":" + (brokerId + 
100));
+            brokerPropertyOverrides.put(brokerId, props);
+        }
+
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumControllerNodes(1)
+            .setNumBrokerNodes(3)
+            .setNumDisksPerBroker(1)
+            .setPerServerProperties(brokerPropertyOverrides)
+            .build();
+
+        doOnStartedKafkaCluster(nodes, cluster ->
+            
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
 Duration.ofSeconds(15), cluster)
+                .nodes().values().forEach(broker -> {
+                    assertEquals("advertised-host-" + broker.id(), 
broker.host(), "Did not advertise configured advertised host");
+                    assertEquals(broker.id() + 100, broker.port(), "Did not 
advertise configured advertised port");
+                }));
+    }
+
+    private void doOnStartedKafkaCluster(TestKitNodes nodes, 
ThrowingConsumer<KafkaClusterTestKit> action) throws Exception {
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).build()) {
+            cluster.format();
+            cluster.startup();
+            action.accept(cluster);
+        }
+    }
+
+    private DescribeClusterResponse 
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(
+        ListenerName listenerName,
+        Duration waitTime,
+        KafkaClusterTestKit cluster
+    ) throws Exception {
+        long startTime = System.currentTimeMillis();
+        TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+            "Broker never made it to RUNNING state.");
+        TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+            "RaftManager was not initialized.");
+
+        Duration remainingWaitTime = 
waitTime.minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
+
+        final DescribeClusterResponse[] currentResponse = new 
DescribeClusterResponse[1];
+        int expectedBrokerCount = cluster.nodes().brokerNodes().size();
+        TestUtils.waitForCondition(
+            () -> {
+                currentResponse[0] = connectAndReceive(
+                    new DescribeClusterRequest.Builder(new 
DescribeClusterRequestData()).build(),
+                    
cluster.brokers().get(0).socketServer().boundPort(listenerName)
+                );
+                return currentResponse[0].nodes().size() == 
expectedBrokerCount;
+            },
+            remainingWaitTime.toMillis(),
+            String.format("After %s ms Broker is only aware of %s brokers, but 
%s are expected", remainingWaitTime.toMillis(), expectedBrokerCount, 
expectedBrokerCount)
+        );
+
+        return currentResponse[0];
+    }
+
+    @FunctionalInterface
+    private interface ThrowingConsumer<T> {
+        void accept(T t) throws Exception;
+    }
+
+    private void waitForTopicListing(Admin admin, List<String> 
expectedPresent, List<String> expectedAbsent)
+        throws InterruptedException {
+        Set<String> topicsNotFound = new HashSet<>(expectedPresent);
+        Set<String> extraTopics = new HashSet<>();
+        TestUtils.waitForCondition(() -> {
+            Set<String> topicNames = admin.listTopics().names().get();
+            topicsNotFound.removeAll(topicNames);
+            
extraTopics.addAll(topicNames.stream().filter(expectedAbsent::contains).collect(Collectors.toSet()));

Review Comment:
   In the [Scala 
version](https://github.com/apache/kafka/blob/274233f39da16190396357ef21362e310b5f73f9/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala#L194C7-L194C18)
 , `extraTopics` is recomputed on every poll.
   ```scala
   extraTopics = 
admin.listTopics().names().get().asScala.filter(expectedAbsent.contains(_))
   ```
   
   In the current Java version, `extraTopics` is accumulated and never cleared. 
This means that once a topic from expectedAbsent appears in an early poll, its 
name is retained even if it disappears later.
   
   Just wondering if this difference was intentional, or if we should preserve 
the original behavior?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to