DL1231 commented on code in PR #20861:
URL: https://github.com/apache/kafka/pull/20861#discussion_r2522941430
##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -262,6 +285,374 @@ private void assertFoobarValue(KafkaClusterTestKit
cluster, int expected) throws
});
}
+ @Test
+ public void testCreateClusterAndCreateListDeleteTopic() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+ TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+ "RaftManager was not initialized.");
+
+ String testTopic = "test-topic";
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ // Create a test topic
+ List<NewTopic> newTopic = List.of(new NewTopic(testTopic, 1,
(short) 3));
+ CreateTopicsResult createTopicResult =
admin.createTopics(newTopic);
+ createTopicResult.all().get();
+ waitForTopicListing(admin, List.of(testTopic), List.of());
+
+ // Delete topic
+ DeleteTopicsResult deleteResult =
admin.deleteTopics(List.of(testTopic));
+ deleteResult.all().get();
+
+ // List again
+ waitForTopicListing(admin, List.of(), List.of(testTopic));
+ }
+ }
+ }
+
+ @Test
+ public void testCreateClusterAndCreateAndManyTopics() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+ TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+ "RaftManager was not initialized.");
+
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ // Create many topics
+ List<NewTopic> newTopics = List.of(
+ new NewTopic("test-topic-1", 2, (short) 3),
+ new NewTopic("test-topic-2", 2, (short) 3),
+ new NewTopic("test-topic-3", 2, (short) 3)
+ );
+ CreateTopicsResult createTopicResult =
admin.createTopics(newTopics);
+ createTopicResult.all().get();
+
+ // List created topics
+ waitForTopicListing(admin, List.of("test-topic-1",
"test-topic-2", "test-topic-3"), List.of());
+ }
+ }
+ }
+
+ private Map<ClientQuotaEntity, Map<String, Double>> alterThenDescribe(
+ Admin admin,
+ ClientQuotaEntity entity,
+ List<ClientQuotaAlteration.Op> quotas,
+ ClientQuotaFilter filter,
+ int expectCount
+ ) throws Exception {
+ AlterClientQuotasResult alterResult =
admin.alterClientQuotas(List.of(new ClientQuotaAlteration(entity, quotas)));
+ alterResult.all().get();
+
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Map<String, Double>> results =
admin.describeClientQuotas(filter).entities().get();
+ return results.getOrDefault(entity, Map.of()).size() ==
expectCount;
+ }, "Broker never saw new client quotas");
+
+ return admin.describeClientQuotas(filter).entities().get();
+ }
+
+ @Test
+ public void testClientQuotas() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ ClientQuotaEntity entity = new
ClientQuotaEntity(Map.of("user", "testkit"));
+ ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
+ List.of(ClientQuotaFilterComponent.ofEntity("user",
"testkit")));
+
+ Map<ClientQuotaEntity, Map<String, Double>> describeResult =
alterThenDescribe(admin, entity,
+ List.of(new ClientQuotaAlteration.Op("request_percentage",
0.99)), filter, 1);
+ assertEquals(0.99,
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+ describeResult = alterThenDescribe(admin, entity, List.of(
+ new ClientQuotaAlteration.Op("request_percentage", 0.97),
+ new ClientQuotaAlteration.Op("producer_byte_rate",
10000.0),
+ new ClientQuotaAlteration.Op("consumer_byte_rate", 10001.0)
+ ), filter, 3);
+ assertEquals(0.97,
describeResult.get(entity).get("request_percentage"), 1e-6);
+ assertEquals(10000.0,
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+ assertEquals(10001.0,
describeResult.get(entity).get("consumer_byte_rate"), 1e-6);
+
+ describeResult = alterThenDescribe(admin, entity, List.of(
+ new ClientQuotaAlteration.Op("request_percentage", 0.95),
+ new ClientQuotaAlteration.Op("producer_byte_rate", null),
+ new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+ ), filter, 1);
+ assertEquals(0.95,
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+ alterThenDescribe(admin, entity, List.of(
+ new ClientQuotaAlteration.Op("request_percentage", null)),
filter, 0);
+
+ describeResult = alterThenDescribe(admin, entity,
+ List.of(new ClientQuotaAlteration.Op("producer_byte_rate",
9999.0)), filter, 1);
+ assertEquals(9999.0,
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+
+ ClientQuotaEntity entity2 = new
ClientQuotaEntity(Map.of("user", "testkit", "client-id", "some-client"));
+ filter = ClientQuotaFilter.containsOnly(
+ List.of(
+ ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+ ClientQuotaFilterComponent.ofEntity("client-id",
"some-client")
+ ));
+ describeResult = alterThenDescribe(admin, entity2,
+ List.of(new ClientQuotaAlteration.Op("producer_byte_rate",
9998.0)), filter, 1);
+ assertEquals(9998.0,
describeResult.get(entity2).get("producer_byte_rate"), 1e-6);
+
+ final ClientQuotaFilter finalFilter =
ClientQuotaFilter.contains(
+ List.of(ClientQuotaFilterComponent.ofEntity("user",
"testkit")));
+
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Map<String, Double>> results =
admin.describeClientQuotas(finalFilter).entities().get();
+ if (results.size() != 2) {
+ return false;
+ }
+ assertEquals(9999.0,
results.get(entity).get("producer_byte_rate"), 1e-6);
+ assertEquals(9998.0,
results.get(entity2).get("producer_byte_rate"), 1e-6);
+ return true;
+ }, "Broker did not see two client quotas");
+ }
+ }
+ }
+
+ private void setConsumerByteRate(Admin admin, ClientQuotaEntity entity,
Long value) throws Exception {
+ admin.alterClientQuotas(List.of(
+ new ClientQuotaAlteration(entity, List.of(
+ new ClientQuotaAlteration.Op("consumer_byte_rate",
value.doubleValue())))
+ )).all().get();
+ }
+
+ private Map<ClientQuotaEntity, Long> getConsumerByteRates(Admin admin)
throws Exception {
+ return
admin.describeClientQuotas(ClientQuotaFilter.contains(List.of()))
+ .entities().get()
+ .entrySet().stream()
+ .filter(entry ->
entry.getValue().containsKey("consumer_byte_rate"))
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> entry.getValue().get("consumer_byte_rate").longValue()
+ ));
+ }
+
+ @Test
+ public void testDefaultClientQuotas() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ ClientQuotaEntity defaultUser = new
ClientQuotaEntity(Collections.singletonMap("user", null));
+ ClientQuotaEntity bobUser = new
ClientQuotaEntity(Map.of("user", "bob"));
+
+ TestUtils.waitForCondition(
+ () -> getConsumerByteRates(admin).isEmpty(),
+ "Initial consumer byte rates should be empty");
+
+ setConsumerByteRate(admin, defaultUser, 100L);
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Long> rates =
getConsumerByteRates(admin);
+ return rates.size() == 1 &&
+ rates.get(defaultUser) == 100L;
+ }, "Default user rate should be 100");
+
+ setConsumerByteRate(admin, bobUser, 1000L);
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Long> rates =
getConsumerByteRates(admin);
+ return rates.size() == 2 &&
+ rates.get(defaultUser) == 100L &&
+ rates.get(bobUser) == 1000L;
+ }, "Should have both default and bob user rates");
+ }
+ }
+ }
+
+ @Test
+ public void testCreateClusterWithAdvertisedPortZero() throws Exception {
+ Map<Integer, Map<String, String>> brokerPropertyOverrides = new
HashMap<>();
+ for (int brokerId = 0; brokerId < 3; brokerId++) {
+ Map<String, String> props = new HashMap<>();
+ props.put(SocketServerConfigs.LISTENERS_CONFIG,
"EXTERNAL://localhost:0");
+ props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"EXTERNAL://localhost:0");
+ brokerPropertyOverrides.put(brokerId, props);
+ }
+
+ TestKitNodes nodes = new TestKitNodes.Builder()
+ .setNumControllerNodes(1)
+ .setNumBrokerNodes(3)
+ .setPerServerProperties(brokerPropertyOverrides)
+ .build();
+
+ doOnStartedKafkaCluster(nodes, cluster ->
+
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
Duration.ofSeconds(15), cluster)
+ .nodes().values().forEach(broker -> {
+ assertEquals("localhost", broker.host(),
+ "Did not advertise configured advertised host");
+
assertEquals(cluster.brokers().get(broker.id()).socketServer().boundPort(cluster.nodes().brokerListenerName()),
broker.port(),
+ "Did not advertise bound socket port");
+ }));
+ }
+
+ @Test
+ public void
testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer() throws
Exception {
+ Map<Integer, Map<String, String>> brokerPropertyOverrides = new
HashMap<>();
+ for (int brokerId = 0; brokerId < 3; brokerId++) {
+ Map<String, String> props = new HashMap<>();
+ props.put(SocketServerConfigs.LISTENERS_CONFIG,
"EXTERNAL://localhost:0");
+ props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
+ "EXTERNAL://advertised-host-" + brokerId + ":" + (brokerId +
100));
+ brokerPropertyOverrides.put(brokerId, props);
+ }
+
+ TestKitNodes nodes = new TestKitNodes.Builder()
+ .setNumControllerNodes(1)
+ .setNumBrokerNodes(3)
+ .setNumDisksPerBroker(1)
+ .setPerServerProperties(brokerPropertyOverrides)
+ .build();
+
+ doOnStartedKafkaCluster(nodes, cluster ->
+
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
Duration.ofSeconds(15), cluster)
+ .nodes().values().forEach(broker -> {
+ assertEquals("advertised-host-" + broker.id(),
broker.host(), "Did not advertise configured advertised host");
+ assertEquals(broker.id() + 100, broker.port(), "Did not
advertise configured advertised port");
+ }));
+ }
+
+ private void doOnStartedKafkaCluster(TestKitNodes nodes,
ThrowingConsumer<KafkaClusterTestKit> action) throws Exception {
+ try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).build()) {
+ cluster.format();
+ cluster.startup();
+ action.accept(cluster);
+ }
+ }
+
+ private DescribeClusterResponse
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(
+ ListenerName listenerName,
+ Duration waitTime,
+ KafkaClusterTestKit cluster
+ ) throws Exception {
+ long startTime = System.currentTimeMillis();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+ TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+ "RaftManager was not initialized.");
+
+ Duration remainingWaitTime =
waitTime.minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
+
+ final DescribeClusterResponse[] currentResponse = new
DescribeClusterResponse[1];
+ int expectedBrokerCount = cluster.nodes().brokerNodes().size();
+ TestUtils.waitForCondition(
+ () -> {
+ currentResponse[0] = connectAndReceive(
+ new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()).build(),
+
cluster.brokers().get(0).socketServer().boundPort(listenerName)
+ );
+ return currentResponse[0].nodes().size() ==
expectedBrokerCount;
+ },
+ remainingWaitTime.toMillis(),
+ String.format("After %s ms Broker is only aware of %s brokers, but
%s are expected", remainingWaitTime.toMillis(), expectedBrokerCount,
expectedBrokerCount)
+ );
+
+ return currentResponse[0];
+ }
+
+ @FunctionalInterface
+ private interface ThrowingConsumer<T> {
+ void accept(T t) throws Exception;
+ }
+
+ private void waitForTopicListing(Admin admin, List<String>
expectedPresent, List<String> expectedAbsent)
+ throws InterruptedException {
+ Set<String> topicsNotFound = new HashSet<>(expectedPresent);
+ Set<String> extraTopics = new HashSet<>();
+ TestUtils.waitForCondition(() -> {
+ Set<String> topicNames = admin.listTopics().names().get();
+ topicsNotFound.removeAll(topicNames);
+
extraTopics.addAll(topicNames.stream().filter(expectedAbsent::contains).collect(Collectors.toSet()));
Review Comment:
Good catch! I've fixed this. PTAL.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]