chia7712 commented on code in PR #20861:
URL: https://github.com/apache/kafka/pull/20861#discussion_r2609538241
##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -262,6 +285,375 @@ private void assertFoobarValue(KafkaClusterTestKit cluster, int expected) throws
});
}
+ @Test
+ public void testCreateClusterAndCreateListDeleteTopic() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+ TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+ "RaftManager was not initialized.");
+
+ String testTopic = "test-topic";
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ // Create a test topic
+ List<NewTopic> newTopic = List.of(new NewTopic(testTopic, 1,
(short) 3));
+ CreateTopicsResult createTopicResult =
admin.createTopics(newTopic);
+ createTopicResult.all().get();
+ waitForTopicListing(admin, List.of(testTopic), List.of());
+
+ // Delete topic
+ DeleteTopicsResult deleteResult =
admin.deleteTopics(List.of(testTopic));
+ deleteResult.all().get();
+
+ // List again
+ waitForTopicListing(admin, List.of(), List.of(testTopic));
+ }
+ }
+ }
+
+ @Test
+ public void testCreateClusterAndCreateAndManyTopics() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+ TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+ "RaftManager was not initialized.");
+
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ // Create many topics
+ List<NewTopic> newTopics = List.of(
+ new NewTopic("test-topic-1", 2, (short) 3),
+ new NewTopic("test-topic-2", 2, (short) 3),
+ new NewTopic("test-topic-3", 2, (short) 3)
+ );
+ CreateTopicsResult createTopicResult =
admin.createTopics(newTopics);
+ createTopicResult.all().get();
+
+ // List created topics
+ waitForTopicListing(admin, List.of("test-topic-1",
"test-topic-2", "test-topic-3"), List.of());
+ }
+ }
+ }
+
+ private Map<ClientQuotaEntity, Map<String, Double>> alterThenDescribe(
+ Admin admin,
+ ClientQuotaEntity entity,
+ List<ClientQuotaAlteration.Op> quotas,
+ ClientQuotaFilter filter,
+ int expectCount
+ ) throws Exception {
+ AlterClientQuotasResult alterResult =
admin.alterClientQuotas(List.of(new ClientQuotaAlteration(entity, quotas)));
+ alterResult.all().get();
+
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Map<String, Double>> results =
admin.describeClientQuotas(filter).entities().get();
+ return results.getOrDefault(entity, Map.of()).size() ==
expectCount;
+ }, "Broker never saw new client quotas");
+
+ return admin.describeClientQuotas(filter).entities().get();
+ }
+
+ @Test
+ public void testClientQuotas() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ ClientQuotaEntity entity = new
ClientQuotaEntity(Map.of("user", "testkit"));
+ ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
+ List.of(ClientQuotaFilterComponent.ofEntity("user",
"testkit")));
+
+ Map<ClientQuotaEntity, Map<String, Double>> describeResult =
alterThenDescribe(admin, entity,
+ List.of(new ClientQuotaAlteration.Op("request_percentage",
0.99)), filter, 1);
+ assertEquals(0.99,
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+ describeResult = alterThenDescribe(admin, entity, List.of(
+ new ClientQuotaAlteration.Op("request_percentage", 0.97),
+ new ClientQuotaAlteration.Op("producer_byte_rate",
10000.0),
+ new ClientQuotaAlteration.Op("consumer_byte_rate", 10001.0)
+ ), filter, 3);
+ assertEquals(0.97,
describeResult.get(entity).get("request_percentage"), 1e-6);
+ assertEquals(10000.0,
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+ assertEquals(10001.0,
describeResult.get(entity).get("consumer_byte_rate"), 1e-6);
+
+ describeResult = alterThenDescribe(admin, entity, List.of(
+ new ClientQuotaAlteration.Op("request_percentage", 0.95),
+ new ClientQuotaAlteration.Op("producer_byte_rate", null),
+ new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+ ), filter, 1);
+ assertEquals(0.95,
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+ alterThenDescribe(admin, entity, List.of(
+ new ClientQuotaAlteration.Op("request_percentage", null)),
filter, 0);
+
+ describeResult = alterThenDescribe(admin, entity,
+ List.of(new ClientQuotaAlteration.Op("producer_byte_rate",
9999.0)), filter, 1);
+ assertEquals(9999.0,
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+
+ ClientQuotaEntity entity2 = new
ClientQuotaEntity(Map.of("user", "testkit", "client-id", "some-client"));
+ filter = ClientQuotaFilter.containsOnly(
+ List.of(
+ ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+ ClientQuotaFilterComponent.ofEntity("client-id",
"some-client")
+ ));
+ describeResult = alterThenDescribe(admin, entity2,
+ List.of(new ClientQuotaAlteration.Op("producer_byte_rate",
9998.0)), filter, 1);
+ assertEquals(9998.0,
describeResult.get(entity2).get("producer_byte_rate"), 1e-6);
+
+ final ClientQuotaFilter finalFilter =
ClientQuotaFilter.contains(
+ List.of(ClientQuotaFilterComponent.ofEntity("user",
"testkit")));
+
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Map<String, Double>> results =
admin.describeClientQuotas(finalFilter).entities().get();
+ if (results.size() != 2) {
+ return false;
+ }
+ assertEquals(9999.0,
results.get(entity).get("producer_byte_rate"), 1e-6);
+ assertEquals(9998.0,
results.get(entity2).get("producer_byte_rate"), 1e-6);
+ return true;
+ }, "Broker did not see two client quotas");
+ }
+ }
+ }
+
+ private void setConsumerByteRate(Admin admin, ClientQuotaEntity entity,
Long value) throws Exception {
+ admin.alterClientQuotas(List.of(
+ new ClientQuotaAlteration(entity, List.of(
+ new ClientQuotaAlteration.Op("consumer_byte_rate",
value.doubleValue())))
+ )).all().get();
+ }
+
+ private Map<ClientQuotaEntity, Long> getConsumerByteRates(Admin admin)
throws Exception {
+ return
admin.describeClientQuotas(ClientQuotaFilter.contains(List.of()))
+ .entities().get()
+ .entrySet().stream()
+ .filter(entry ->
entry.getValue().containsKey("consumer_byte_rate"))
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> entry.getValue().get("consumer_byte_rate").longValue()
+ ));
+ }
+
+ @Test
+ public void testDefaultClientQuotas() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ ClientQuotaEntity defaultUser = new
ClientQuotaEntity(Collections.singletonMap("user", null));
+ ClientQuotaEntity bobUser = new
ClientQuotaEntity(Map.of("user", "bob"));
+
+ TestUtils.waitForCondition(
+ () -> getConsumerByteRates(admin).isEmpty(),
+ "Initial consumer byte rates should be empty");
+
+ setConsumerByteRate(admin, defaultUser, 100L);
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Long> rates =
getConsumerByteRates(admin);
+ return rates.size() == 1 &&
+ rates.get(defaultUser) == 100L;
+ }, "Default user rate should be 100");
+
+ setConsumerByteRate(admin, bobUser, 1000L);
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Long> rates =
getConsumerByteRates(admin);
+ return rates.size() == 2 &&
+ rates.get(defaultUser) == 100L &&
+ rates.get(bobUser) == 1000L;
+ }, "Should have both default and bob user rates");
+ }
+ }
+ }
+
+ @Test
+ public void testCreateClusterWithAdvertisedPortZero() throws Exception {
+ Map<Integer, Map<String, String>> brokerPropertyOverrides = new
HashMap<>();
+ for (int brokerId = 0; brokerId < 3; brokerId++) {
+ Map<String, String> props = new HashMap<>();
+ props.put(SocketServerConfigs.LISTENERS_CONFIG,
"EXTERNAL://localhost:0");
+ props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"EXTERNAL://localhost:0");
+ brokerPropertyOverrides.put(brokerId, props);
+ }
+
+ TestKitNodes nodes = new TestKitNodes.Builder()
+ .setNumControllerNodes(1)
+ .setNumBrokerNodes(3)
+ .setPerServerProperties(brokerPropertyOverrides)
+ .build();
+
+ doOnStartedKafkaCluster(nodes, cluster ->
+
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
Duration.ofSeconds(15), cluster)
+ .nodes().values().forEach(broker -> {
+ assertEquals("localhost", broker.host(),
+ "Did not advertise configured advertised host");
+
assertEquals(cluster.brokers().get(broker.id()).socketServer().boundPort(cluster.nodes().brokerListenerName()),
broker.port(),
+ "Did not advertise bound socket port");
+ }));
+ }
+
+ @Test
+ public void
testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer() throws
Exception {
+ Map<Integer, Map<String, String>> brokerPropertyOverrides = new
HashMap<>();
Review Comment:
```java
var brokerPropertyOverrides = IntStream.range(0, 3).boxed().collect(Collectors.toMap(brokerId -> brokerId, brokerId -> Map.of(
    SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:0",
    SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://advertised-host-" + brokerId + ":" + (brokerId + 100)
)));
```
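Presumably the same shape would also work for `testCreateClusterWithAdvertisedPortZero` above, where every broker gets identical listener properties (sketch only; assumes the usual `IntStream`, `Collectors`, and `Function` imports):
```java
// Hypothetical variant for the port-zero test: every broker gets the same two properties.
var brokerPropertyOverrides = IntStream.range(0, 3).boxed().collect(Collectors.toMap(Function.identity(), brokerId -> Map.of(
    SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:0",
    SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://localhost:0"
)));
```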
##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
+ @Test
+ public void
testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer() throws
Exception {
+ Map<Integer, Map<String, String>> brokerPropertyOverrides = new
HashMap<>();
+ for (int brokerId = 0; brokerId < 3; brokerId++) {
+ Map<String, String> props = new HashMap<>();
+ props.put(SocketServerConfigs.LISTENERS_CONFIG,
"EXTERNAL://localhost:0");
+ props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
+ "EXTERNAL://advertised-host-" + brokerId + ":" + (brokerId +
100));
+ brokerPropertyOverrides.put(brokerId, props);
+ }
+
+ TestKitNodes nodes = new TestKitNodes.Builder()
+ .setNumControllerNodes(1)
+ .setNumBrokerNodes(3)
+ .setNumDisksPerBroker(1)
+ .setPerServerProperties(brokerPropertyOverrides)
+ .build();
+
+ doOnStartedKafkaCluster(nodes, cluster ->
+
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
Duration.ofSeconds(15), cluster)
+ .nodes().values().forEach(broker -> {
+ assertEquals("advertised-host-" + broker.id(),
broker.host(), "Did not advertise configured advertised host");
+ assertEquals(broker.id() + 100, broker.port(), "Did not
advertise configured advertised port");
+ }));
+ }
+
+ private void doOnStartedKafkaCluster(TestKitNodes nodes,
ThrowingConsumer<KafkaClusterTestKit> action) throws Exception {
+ try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).build()) {
+ cluster.format();
+ cluster.startup();
+ action.accept(cluster);
+ }
+ }
+
+ private DescribeClusterResponse
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(
+ ListenerName listenerName,
+ Duration waitTime,
+ KafkaClusterTestKit cluster
+ ) throws Exception {
+ long startTime = System.currentTimeMillis();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+ TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+ "RaftManager was not initialized.");
+
+ Duration remainingWaitTime =
waitTime.minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
+
+ final DescribeClusterResponse[] currentResponse = new
DescribeClusterResponse[1];
+ int expectedBrokerCount = cluster.nodes().brokerNodes().size();
+ TestUtils.waitForCondition(
+ () -> {
+ currentResponse[0] = connectAndReceive(
+ new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()).build(),
+
cluster.brokers().get(0).socketServer().boundPort(listenerName)
+ );
+ return currentResponse[0].nodes().size() ==
expectedBrokerCount;
+ },
+ remainingWaitTime.toMillis(),
+ String.format("After %s ms Broker is only aware of %s brokers, but
%s are expected", remainingWaitTime.toMillis(), expectedBrokerCount,
expectedBrokerCount)
+ );
+
+ return currentResponse[0];
+ }
+
+ @FunctionalInterface
+ private interface ThrowingConsumer<T> {
+ void accept(T t) throws Exception;
+ }
+
+ private void waitForTopicListing(Admin admin, List<String>
expectedPresent, List<String> expectedAbsent)
+ throws InterruptedException {
+ Set<String> topicsNotFound = new HashSet<>(expectedPresent);
+ Set<String> extraTopics = new HashSet<>();
+ TestUtils.waitForCondition(() -> {
+ Set<String> topicNames = admin.listTopics().names().get();
+ topicsNotFound.removeAll(topicNames);
+ extraTopics.clear();
+
extraTopics.addAll(topicNames.stream().filter(expectedAbsent::contains).collect(Collectors.toSet()));
+ return topicsNotFound.isEmpty() && extraTopics.isEmpty();
+ }, String.format("Failed to find topic(s): %s and NOT find topic(s):
%s", topicsNotFound, extraTopics));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testUnregisterBroker(boolean usingBootstrapController) throws
Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(4)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ TestUtils.waitForCondition(() ->
brokerIsUnfenced(clusterImage(cluster, 1), 0),
+ "Timed out waiting for broker 0 to be unfenced.");
+ cluster.brokers().get(0).shutdown();
+ TestUtils.waitForCondition(() ->
!brokerIsUnfenced(clusterImage(cluster, 1), 0),
+ "Timed out waiting for broker 0 to be fenced.");
+
+ try (Admin admin = createAdminClient(cluster,
usingBootstrapController)) {
+ admin.unregisterBroker(0);
+ }
+
+ TestUtils.waitForCondition(() ->
brokerIsAbsent(clusterImage(cluster, 1), 0),
+ "Timed out waiting for broker 0 to be fenced.");
+ }
+ }
+
+ private ClusterImage clusterImage(KafkaClusterTestKit cluster, int
brokerId) {
+ return
cluster.brokers().get(brokerId).metadataCache().currentImage().cluster();
+ }
+
+ private boolean brokerIsUnfenced(ClusterImage image, int brokerId) {
+ BrokerRegistration registration = image.brokers().get(brokerId);
+ if (registration == null) {
+ return false;
+ }
+ return !registration.fenced();
+ }
+
+ private boolean brokerIsAbsent(ClusterImage image, int brokerId) {
+ return !image.brokers().containsKey(brokerId);
+ }
+
+ private Admin createAdminClient(KafkaClusterTestKit cluster, boolean
bootstrapController) {
+ Properties props;
+ if (bootstrapController) {
+ props =
cluster.newClientPropertiesBuilder().setUsingBootstrapControllers(true).build();
Review Comment:
This call chain is somewhat redundant. I suggest we make it consistent with `ClusterInstance#admin`.
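For illustration, the helper could probably collapse to something like this (just a sketch; it reuses the builder call already shown in the diff rather than the exact `ClusterInstance#admin` signature):
```java
private Admin createAdminClient(KafkaClusterTestKit cluster, boolean usingBootstrapControllers) {
    // Pass the flag straight through instead of branching on it.
    return Admin.create(cluster.newClientPropertiesBuilder()
        .setUsingBootstrapControllers(usingBootstrapControllers)
        .build());
}
```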
##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
+ private void doOnStartedKafkaCluster(TestKitNodes nodes,
ThrowingConsumer<KafkaClusterTestKit> action) throws Exception {
Review Comment:
I suggest using `java.util.function.Consumer`. This would require updating the lambdas passed to this method so that they only throw `RuntimeException`.
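A minimal sketch of what that change might look like (the body is unchanged from the diff; callers' lambdas would then have to wrap or avoid checked exceptions):
```java
// Sketch only: ThrowingConsumer replaced by java.util.function.Consumer.
private void doOnStartedKafkaCluster(TestKitNodes nodes, java.util.function.Consumer<KafkaClusterTestKit> action) throws Exception {
    try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
        cluster.format();
        cluster.startup();
        action.accept(cluster);
    }
}
```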
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]