DL1231 commented on code in PR #21175:
URL: https://github.com/apache/kafka/pull/21175#discussion_r2638868939
##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -644,6 +674,544 @@ private Admin createAdminClient(KafkaClusterTestKit cluster, boolean usingBootst
         return cluster.admin(Map.of(AdminClientConfig.CLIENT_ID_CONFIG,
             this.getClass().getName()), usingBootstrapControllers);
}
+ @Test
+ public void testCreateClusterAndPerformReassignment() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(4)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+
+ try (Admin admin = cluster.admin()) {
+ // Create the topic.
+ Map<Integer, List<Integer>> assignments = Map.of(
+ 0, List.of(0, 1, 2),
+ 1, List.of(1, 2, 3),
+ 2, List.of(2, 3, 0),
+ 3, List.of(3, 2, 1)
+ );
+
+                CreateTopicsResult createTopicResult = admin.createTopics(List.of(
+ new NewTopic("foo", assignments)));
+ createTopicResult.all().get();
+ waitForTopicListing(admin, List.of("foo"), List.of());
+
+ // Start some reassignments.
+                assertEquals(Map.of(), admin.listPartitionReassignments().reassignments().get());
+                Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = Map.of(
+                    new TopicPartition("foo", 0), Optional.of(new NewPartitionReassignment(List.of(2, 1, 0))),
+                    new TopicPartition("foo", 1), Optional.of(new NewPartitionReassignment(List.of(0, 1, 2))),
+                    new TopicPartition("foo", 2), Optional.of(new NewPartitionReassignment(List.of(2, 3))),
+                    new TopicPartition("foo", 3), Optional.of(new NewPartitionReassignment(List.of(3, 2, 0, 1)))
+ );
+ admin.alterPartitionReassignments(reassignments).all().get();
+ TestUtils.waitForCondition(
+                    () -> admin.listPartitionReassignments().reassignments().get().isEmpty(),
+ "The reassignment never completed."
+ );
+                AtomicReference<List<List<Integer>>> currentMapping = new AtomicReference<>(List.of());
+ List<List<Integer>> expectedMapping = List.of(
+ List.of(2, 1, 0),
+ List.of(0, 1, 2),
+ List.of(2, 3),
+ List.of(3, 2, 0, 1)
+ );
+ TestUtils.waitForCondition(() -> {
+                    Map<String, TopicDescription> topicInfoMap = admin.describeTopics(Set.of("foo")).allTopicNames().get();
+ if (topicInfoMap.containsKey("foo")) {
+                        currentMapping.set(translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions()));
+ return expectedMapping.equals(currentMapping.get());
+ } else {
+ return false;
+ }
+                }, () -> "Timed out waiting for replica assignments for topic foo. " +
+                    "Wanted: " + expectedMapping + ". Got: " + currentMapping.get());
+
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> checkReplicaManager(
+ cluster,
+ Map.of(
+ 0, List.of(true, true, false, true),
+ 1, List.of(true, true, false, true),
+ 2, List.of(true, true, true, true),
+ 3, List.of(false, false, true, true)
+ )
+ ));
+ }
+ }
+ }
+
+    private void checkReplicaManager(KafkaClusterTestKit cluster, Map<Integer, List<Boolean>> expectedHosting) {
+        for (Map.Entry<Integer, List<Boolean>> entry : expectedHosting.entrySet()) {
+ int brokerId = entry.getKey();
+ List<Boolean> partitionsIsHosted = entry.getValue();
+ var broker = cluster.brokers().get(brokerId);
+
+            for (int partitionId = 0; partitionId < partitionsIsHosted.size(); partitionId++) {
+ boolean isHosted = partitionsIsHosted.get(partitionId);
+                TopicPartition topicPartition = new TopicPartition("foo", partitionId);
+                var partition = broker.replicaManager().getPartition(topicPartition);
+ if (isHosted) {
+                    assertNotEquals(kafka.server.HostedPartition.None$.MODULE$, partition,
+                        "topicPartition = " + topicPartition);
+ } else {
+                    assertEquals(kafka.server.HostedPartition.None$.MODULE$, partition,
+                        "topicPartition = " + topicPartition);
+ }
+ }
+ }
+ }
+
+    private List<List<Integer>> translatePartitionInfoToSeq(List<TopicPartitionInfo> partitions) {
+ return partitions.stream()
+ .map(partition -> partition.replicas().stream()
+ .map(Node::id)
+ .collect(Collectors.toList()))
+ .collect(Collectors.toList());
+ }
+
+ @Test
+ public void testIncrementalAlterConfigs() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+
+ try (Admin admin = cluster.admin()) {
+                Map<ConfigResource, Collection<AlterConfigOp>> brokerConfigs = Map.of(
+ new ConfigResource(Type.BROKER, ""),
+ List.of(
+                        new AlterConfigOp(new ConfigEntry("log.roll.ms", "1234567"), AlterConfigOp.OpType.SET),
+                        new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "60"), AlterConfigOp.OpType.SET)
+ )
+ );
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, brokerConfigs));
+
+ validateConfigs(admin, Map.of(
+ new ConfigResource(Type.BROKER, ""), Map.of(
+ "log.roll.ms", "1234567",
+ "max.connections.per.ip", "60",
+ "min.insync.replicas", "1"
+ )), true);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 2, (short) 3),
+ new NewTopic("bar", 2, (short) 3)
+ )).all().get();
+ waitForAllPartitions(cluster, "foo", 2);
+ waitForAllPartitions(cluster, "bar", 2);
+
+ validateConfigs(admin, Map.of(
+ new ConfigResource(Type.TOPIC, "bar"), Map.of()
+ ), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_CONFIG, "Unknown topic config name: not.a.real.topic.config"),
+                    new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The topic 'baz' does not exist.")),
+                    incrementalAlter(admin, Map.of(
+                        new ConfigResource(Type.TOPIC, "foo"),
+                        List.of(new AlterConfigOp(new ConfigEntry("segment.jitter.ms", "345"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "bar"),
+                        List.of(new AlterConfigOp(new ConfigEntry("not.a.real.topic.config", "789"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "baz"),
+                        List.of(new AlterConfigOp(new ConfigEntry("segment.jitter.ms", "678"), AlterConfigOp.OpType.SET))
+                    )));
+
+ validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.TOPIC, "foo"), Map.of("segment.jitter.ms", "345")
+                ), false);
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, Map.of(
+                    new ConfigResource(Type.BROKER, "2"),
+                    List.of(new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "7"), AlterConfigOp.OpType.SET))
+                )));
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.BROKER, "2"), Map.of("max.connections.per.ip", "7")
+ ), false);
+ }
+ }
+ }
+
+    private void waitForAllPartitions(KafkaClusterTestKit cluster, String topic, int expectedNumPartitions)
+ throws InterruptedException {
+        TestUtils.waitForCondition(() -> cluster.brokers().values().stream().allMatch(broker -> {
+            Optional<Integer> numPartitionsOpt = broker.metadataCache().numPartitions(topic);
+            if (expectedNumPartitions == 0) {
+                return numPartitionsOpt.isEmpty();
+            } else {
+                return numPartitionsOpt.isPresent() && numPartitionsOpt.get() == expectedNumPartitions;
+            }
+        }), 60000L, "Topic [" + topic + "] metadata not propagated after 60000 ms");
+ }
+
+    private List<ApiError> incrementalAlter(Admin admin, Map<ConfigResource, Collection<AlterConfigOp>> changes) {
+        Map<ConfigResource, KafkaFuture<Void>> values = admin.incrementalAlterConfigs(changes).values();
+ return changes.keySet().stream().map(resource -> {
+ try {
+ values.get(resource).get();
+ return ApiError.NONE;
+ } catch (ExecutionException e) {
+ return ApiError.fromThrowable(e.getCause());
+ } catch (Throwable t) {
+ return ApiError.fromThrowable(t);
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private Map<ConfigResource, Map<String, String>> validateConfigs(
+ Admin admin,
+ Map<ConfigResource, Map<String, String>> expected,
+ boolean exhaustive
+ ) throws Exception {
+ Map<ConfigResource, Map<String, String>> results = new HashMap<>();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ try {
+ var values = admin.describeConfigs(expected.keySet()).values();
+ results.clear();
+ assertEquals(expected.keySet(), values.keySet());
+                for (Map.Entry<ConfigResource, Map<String, String>> entry : expected.entrySet()) {
+ ConfigResource resource = entry.getKey();
+ Map<String, String> expectedPairs = entry.getValue();
+ var config = values.get(resource).get();
+ Map<String, String> actualMap = new TreeMap<>();
+ Map<String, String> expectedMap = new TreeMap<>();
+ config.entries().forEach(configEntry -> {
+ actualMap.put(configEntry.name(), configEntry.value());
+ if (!exhaustive) {
+                            expectedMap.put(configEntry.name(), configEntry.value());
+ }
+ });
+ expectedMap.putAll(expectedPairs);
+ assertEquals(expectedMap, actualMap);
+ results.put(resource, actualMap);
+ }
+ } catch (Exception t) {
+                LOG.warn("Unable to describeConfigs({})", expected.keySet(), t);
+ throw t;
+ }
+ });
+ return results;
+ }
+
+ @Test
+ public void testSetLog4jConfigurations() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+
+ try (Admin admin = cluster.admin()) {
+ LOG.debug("setting log4j");
+ LOG_2.debug("setting log4j");
+
+                ConfigResource broker1 = new ConfigResource(Type.BROKER_LOGGER, "1");
+                ConfigResource broker2 = new ConfigResource(Type.BROKER_LOGGER, "2");
+                var initialLog4j = validateConfigs(admin, Map.of(broker1, Map.of()), false);
+
+ assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_REQUEST, "APPEND operation is not allowed for the BROKER_LOGGER resource")),
+                    incrementalAlter(admin, Map.of(
+                        broker1, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), "TRACE"), OpType.SET),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), "TRACE"), OpType.SET)),
+                        broker2, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), "TRACE"), OpType.APPEND),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), "TRACE"), OpType.APPEND)))
+ )
+ );
+
+ validateConfigs(admin, Map.of(
+ broker1, Map.of(
+ LOG.getName(), "TRACE",
+ LOG_2.getName(), "TRACE"
+ )
+ ), false);
+
+ assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_REQUEST, "SUBTRACT operation is not allowed for the BROKER_LOGGER resource")),
+                    incrementalAlter(admin, Map.of(
+                        broker1, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), ""), OpType.DELETE),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), ""), OpType.DELETE)),
+                        broker2, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), "TRACE"), OpType.SUBTRACT),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), "TRACE"), OpType.SUBTRACT)))
+ )
+ );
+
+ validateConfigs(admin, Map.of(
+ broker1, Map.of(
+                        LOG.getName(), initialLog4j.get(broker1).get(LOG.getName()),
+                        LOG_2.getName(), initialLog4j.get(broker1).get(LOG_2.getName())
+ )
+ ), false);
+ }
+ }
+ }
+
+    private void assertListEquals(List<ApiError> expected, List<ApiError> actual) {
Review Comment:
The reason I've opted for a custom `assertListEquals` method is that the `incrementalAlterConfigs` API does not guarantee the order of the `ApiError` objects in its returned list. As you know, `assertIterableEquals` performs a strict comparison that requires the elements of both iterables to appear in exactly the same sequence. The `assertListEquals` implementation verifies that the actual list contains all the expected errors and no unexpected ones, without being sensitive to their order.
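
For context, a minimal sketch of the kind of order-insensitive comparison described above. It treats the two lists as multisets and relies on `ApiError#equals`; the names and failure messages here are illustrative, not necessarily the PR's exact implementation:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.requests.ApiError;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

// Sketch only: order-insensitive (multiset) equality for ApiError lists.
private static void assertListEquals(List<ApiError> expected, List<ApiError> actual) {
    // Equal sizes plus removability of every expected element from a copy
    // of `actual` together imply the two lists are equal as multisets.
    assertEquals(expected.size(), actual.size(),
        "Expected " + expected + " but got " + actual);
    List<ApiError> remaining = new ArrayList<>(actual);
    for (ApiError error : expected) {
        assertTrue(remaining.remove(error),
            "Expected error " + error + " was not found in " + actual);
    }
}
```

Removing each matched element from a copy of `actual` also counts duplicate errors correctly, which a plain `containsAll` check in both directions would not.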