chia7712 commented on code in PR #20861:
URL: https://github.com/apache/kafka/pull/20861#discussion_r2609538241


##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -262,6 +285,375 @@ private void assertFoobarValue(KafkaClusterTestKit 
cluster, int expected) throws
         });
     }
 
+    @Test
+    public void testCreateClusterAndCreateListDeleteTopic() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+
+            String testTopic = "test-topic";
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                // Create a test topic
+                List<NewTopic> newTopic = List.of(new NewTopic(testTopic, 1, 
(short) 3));
+                CreateTopicsResult createTopicResult = 
admin.createTopics(newTopic);
+                createTopicResult.all().get();
+                waitForTopicListing(admin, List.of(testTopic), List.of());
+
+                // Delete topic
+                DeleteTopicsResult deleteResult = 
admin.deleteTopics(List.of(testTopic));
+                deleteResult.all().get();
+
+                // List again
+                waitForTopicListing(admin, List.of(), List.of(testTopic));
+            }
+        }
+    }
+
+    @Test
+    public void testCreateClusterAndCreateAndManyTopics() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                // Create many topics
+                List<NewTopic> newTopics = List.of(
+                    new NewTopic("test-topic-1", 2, (short) 3),
+                    new NewTopic("test-topic-2", 2, (short) 3),
+                    new NewTopic("test-topic-3", 2, (short) 3)
+                );
+                CreateTopicsResult createTopicResult = 
admin.createTopics(newTopics);
+                createTopicResult.all().get();
+
+                // List created topics
+                waitForTopicListing(admin, List.of("test-topic-1", 
"test-topic-2", "test-topic-3"), List.of());
+            }
+        }
+    }
+
+    private Map<ClientQuotaEntity, Map<String, Double>> alterThenDescribe(
+        Admin admin,
+        ClientQuotaEntity entity,
+        List<ClientQuotaAlteration.Op> quotas,
+        ClientQuotaFilter filter,
+        int expectCount
+    ) throws Exception {
+        AlterClientQuotasResult alterResult = 
admin.alterClientQuotas(List.of(new ClientQuotaAlteration(entity, quotas)));
+        alterResult.all().get();
+
+        TestUtils.waitForCondition(() -> {
+            Map<ClientQuotaEntity, Map<String, Double>> results = 
admin.describeClientQuotas(filter).entities().get();
+            return results.getOrDefault(entity, Map.of()).size() == 
expectCount;
+        }, "Broker never saw new client quotas");
+
+        return admin.describeClientQuotas(filter).entities().get();
+    }
+
+    @Test
+    public void testClientQuotas() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                ClientQuotaEntity entity = new 
ClientQuotaEntity(Map.of("user", "testkit"));
+                ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
+                    List.of(ClientQuotaFilterComponent.ofEntity("user", 
"testkit")));
+
+                Map<ClientQuotaEntity, Map<String, Double>> describeResult = 
alterThenDescribe(admin, entity,
+                    List.of(new ClientQuotaAlteration.Op("request_percentage", 
0.99)), filter, 1);
+                assertEquals(0.99, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+                describeResult = alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", 0.97),
+                    new ClientQuotaAlteration.Op("producer_byte_rate", 
10000.0),
+                    new ClientQuotaAlteration.Op("consumer_byte_rate", 10001.0)
+                ), filter, 3);
+                assertEquals(0.97, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+                assertEquals(10000.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+                assertEquals(10001.0, 
describeResult.get(entity).get("consumer_byte_rate"), 1e-6);
+
+                describeResult = alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", 0.95),
+                    new ClientQuotaAlteration.Op("producer_byte_rate", null),
+                    new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+                ), filter, 1);
+                assertEquals(0.95, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+                alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", null)), 
filter, 0);
+
+                describeResult = alterThenDescribe(admin, entity,
+                    List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 
9999.0)), filter, 1);
+                assertEquals(9999.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+
+                ClientQuotaEntity entity2 = new 
ClientQuotaEntity(Map.of("user", "testkit", "client-id", "some-client"));
+                filter = ClientQuotaFilter.containsOnly(
+                    List.of(
+                        ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+                        ClientQuotaFilterComponent.ofEntity("client-id", 
"some-client")
+                    ));
+                describeResult = alterThenDescribe(admin, entity2,
+                    List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 
9998.0)), filter, 1);
+                assertEquals(9998.0, 
describeResult.get(entity2).get("producer_byte_rate"), 1e-6);
+
+                final ClientQuotaFilter finalFilter = 
ClientQuotaFilter.contains(
+                    List.of(ClientQuotaFilterComponent.ofEntity("user", 
"testkit")));
+
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Map<String, Double>> results = 
admin.describeClientQuotas(finalFilter).entities().get();
+                    if (results.size() != 2) {
+                        return false;
+                    }
+                    assertEquals(9999.0, 
results.get(entity).get("producer_byte_rate"), 1e-6);
+                    assertEquals(9998.0, 
results.get(entity2).get("producer_byte_rate"), 1e-6);
+                    return true;
+                }, "Broker did not see two client quotas");
+            }
+        }
+    }
+
+    private void setConsumerByteRate(Admin admin, ClientQuotaEntity entity, 
Long value) throws Exception {
+        admin.alterClientQuotas(List.of(
+            new ClientQuotaAlteration(entity, List.of(
+                new ClientQuotaAlteration.Op("consumer_byte_rate", 
value.doubleValue())))
+        )).all().get();
+    }
+
+    private Map<ClientQuotaEntity, Long> getConsumerByteRates(Admin admin) 
throws Exception {
+        return 
admin.describeClientQuotas(ClientQuotaFilter.contains(List.of()))
+            .entities().get()
+            .entrySet().stream()
+            .filter(entry -> 
entry.getValue().containsKey("consumer_byte_rate"))
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().get("consumer_byte_rate").longValue()
+            ));
+    }
+
+    @Test
+    public void testDefaultClientQuotas() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                ClientQuotaEntity defaultUser = new 
ClientQuotaEntity(Collections.singletonMap("user", null));
+                ClientQuotaEntity bobUser = new 
ClientQuotaEntity(Map.of("user", "bob"));
+
+                TestUtils.waitForCondition(
+                    () -> getConsumerByteRates(admin).isEmpty(),
+                    "Initial consumer byte rates should be empty");
+
+                setConsumerByteRate(admin, defaultUser, 100L);
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Long> rates = 
getConsumerByteRates(admin);
+                    return rates.size() == 1 &&
+                        rates.get(defaultUser) == 100L;
+                }, "Default user rate should be 100");
+
+                setConsumerByteRate(admin, bobUser, 1000L);
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Long> rates = 
getConsumerByteRates(admin);
+                    return rates.size() == 2 &&
+                        rates.get(defaultUser) == 100L &&
+                        rates.get(bobUser) == 1000L;
+                }, "Should have both default and bob user rates");
+            }
+        }
+    }
+
+    @Test
+    public void testCreateClusterWithAdvertisedPortZero() throws Exception {
+        Map<Integer, Map<String, String>> brokerPropertyOverrides = new 
HashMap<>();
+        for (int brokerId = 0; brokerId < 3; brokerId++) {
+            Map<String, String> props = new HashMap<>();
+            props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            brokerPropertyOverrides.put(brokerId, props);
+        }
+
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumControllerNodes(1)
+            .setNumBrokerNodes(3)
+            .setPerServerProperties(brokerPropertyOverrides)
+            .build();
+
+        doOnStartedKafkaCluster(nodes, cluster ->
+            
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
 Duration.ofSeconds(15), cluster)
+                .nodes().values().forEach(broker -> {
+                    assertEquals("localhost", broker.host(),
+                        "Did not advertise configured advertised host");
+                    
assertEquals(cluster.brokers().get(broker.id()).socketServer().boundPort(cluster.nodes().brokerListenerName()),
 broker.port(),
+                        "Did not advertise bound socket port");
+                }));
+    }
+
+    @Test
+    public void 
testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer() throws 
Exception {
+        Map<Integer, Map<String, String>> brokerPropertyOverrides = new 
HashMap<>();

Review Comment:
   ```java
           var brokerPropertyOverrides = IntStream.range(0, 3).boxed().collect(Collectors.toMap(
               brokerId -> brokerId,
               brokerId -> Map.of(
                   SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:0",
                   SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
                   "EXTERNAL://advertised-host-" + brokerId + ":" + (brokerId + 100)
               )));
   ```



##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -262,6 +285,375 @@ private void assertFoobarValue(KafkaClusterTestKit 
cluster, int expected) throws
         });
     }
 
+    @Test
+    public void testCreateClusterAndCreateListDeleteTopic() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+
+            String testTopic = "test-topic";
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                // Create a test topic
+                List<NewTopic> newTopic = List.of(new NewTopic(testTopic, 1, 
(short) 3));
+                CreateTopicsResult createTopicResult = 
admin.createTopics(newTopic);
+                createTopicResult.all().get();
+                waitForTopicListing(admin, List.of(testTopic), List.of());
+
+                // Delete topic
+                DeleteTopicsResult deleteResult = 
admin.deleteTopics(List.of(testTopic));
+                deleteResult.all().get();
+
+                // List again
+                waitForTopicListing(admin, List.of(), List.of(testTopic));
+            }
+        }
+    }
+
+    @Test
+    public void testCreateClusterAndCreateAndManyTopics() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                // Create many topics
+                List<NewTopic> newTopics = List.of(
+                    new NewTopic("test-topic-1", 2, (short) 3),
+                    new NewTopic("test-topic-2", 2, (short) 3),
+                    new NewTopic("test-topic-3", 2, (short) 3)
+                );
+                CreateTopicsResult createTopicResult = 
admin.createTopics(newTopics);
+                createTopicResult.all().get();
+
+                // List created topics
+                waitForTopicListing(admin, List.of("test-topic-1", 
"test-topic-2", "test-topic-3"), List.of());
+            }
+        }
+    }
+
+    private Map<ClientQuotaEntity, Map<String, Double>> alterThenDescribe(
+        Admin admin,
+        ClientQuotaEntity entity,
+        List<ClientQuotaAlteration.Op> quotas,
+        ClientQuotaFilter filter,
+        int expectCount
+    ) throws Exception {
+        AlterClientQuotasResult alterResult = 
admin.alterClientQuotas(List.of(new ClientQuotaAlteration(entity, quotas)));
+        alterResult.all().get();
+
+        TestUtils.waitForCondition(() -> {
+            Map<ClientQuotaEntity, Map<String, Double>> results = 
admin.describeClientQuotas(filter).entities().get();
+            return results.getOrDefault(entity, Map.of()).size() == 
expectCount;
+        }, "Broker never saw new client quotas");
+
+        return admin.describeClientQuotas(filter).entities().get();
+    }
+
+    @Test
+    public void testClientQuotas() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                ClientQuotaEntity entity = new 
ClientQuotaEntity(Map.of("user", "testkit"));
+                ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
+                    List.of(ClientQuotaFilterComponent.ofEntity("user", 
"testkit")));
+
+                Map<ClientQuotaEntity, Map<String, Double>> describeResult = 
alterThenDescribe(admin, entity,
+                    List.of(new ClientQuotaAlteration.Op("request_percentage", 
0.99)), filter, 1);
+                assertEquals(0.99, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+                describeResult = alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", 0.97),
+                    new ClientQuotaAlteration.Op("producer_byte_rate", 
10000.0),
+                    new ClientQuotaAlteration.Op("consumer_byte_rate", 10001.0)
+                ), filter, 3);
+                assertEquals(0.97, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+                assertEquals(10000.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+                assertEquals(10001.0, 
describeResult.get(entity).get("consumer_byte_rate"), 1e-6);
+
+                describeResult = alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", 0.95),
+                    new ClientQuotaAlteration.Op("producer_byte_rate", null),
+                    new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+                ), filter, 1);
+                assertEquals(0.95, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+                alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", null)), 
filter, 0);
+
+                describeResult = alterThenDescribe(admin, entity,
+                    List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 
9999.0)), filter, 1);
+                assertEquals(9999.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+
+                ClientQuotaEntity entity2 = new 
ClientQuotaEntity(Map.of("user", "testkit", "client-id", "some-client"));
+                filter = ClientQuotaFilter.containsOnly(
+                    List.of(
+                        ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+                        ClientQuotaFilterComponent.ofEntity("client-id", 
"some-client")
+                    ));
+                describeResult = alterThenDescribe(admin, entity2,
+                    List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 
9998.0)), filter, 1);
+                assertEquals(9998.0, 
describeResult.get(entity2).get("producer_byte_rate"), 1e-6);
+
+                final ClientQuotaFilter finalFilter = 
ClientQuotaFilter.contains(
+                    List.of(ClientQuotaFilterComponent.ofEntity("user", 
"testkit")));
+
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Map<String, Double>> results = 
admin.describeClientQuotas(finalFilter).entities().get();
+                    if (results.size() != 2) {
+                        return false;
+                    }
+                    assertEquals(9999.0, 
results.get(entity).get("producer_byte_rate"), 1e-6);
+                    assertEquals(9998.0, 
results.get(entity2).get("producer_byte_rate"), 1e-6);
+                    return true;
+                }, "Broker did not see two client quotas");
+            }
+        }
+    }
+
+    private void setConsumerByteRate(Admin admin, ClientQuotaEntity entity, 
Long value) throws Exception {
+        admin.alterClientQuotas(List.of(
+            new ClientQuotaAlteration(entity, List.of(
+                new ClientQuotaAlteration.Op("consumer_byte_rate", 
value.doubleValue())))
+        )).all().get();
+    }
+
+    private Map<ClientQuotaEntity, Long> getConsumerByteRates(Admin admin) 
throws Exception {
+        return 
admin.describeClientQuotas(ClientQuotaFilter.contains(List.of()))
+            .entities().get()
+            .entrySet().stream()
+            .filter(entry -> 
entry.getValue().containsKey("consumer_byte_rate"))
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().get("consumer_byte_rate").longValue()
+            ));
+    }
+
+    @Test
+    public void testDefaultClientQuotas() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                ClientQuotaEntity defaultUser = new 
ClientQuotaEntity(Collections.singletonMap("user", null));
+                ClientQuotaEntity bobUser = new 
ClientQuotaEntity(Map.of("user", "bob"));
+
+                TestUtils.waitForCondition(
+                    () -> getConsumerByteRates(admin).isEmpty(),
+                    "Initial consumer byte rates should be empty");
+
+                setConsumerByteRate(admin, defaultUser, 100L);
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Long> rates = 
getConsumerByteRates(admin);
+                    return rates.size() == 1 &&
+                        rates.get(defaultUser) == 100L;
+                }, "Default user rate should be 100");
+
+                setConsumerByteRate(admin, bobUser, 1000L);
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Long> rates = 
getConsumerByteRates(admin);
+                    return rates.size() == 2 &&
+                        rates.get(defaultUser) == 100L &&
+                        rates.get(bobUser) == 1000L;
+                }, "Should have both default and bob user rates");
+            }
+        }
+    }
+
+    @Test
+    public void testCreateClusterWithAdvertisedPortZero() throws Exception {
+        Map<Integer, Map<String, String>> brokerPropertyOverrides = new 
HashMap<>();
+        for (int brokerId = 0; brokerId < 3; brokerId++) {
+            Map<String, String> props = new HashMap<>();
+            props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            brokerPropertyOverrides.put(brokerId, props);
+        }
+
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumControllerNodes(1)
+            .setNumBrokerNodes(3)
+            .setPerServerProperties(brokerPropertyOverrides)
+            .build();
+
+        doOnStartedKafkaCluster(nodes, cluster ->
+            
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
 Duration.ofSeconds(15), cluster)
+                .nodes().values().forEach(broker -> {
+                    assertEquals("localhost", broker.host(),
+                        "Did not advertise configured advertised host");
+                    
assertEquals(cluster.brokers().get(broker.id()).socketServer().boundPort(cluster.nodes().brokerListenerName()),
 broker.port(),
+                        "Did not advertise bound socket port");
+                }));
+    }
+
+    @Test
+    public void 
testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer() throws 
Exception {
+        Map<Integer, Map<String, String>> brokerPropertyOverrides = new 
HashMap<>();
+        for (int brokerId = 0; brokerId < 3; brokerId++) {
+            Map<String, String> props = new HashMap<>();
+            props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
+                "EXTERNAL://advertised-host-" + brokerId + ":" + (brokerId + 
100));
+            brokerPropertyOverrides.put(brokerId, props);
+        }
+
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumControllerNodes(1)
+            .setNumBrokerNodes(3)
+            .setNumDisksPerBroker(1)
+            .setPerServerProperties(brokerPropertyOverrides)
+            .build();
+
+        doOnStartedKafkaCluster(nodes, cluster ->
+            
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
 Duration.ofSeconds(15), cluster)
+                .nodes().values().forEach(broker -> {
+                    assertEquals("advertised-host-" + broker.id(), 
broker.host(), "Did not advertise configured advertised host");
+                    assertEquals(broker.id() + 100, broker.port(), "Did not 
advertise configured advertised port");
+                }));
+    }
+
+    private void doOnStartedKafkaCluster(TestKitNodes nodes, 
ThrowingConsumer<KafkaClusterTestKit> action) throws Exception {
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).build()) {
+            cluster.format();
+            cluster.startup();
+            action.accept(cluster);
+        }
+    }
+
+    private DescribeClusterResponse 
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(
+        ListenerName listenerName,
+        Duration waitTime,
+        KafkaClusterTestKit cluster
+    ) throws Exception {
+        long startTime = System.currentTimeMillis();
+        TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+            "Broker never made it to RUNNING state.");
+        TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+            "RaftManager was not initialized.");
+
+        Duration remainingWaitTime = 
waitTime.minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
+
+        final DescribeClusterResponse[] currentResponse = new 
DescribeClusterResponse[1];
+        int expectedBrokerCount = cluster.nodes().brokerNodes().size();
+        TestUtils.waitForCondition(
+            () -> {
+                currentResponse[0] = connectAndReceive(
+                    new DescribeClusterRequest.Builder(new 
DescribeClusterRequestData()).build(),
+                    
cluster.brokers().get(0).socketServer().boundPort(listenerName)
+                );
+                return currentResponse[0].nodes().size() == 
expectedBrokerCount;
+            },
+            remainingWaitTime.toMillis(),
+            String.format("After %s ms Broker is only aware of %s brokers, but 
%s are expected", remainingWaitTime.toMillis(), expectedBrokerCount, 
expectedBrokerCount)
+        );
+
+        return currentResponse[0];
+    }
+
+    @FunctionalInterface
+    private interface ThrowingConsumer<T> {
+        void accept(T t) throws Exception;
+    }
+
+    private void waitForTopicListing(Admin admin, List<String> 
expectedPresent, List<String> expectedAbsent)
+        throws InterruptedException {
+        Set<String> topicsNotFound = new HashSet<>(expectedPresent);
+        Set<String> extraTopics = new HashSet<>();
+        TestUtils.waitForCondition(() -> {
+            Set<String> topicNames = admin.listTopics().names().get();
+            topicsNotFound.removeAll(topicNames);
+            extraTopics.clear();
+            
extraTopics.addAll(topicNames.stream().filter(expectedAbsent::contains).collect(Collectors.toSet()));
+            return topicsNotFound.isEmpty() && extraTopics.isEmpty();
+        }, String.format("Failed to find topic(s): %s and NOT find topic(s): 
%s", topicsNotFound, extraTopics));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testUnregisterBroker(boolean usingBootstrapController) throws 
Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(4)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
brokerIsUnfenced(clusterImage(cluster, 1), 0),
+                "Timed out waiting for broker 0 to be unfenced.");
+            cluster.brokers().get(0).shutdown();
+            TestUtils.waitForCondition(() -> 
!brokerIsUnfenced(clusterImage(cluster, 1), 0),
+                "Timed out waiting for broker 0 to be fenced.");
+
+            try (Admin admin = createAdminClient(cluster, 
usingBootstrapController)) {
+                admin.unregisterBroker(0);
+            }
+
+            TestUtils.waitForCondition(() -> 
brokerIsAbsent(clusterImage(cluster, 1), 0),
+                "Timed out waiting for broker 0 to be fenced.");
+        }
+    }
+
+    private ClusterImage clusterImage(KafkaClusterTestKit cluster, int 
brokerId) {
+        return 
cluster.brokers().get(brokerId).metadataCache().currentImage().cluster();
+    }
+
+    private boolean brokerIsUnfenced(ClusterImage image, int brokerId) {
+        BrokerRegistration registration = image.brokers().get(brokerId);
+        if (registration == null) {
+            return false;
+        }
+        return !registration.fenced();
+    }
+
+    private boolean brokerIsAbsent(ClusterImage image, int brokerId) {
+        return !image.brokers().containsKey(brokerId);
+    }
+
+    private Admin createAdminClient(KafkaClusterTestKit cluster, boolean 
bootstrapController) {
+        Properties props;
+        if (bootstrapController) {
+            props = 
cluster.newClientPropertiesBuilder().setUsingBootstrapControllers(true).build();

Review Comment:
   This call chain is somewhat redundant. I suggest we make it consistent
   with `ClusterInstance#admin`.



##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -262,6 +285,375 @@ private void assertFoobarValue(KafkaClusterTestKit 
cluster, int expected) throws
         });
     }
 
+    @Test
+    public void testCreateClusterAndCreateListDeleteTopic() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+
+            String testTopic = "test-topic";
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                // Create a test topic
+                List<NewTopic> newTopic = List.of(new NewTopic(testTopic, 1, 
(short) 3));
+                CreateTopicsResult createTopicResult = 
admin.createTopics(newTopic);
+                createTopicResult.all().get();
+                waitForTopicListing(admin, List.of(testTopic), List.of());
+
+                // Delete topic
+                DeleteTopicsResult deleteResult = 
admin.deleteTopics(List.of(testTopic));
+                deleteResult.all().get();
+
+                // List again
+                waitForTopicListing(admin, List.of(), List.of(testTopic));
+            }
+        }
+    }
+
+    @Test
+    public void testCreateClusterAndCreateAndManyTopics() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                // Create many topics
+                List<NewTopic> newTopics = List.of(
+                    new NewTopic("test-topic-1", 2, (short) 3),
+                    new NewTopic("test-topic-2", 2, (short) 3),
+                    new NewTopic("test-topic-3", 2, (short) 3)
+                );
+                CreateTopicsResult createTopicResult = 
admin.createTopics(newTopics);
+                createTopicResult.all().get();
+
+                // List created topics
+                waitForTopicListing(admin, List.of("test-topic-1", 
"test-topic-2", "test-topic-3"), List.of());
+            }
+        }
+    }
+
+    private Map<ClientQuotaEntity, Map<String, Double>> alterThenDescribe(
+        Admin admin,
+        ClientQuotaEntity entity,
+        List<ClientQuotaAlteration.Op> quotas,
+        ClientQuotaFilter filter,
+        int expectCount
+    ) throws Exception {
+        AlterClientQuotasResult alterResult = 
admin.alterClientQuotas(List.of(new ClientQuotaAlteration(entity, quotas)));
+        alterResult.all().get();
+
+        TestUtils.waitForCondition(() -> {
+            Map<ClientQuotaEntity, Map<String, Double>> results = 
admin.describeClientQuotas(filter).entities().get();
+            return results.getOrDefault(entity, Map.of()).size() == 
expectCount;
+        }, "Broker never saw new client quotas");
+
+        return admin.describeClientQuotas(filter).entities().get();
+    }
+
+    @Test
+    public void testClientQuotas() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                ClientQuotaEntity entity = new 
ClientQuotaEntity(Map.of("user", "testkit"));
+                ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
+                    List.of(ClientQuotaFilterComponent.ofEntity("user", 
"testkit")));
+
+                Map<ClientQuotaEntity, Map<String, Double>> describeResult = 
alterThenDescribe(admin, entity,
+                    List.of(new ClientQuotaAlteration.Op("request_percentage", 
0.99)), filter, 1);
+                assertEquals(0.99, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+                describeResult = alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", 0.97),
+                    new ClientQuotaAlteration.Op("producer_byte_rate", 
10000.0),
+                    new ClientQuotaAlteration.Op("consumer_byte_rate", 10001.0)
+                ), filter, 3);
+                assertEquals(0.97, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+                assertEquals(10000.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+                assertEquals(10001.0, 
describeResult.get(entity).get("consumer_byte_rate"), 1e-6);
+
+                describeResult = alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", 0.95),
+                    new ClientQuotaAlteration.Op("producer_byte_rate", null),
+                    new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+                ), filter, 1);
+                assertEquals(0.95, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+                alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", null)), 
filter, 0);
+
+                describeResult = alterThenDescribe(admin, entity,
+                    List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 
9999.0)), filter, 1);
+                assertEquals(9999.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+
+                ClientQuotaEntity entity2 = new 
ClientQuotaEntity(Map.of("user", "testkit", "client-id", "some-client"));
+                filter = ClientQuotaFilter.containsOnly(
+                    List.of(
+                        ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+                        ClientQuotaFilterComponent.ofEntity("client-id", 
"some-client")
+                    ));
+                describeResult = alterThenDescribe(admin, entity2,
+                    List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 
9998.0)), filter, 1);
+                assertEquals(9998.0, 
describeResult.get(entity2).get("producer_byte_rate"), 1e-6);
+
+                final ClientQuotaFilter finalFilter = 
ClientQuotaFilter.contains(
+                    List.of(ClientQuotaFilterComponent.ofEntity("user", 
"testkit")));
+
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Map<String, Double>> results = 
admin.describeClientQuotas(finalFilter).entities().get();
+                    if (results.size() != 2) {
+                        return false;
+                    }
+                    assertEquals(9999.0, 
results.get(entity).get("producer_byte_rate"), 1e-6);
+                    assertEquals(9998.0, 
results.get(entity2).get("producer_byte_rate"), 1e-6);
+                    return true;
+                }, "Broker did not see two client quotas");
+            }
+        }
+    }
+
+    private void setConsumerByteRate(Admin admin, ClientQuotaEntity entity, 
Long value) throws Exception {
+        admin.alterClientQuotas(List.of(
+            new ClientQuotaAlteration(entity, List.of(
+                new ClientQuotaAlteration.Op("consumer_byte_rate", 
value.doubleValue())))
+        )).all().get();
+    }
+
+    private Map<ClientQuotaEntity, Long> getConsumerByteRates(Admin admin) 
throws Exception {
+        return 
admin.describeClientQuotas(ClientQuotaFilter.contains(List.of()))
+            .entities().get()
+            .entrySet().stream()
+            .filter(entry -> 
entry.getValue().containsKey("consumer_byte_rate"))
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().get("consumer_byte_rate").longValue()
+            ));
+    }
+
+    @Test
+    public void testDefaultClientQuotas() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                ClientQuotaEntity defaultUser = new 
ClientQuotaEntity(Collections.singletonMap("user", null));
+                ClientQuotaEntity bobUser = new 
ClientQuotaEntity(Map.of("user", "bob"));
+
+                TestUtils.waitForCondition(
+                    () -> getConsumerByteRates(admin).isEmpty(),
+                    "Initial consumer byte rates should be empty");
+
+                setConsumerByteRate(admin, defaultUser, 100L);
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Long> rates = 
getConsumerByteRates(admin);
+                    return rates.size() == 1 &&
+                        rates.get(defaultUser) == 100L;
+                }, "Default user rate should be 100");
+
+                setConsumerByteRate(admin, bobUser, 1000L);
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Long> rates = 
getConsumerByteRates(admin);
+                    return rates.size() == 2 &&
+                        rates.get(defaultUser) == 100L &&
+                        rates.get(bobUser) == 1000L;
+                }, "Should have both default and bob user rates");
+            }
+        }
+    }
+
+    @Test
+    public void testCreateClusterWithAdvertisedPortZero() throws Exception {
+        Map<Integer, Map<String, String>> brokerPropertyOverrides = new 
HashMap<>();
+        for (int brokerId = 0; brokerId < 3; brokerId++) {
+            Map<String, String> props = new HashMap<>();
+            props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            brokerPropertyOverrides.put(brokerId, props);
+        }
+
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumControllerNodes(1)
+            .setNumBrokerNodes(3)
+            .setPerServerProperties(brokerPropertyOverrides)
+            .build();
+
+        doOnStartedKafkaCluster(nodes, cluster ->
+            
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
 Duration.ofSeconds(15), cluster)
+                .nodes().values().forEach(broker -> {
+                    assertEquals("localhost", broker.host(),
+                        "Did not advertise configured advertised host");
+                    
assertEquals(cluster.brokers().get(broker.id()).socketServer().boundPort(cluster.nodes().brokerListenerName()),
 broker.port(),
+                        "Did not advertise bound socket port");
+                }));
+    }
+
+    @Test
+    public void 
testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer() throws 
Exception {
+        Map<Integer, Map<String, String>> brokerPropertyOverrides = new 
HashMap<>();
+        for (int brokerId = 0; brokerId < 3; brokerId++) {
+            Map<String, String> props = new HashMap<>();
+            props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
+                "EXTERNAL://advertised-host-" + brokerId + ":" + (brokerId + 
100));
+            brokerPropertyOverrides.put(brokerId, props);
+        }
+
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumControllerNodes(1)
+            .setNumBrokerNodes(3)
+            .setNumDisksPerBroker(1)
+            .setPerServerProperties(brokerPropertyOverrides)
+            .build();
+
+        doOnStartedKafkaCluster(nodes, cluster ->
+            
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
 Duration.ofSeconds(15), cluster)
+                .nodes().values().forEach(broker -> {
+                    assertEquals("advertised-host-" + broker.id(), 
broker.host(), "Did not advertise configured advertised host");
+                    assertEquals(broker.id() + 100, broker.port(), "Did not 
advertise configured advertised port");
+                }));
+    }
+
+    private void doOnStartedKafkaCluster(TestKitNodes nodes, 
ThrowingConsumer<KafkaClusterTestKit> action) throws Exception {

Review Comment:
   I suggest using `java.util.function.Consumer` instead of `ThrowingConsumer`. 
This would require updating the other lambda functions so that they only throw 
`RuntimeException`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to