chia7712 commented on code in PR #21175:
URL: https://github.com/apache/kafka/pull/21175#discussion_r2649647159
##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -644,6 +674,537 @@ private Admin createAdminClient(KafkaClusterTestKit cluster, boolean usingBootst
         return cluster.admin(Map.of(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass().getName()), usingBootstrapControllers);
     }
+ @Test
+ public void testCreateClusterAndPerformReassignment() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(4)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+
+ try (Admin admin = cluster.admin()) {
+ // Create the topic.
+ Map<Integer, List<Integer>> assignments = Map.of(
+ 0, List.of(0, 1, 2),
+ 1, List.of(1, 2, 3),
+ 2, List.of(2, 3, 0),
+ 3, List.of(3, 2, 1)
+ );
+
+            CreateTopicsResult createTopicResult = admin.createTopics(List.of(
+                new NewTopic("foo", assignments)));
+ createTopicResult.all().get();
+ waitForTopicListing(admin, List.of("foo"), List.of());
+
+ // Start some reassignments.
+            assertEquals(Map.of(), admin.listPartitionReassignments().reassignments().get());
+            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = Map.of(
+                new TopicPartition("foo", 0), Optional.of(new NewPartitionReassignment(List.of(2, 1, 0))),
+                new TopicPartition("foo", 1), Optional.of(new NewPartitionReassignment(List.of(0, 1, 2))),
+                new TopicPartition("foo", 2), Optional.of(new NewPartitionReassignment(List.of(2, 3))),
+                new TopicPartition("foo", 3), Optional.of(new NewPartitionReassignment(List.of(3, 2, 0, 1)))
+            );
+            admin.alterPartitionReassignments(reassignments).all().get();
+            TestUtils.waitForCondition(
+                () -> admin.listPartitionReassignments().reassignments().get().isEmpty(),
+                "The reassignment never completed."
+            );
+            AtomicReference<List<List<Integer>>> currentMapping = new AtomicReference<>(List.of());
+ List<List<Integer>> expectedMapping = List.of(
+ List.of(2, 1, 0),
+ List.of(0, 1, 2),
+ List.of(2, 3),
+ List.of(3, 2, 0, 1)
+ );
+            TestUtils.waitForCondition(() -> {
+                Map<String, TopicDescription> topicInfoMap = admin.describeTopics(Set.of("foo")).allTopicNames().get();
+                if (topicInfoMap.containsKey("foo")) {
+                    currentMapping.set(translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions()));
+                    return expectedMapping.equals(currentMapping.get());
+                } else {
+                    return false;
+                }
+            }, () -> "Timed out waiting for replica assignments for topic foo. " +
+                "Wanted: " + expectedMapping + ". Got: " + currentMapping.get());
+
+            TestUtils.retryOnExceptionWithTimeout(60000, () -> checkReplicaManager(
+ cluster,
+ Map.of(
+ 0, List.of(true, true, false, true),
+ 1, List.of(true, true, false, true),
+ 2, List.of(true, true, true, true),
+ 3, List.of(false, false, true, true)
+ )
+ ));
+ }
+ }
+ }
+
+    private void checkReplicaManager(KafkaClusterTestKit cluster, Map<Integer, List<Boolean>> expectedHosting) {
+        for (Map.Entry<Integer, List<Boolean>> entry : expectedHosting.entrySet()) {
+            int brokerId = entry.getKey();
+            List<Boolean> partitionsIsHosted = entry.getValue();
+            var broker = cluster.brokers().get(brokerId);
+
+            for (int partitionId = 0; partitionId < partitionsIsHosted.size(); partitionId++) {
+                boolean isHosted = partitionsIsHosted.get(partitionId);
+                TopicPartition topicPartition = new TopicPartition("foo", partitionId);
+                var partition = broker.replicaManager().getPartition(topicPartition);
+                if (isHosted) {
+                    assertNotEquals(kafka.server.HostedPartition.None$.MODULE$, partition,
+                        "topicPartition = " + topicPartition);
+                } else {
+                    assertEquals(kafka.server.HostedPartition.None$.MODULE$, partition,
+                        "topicPartition = " + topicPartition);
+                }
+            }
+        }
+    }
+
+    private List<List<Integer>> translatePartitionInfoToSeq(List<TopicPartitionInfo> partitions) {
+ return partitions.stream()
+ .map(partition -> partition.replicas().stream()
+ .map(Node::id)
+ .collect(Collectors.toList()))
+ .collect(Collectors.toList());
+ }
+
+ @Test
+ public void testIncrementalAlterConfigs() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+
+ try (Admin admin = cluster.admin()) {
+                Map<ConfigResource, Collection<AlterConfigOp>> brokerConfigs = Map.of(
+                    new ConfigResource(Type.BROKER, ""),
+                    List.of(
+                        new AlterConfigOp(new ConfigEntry("log.roll.ms", "1234567"), AlterConfigOp.OpType.SET),
+                        new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "60"), AlterConfigOp.OpType.SET)
+                    )
+                );
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, brokerConfigs));
+
+ validateConfigs(admin, Map.of(
+ new ConfigResource(Type.BROKER, ""), Map.of(
+ "log.roll.ms", "1234567",
+ "max.connections.per.ip", "60",
+ "min.insync.replicas", "1"
+ )), true);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 2, (short) 3),
+ new NewTopic("bar", 2, (short) 3)
+ )).all().get();
+ waitForAllPartitions(cluster, "foo", 2);
+ waitForAllPartitions(cluster, "bar", 2);
+
+ validateConfigs(admin, Map.of(
+ new ConfigResource(Type.TOPIC, "bar"), Map.of()
+ ), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_CONFIG, "Unknown topic config name: not.a.real.topic.config"),
+                    new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The topic 'baz' does not exist.")),
+                    incrementalAlter(admin, Map.of(
+                        new ConfigResource(Type.TOPIC, "foo"),
+                        List.of(new AlterConfigOp(new ConfigEntry("segment.jitter.ms", "345"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "bar"),
+                        List.of(new AlterConfigOp(new ConfigEntry("not.a.real.topic.config", "789"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "baz"),
+                        List.of(new AlterConfigOp(new ConfigEntry("segment.jitter.ms", "678"), AlterConfigOp.OpType.SET))
+                    )));
+
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.TOPIC, "foo"), Map.of("segment.jitter.ms", "345")
+                ), false);
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, Map.of(
+                    new ConfigResource(Type.BROKER, "2"),
+                    List.of(new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "7"), AlterConfigOp.OpType.SET))
+                )));
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.BROKER, "2"), Map.of("max.connections.per.ip", "7")
+                ), false);
+ }
+ }
+ }
+
+    private void waitForAllPartitions(KafkaClusterTestKit cluster, String topic, int expectedNumPartitions)
+            throws InterruptedException {
+        TestUtils.waitForCondition(() -> cluster.brokers().values().stream().allMatch(broker -> {
+            Optional<Integer> numPartitionsOpt = broker.metadataCache().numPartitions(topic);
+            if (expectedNumPartitions == 0) {
+                return numPartitionsOpt.isEmpty();
+            } else {
+                return numPartitionsOpt.isPresent() && numPartitionsOpt.get() == expectedNumPartitions;
+            }
+        }), 60000L, "Topic [" + topic + "] metadata not propagated after 60000 ms");
+    }
+
+    private List<ApiError> incrementalAlter(Admin admin, Map<ConfigResource, Collection<AlterConfigOp>> changes) {
+        Map<ConfigResource, KafkaFuture<Void>> values = admin.incrementalAlterConfigs(changes).values();
+ return changes.keySet().stream().map(resource -> {
+ try {
+ values.get(resource).get();
+ return ApiError.NONE;
+ } catch (Throwable t) {
+ return ApiError.fromThrowable(t);
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private Map<ConfigResource, Map<String, String>> validateConfigs(
+ Admin admin,
+ Map<ConfigResource, Map<String, String>> expected,
+ boolean exhaustive
+ ) throws Exception {
+ Map<ConfigResource, Map<String, String>> results = new HashMap<>();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var values = admin.describeConfigs(expected.keySet()).values();
+ results.clear();
+ assertEquals(expected.keySet(), values.keySet());
+            for (Map.Entry<ConfigResource, Map<String, String>> entry : expected.entrySet()) {
+ ConfigResource resource = entry.getKey();
+ Map<String, String> expectedPairs = entry.getValue();
+ var config = values.get(resource).get();
+ Map<String, String> actualMap = new TreeMap<>();
+ Map<String, String> expectedMap = new TreeMap<>();
+ config.entries().forEach(configEntry -> {
+ actualMap.put(configEntry.name(), configEntry.value());
+ if (!exhaustive) {
+                        expectedMap.put(configEntry.name(), configEntry.value());
+ }
+ });
+ expectedMap.putAll(expectedPairs);
+ assertEquals(expectedMap, actualMap);
+ results.put(resource, actualMap);
+ }
+ });
+ return results;
+ }
+
+ @Test
+ public void testSetLog4jConfigurations() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+
+ try (Admin admin = cluster.admin()) {
+ LOG.debug("setting log4j");
+ LOG_2.debug("setting log4j");
+
+                ConfigResource broker1 = new ConfigResource(Type.BROKER_LOGGER, "1");
+                ConfigResource broker2 = new ConfigResource(Type.BROKER_LOGGER, "2");
+                var initialLog4j = validateConfigs(admin, Map.of(broker1, Map.of()), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_REQUEST, "APPEND operation is not allowed for the BROKER_LOGGER resource")),
+                    incrementalAlter(admin, Map.of(
+                        broker1, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), "TRACE"), OpType.SET),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), "TRACE"), OpType.SET)),
+                        broker2, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), "TRACE"), OpType.APPEND),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), "TRACE"), OpType.APPEND)))
+                    )
+                );
+
+ validateConfigs(admin, Map.of(
+ broker1, Map.of(
+ LOG.getName(), "TRACE",
+ LOG_2.getName(), "TRACE"
+ )
+ ), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_REQUEST, "SUBTRACT operation is not allowed for the BROKER_LOGGER resource")),
+                    incrementalAlter(admin, Map.of(
+                        broker1, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), ""), OpType.DELETE),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), ""), OpType.DELETE)),
+                        broker2, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), "TRACE"), OpType.SUBTRACT),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), "TRACE"), OpType.SUBTRACT)))
+                    )
+                );
+
+ validateConfigs(admin, Map.of(
+ broker1, Map.of(
+                        LOG.getName(), initialLog4j.get(broker1).get(LOG.getName()),
+                        LOG_2.getName(), initialLog4j.get(broker1).get(LOG_2.getName())
+ )
+ ), false);
+ }
+ }
+ }
+
+    private void assertListEquals(List<ApiError> expected, List<ApiError> actual) {
+ for (ApiError expectedError : expected) {
+ if (!actual.contains(expectedError)) {
+ fail("Failed to find expected error " + expectedError);
+ }
+ }
+ for (ApiError actualError : actual) {
+ if (!expected.contains(actualError)) {
+ fail("Found unexpected error " + actualError);
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"3.7-IV0", "3.7-IV2"})
+    public void testCreatePartitions(String metadataVersionString) throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+                .setBootstrapMetadataVersion(MetadataVersion.fromVersionString(metadataVersionString, true))
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+
+ try (Admin admin = cluster.admin()) {
+                Map<String, KafkaFuture<Void>> createResults = admin.createTopics(List.of(
+ new NewTopic("foo", 1, (short) 3),
+ new NewTopic("bar", 2, (short) 3)
+ )).values();
+ createResults.get("foo").get();
+ createResults.get("bar").get();
+                Map<String, KafkaFuture<Void>> increaseResults = admin.createPartitions(Map.of(
+ "foo", NewPartitions.increaseTo(3),
+ "bar", NewPartitions.increaseTo(2)
+ )).values();
+
+ increaseResults.get("foo").get();
+                ExecutionException exception = assertThrows(ExecutionException.class, () -> increaseResults.get("bar").get());
+                assertEquals(InvalidPartitionsException.class, exception.getCause().getClass());
+ }
+ }
+ }
+
+ @Test
+ public void testDescribeQuorumRequestToBrokers() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+
+ for (int i = 0; i < 3; i++) {
+ int brokerId = i;
+ TestUtils.waitForCondition(
Review Comment:
Can it be replaced by `cluster.waitForReadyBrokers();`?
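
   A minimal sketch of the suggested simplification, assuming `cluster.waitForReadyBrokers()` blocks until every broker reaches `BrokerState.RUNNING`, which is what the manual loop polls for:

   ```java
   cluster.format();
   cluster.startup();
   // Hypothetical replacement for the per-broker polling loop quoted above;
   // assumes waitForReadyBrokers() waits for all brokers to report RUNNING.
   cluster.waitForReadyBrokers();
   ```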
##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -644,6 +674,537 @@ private Admin createAdminClient(KafkaClusterTestKit cluster, boolean usingBootst
         return cluster.admin(Map.of(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass().getName()), usingBootstrapControllers);
     }
+ @Test
+ public void testCreateClusterAndPerformReassignment() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(4)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+
+ try (Admin admin = cluster.admin()) {
+ // Create the topic.
+ Map<Integer, List<Integer>> assignments = Map.of(
+ 0, List.of(0, 1, 2),
+ 1, List.of(1, 2, 3),
+ 2, List.of(2, 3, 0),
+ 3, List.of(3, 2, 1)
+ );
+
+            CreateTopicsResult createTopicResult = admin.createTopics(List.of(
+                new NewTopic("foo", assignments)));
+ createTopicResult.all().get();
+ waitForTopicListing(admin, List.of("foo"), List.of());
+
+ // Start some reassignments.
+            assertEquals(Map.of(), admin.listPartitionReassignments().reassignments().get());
+            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = Map.of(
+                new TopicPartition("foo", 0), Optional.of(new NewPartitionReassignment(List.of(2, 1, 0))),
+                new TopicPartition("foo", 1), Optional.of(new NewPartitionReassignment(List.of(0, 1, 2))),
+                new TopicPartition("foo", 2), Optional.of(new NewPartitionReassignment(List.of(2, 3))),
+                new TopicPartition("foo", 3), Optional.of(new NewPartitionReassignment(List.of(3, 2, 0, 1)))
+            );
+            admin.alterPartitionReassignments(reassignments).all().get();
+            TestUtils.waitForCondition(
+                () -> admin.listPartitionReassignments().reassignments().get().isEmpty(),
+                "The reassignment never completed."
+            );
+            AtomicReference<List<List<Integer>>> currentMapping = new AtomicReference<>(List.of());
+ List<List<Integer>> expectedMapping = List.of(
+ List.of(2, 1, 0),
+ List.of(0, 1, 2),
+ List.of(2, 3),
+ List.of(3, 2, 0, 1)
+ );
+            TestUtils.waitForCondition(() -> {
+                Map<String, TopicDescription> topicInfoMap = admin.describeTopics(Set.of("foo")).allTopicNames().get();
+                if (topicInfoMap.containsKey("foo")) {
+                    currentMapping.set(translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions()));
+                    return expectedMapping.equals(currentMapping.get());
+                } else {
+                    return false;
+                }
+            }, () -> "Timed out waiting for replica assignments for topic foo. " +
+                "Wanted: " + expectedMapping + ". Got: " + currentMapping.get());
+
+            TestUtils.retryOnExceptionWithTimeout(60000, () -> checkReplicaManager(
+ cluster,
+ Map.of(
+ 0, List.of(true, true, false, true),
+ 1, List.of(true, true, false, true),
+ 2, List.of(true, true, true, true),
+ 3, List.of(false, false, true, true)
+ )
+ ));
+ }
+ }
+ }
+
+    private void checkReplicaManager(KafkaClusterTestKit cluster, Map<Integer, List<Boolean>> expectedHosting) {
+        for (Map.Entry<Integer, List<Boolean>> entry : expectedHosting.entrySet()) {
+            int brokerId = entry.getKey();
+            List<Boolean> partitionsIsHosted = entry.getValue();
+            var broker = cluster.brokers().get(brokerId);
+
+            for (int partitionId = 0; partitionId < partitionsIsHosted.size(); partitionId++) {
+                boolean isHosted = partitionsIsHosted.get(partitionId);
+                TopicPartition topicPartition = new TopicPartition("foo", partitionId);
+                var partition = broker.replicaManager().getPartition(topicPartition);
+                if (isHosted) {
+                    assertNotEquals(kafka.server.HostedPartition.None$.MODULE$, partition,
+                        "topicPartition = " + topicPartition);
+                } else {
+                    assertEquals(kafka.server.HostedPartition.None$.MODULE$, partition,
+                        "topicPartition = " + topicPartition);
+                }
+            }
+        }
+    }
+
+    private List<List<Integer>> translatePartitionInfoToSeq(List<TopicPartitionInfo> partitions) {
+ return partitions.stream()
+ .map(partition -> partition.replicas().stream()
+ .map(Node::id)
+ .collect(Collectors.toList()))
+ .collect(Collectors.toList());
+ }
+
+ @Test
+ public void testIncrementalAlterConfigs() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+
+ try (Admin admin = cluster.admin()) {
+                Map<ConfigResource, Collection<AlterConfigOp>> brokerConfigs = Map.of(
+                    new ConfigResource(Type.BROKER, ""),
+                    List.of(
+                        new AlterConfigOp(new ConfigEntry("log.roll.ms", "1234567"), AlterConfigOp.OpType.SET),
+                        new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "60"), AlterConfigOp.OpType.SET)
+                    )
+                );
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, brokerConfigs));
+
+ validateConfigs(admin, Map.of(
+ new ConfigResource(Type.BROKER, ""), Map.of(
+ "log.roll.ms", "1234567",
+ "max.connections.per.ip", "60",
+ "min.insync.replicas", "1"
+ )), true);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 2, (short) 3),
+ new NewTopic("bar", 2, (short) 3)
+ )).all().get();
+ waitForAllPartitions(cluster, "foo", 2);
+ waitForAllPartitions(cluster, "bar", 2);
+
+ validateConfigs(admin, Map.of(
+ new ConfigResource(Type.TOPIC, "bar"), Map.of()
+ ), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_CONFIG, "Unknown topic config name: not.a.real.topic.config"),
+                    new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The topic 'baz' does not exist.")),
+                    incrementalAlter(admin, Map.of(
+                        new ConfigResource(Type.TOPIC, "foo"),
+                        List.of(new AlterConfigOp(new ConfigEntry("segment.jitter.ms", "345"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "bar"),
+                        List.of(new AlterConfigOp(new ConfigEntry("not.a.real.topic.config", "789"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "baz"),
+                        List.of(new AlterConfigOp(new ConfigEntry("segment.jitter.ms", "678"), AlterConfigOp.OpType.SET))
+                    )));
+
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.TOPIC, "foo"), Map.of("segment.jitter.ms", "345")
+                ), false);
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, Map.of(
+                    new ConfigResource(Type.BROKER, "2"),
+                    List.of(new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "7"), AlterConfigOp.OpType.SET))
+                )));
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.BROKER, "2"), Map.of("max.connections.per.ip", "7")
+                ), false);
+ }
+ }
+ }
+
+    private void waitForAllPartitions(KafkaClusterTestKit cluster, String topic, int expectedNumPartitions)
+            throws InterruptedException {
+        TestUtils.waitForCondition(() -> cluster.brokers().values().stream().allMatch(broker -> {
+            Optional<Integer> numPartitionsOpt = broker.metadataCache().numPartitions(topic);
+            if (expectedNumPartitions == 0) {
+                return numPartitionsOpt.isEmpty();
+            } else {
+                return numPartitionsOpt.isPresent() && numPartitionsOpt.get() == expectedNumPartitions;
+            }
+        }), 60000L, "Topic [" + topic + "] metadata not propagated after 60000 ms");
+    }
+
+    private List<ApiError> incrementalAlter(Admin admin, Map<ConfigResource, Collection<AlterConfigOp>> changes) {
+        Map<ConfigResource, KafkaFuture<Void>> values = admin.incrementalAlterConfigs(changes).values();
+ return changes.keySet().stream().map(resource -> {
+ try {
+ values.get(resource).get();
+ return ApiError.NONE;
+ } catch (Throwable t) {
+ return ApiError.fromThrowable(t);
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private Map<ConfigResource, Map<String, String>> validateConfigs(
+ Admin admin,
+ Map<ConfigResource, Map<String, String>> expected,
+ boolean exhaustive
+ ) throws Exception {
+ Map<ConfigResource, Map<String, String>> results = new HashMap<>();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var values = admin.describeConfigs(expected.keySet()).values();
+ results.clear();
+ assertEquals(expected.keySet(), values.keySet());
+            for (Map.Entry<ConfigResource, Map<String, String>> entry : expected.entrySet()) {
+ ConfigResource resource = entry.getKey();
+ Map<String, String> expectedPairs = entry.getValue();
+ var config = values.get(resource).get();
+ Map<String, String> actualMap = new TreeMap<>();
+ Map<String, String> expectedMap = new TreeMap<>();
+ config.entries().forEach(configEntry -> {
+ actualMap.put(configEntry.name(), configEntry.value());
+ if (!exhaustive) {
+                        expectedMap.put(configEntry.name(), configEntry.value());
+ }
+ });
+ expectedMap.putAll(expectedPairs);
+ assertEquals(expectedMap, actualMap);
+ results.put(resource, actualMap);
+ }
+ });
+ return results;
+ }
+
+ @Test
+ public void testSetLog4jConfigurations() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+
+ try (Admin admin = cluster.admin()) {
+ LOG.debug("setting log4j");
+ LOG_2.debug("setting log4j");
+
+                ConfigResource broker1 = new ConfigResource(Type.BROKER_LOGGER, "1");
+                ConfigResource broker2 = new ConfigResource(Type.BROKER_LOGGER, "2");
+                var initialLog4j = validateConfigs(admin, Map.of(broker1, Map.of()), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_REQUEST, "APPEND operation is not allowed for the BROKER_LOGGER resource")),
+                    incrementalAlter(admin, Map.of(
+                        broker1, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), "TRACE"), OpType.SET),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), "TRACE"), OpType.SET)),
+                        broker2, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), "TRACE"), OpType.APPEND),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), "TRACE"), OpType.APPEND)))
+                    )
+                );
+
+ validateConfigs(admin, Map.of(
+ broker1, Map.of(
+ LOG.getName(), "TRACE",
+ LOG_2.getName(), "TRACE"
+ )
+ ), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_REQUEST, "SUBTRACT operation is not allowed for the BROKER_LOGGER resource")),
+                    incrementalAlter(admin, Map.of(
+                        broker1, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), ""), OpType.DELETE),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), ""), OpType.DELETE)),
+                        broker2, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), "TRACE"), OpType.SUBTRACT),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), "TRACE"), OpType.SUBTRACT)))
+                    )
+                );
+
+ validateConfigs(admin, Map.of(
+ broker1, Map.of(
+                        LOG.getName(), initialLog4j.get(broker1).get(LOG.getName()),
+                        LOG_2.getName(), initialLog4j.get(broker1).get(LOG_2.getName())
+ )
+ ), false);
+ }
+ }
+ }
+
+    private void assertListEquals(List<ApiError> expected, List<ApiError> actual) {
+ for (ApiError expectedError : expected) {
+ if (!actual.contains(expectedError)) {
+ fail("Failed to find expected error " + expectedError);
+ }
+ }
+ for (ApiError actualError : actual) {
+ if (!expected.contains(actualError)) {
+ fail("Found unexpected error " + actualError);
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"3.7-IV0", "3.7-IV2"})
+    public void testCreatePartitions(String metadataVersionString) throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+                .setBootstrapMetadataVersion(MetadataVersion.fromVersionString(metadataVersionString, true))
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+
+ try (Admin admin = cluster.admin()) {
+                Map<String, KafkaFuture<Void>> createResults = admin.createTopics(List.of(
+ new NewTopic("foo", 1, (short) 3),
+ new NewTopic("bar", 2, (short) 3)
+ )).values();
+ createResults.get("foo").get();
+ createResults.get("bar").get();
+                Map<String, KafkaFuture<Void>> increaseResults = admin.createPartitions(Map.of(
+ "foo", NewPartitions.increaseTo(3),
+ "bar", NewPartitions.increaseTo(2)
+ )).values();
+
+ increaseResults.get("foo").get();
+                ExecutionException exception = assertThrows(ExecutionException.class, () -> increaseResults.get("bar").get());
+                assertEquals(InvalidPartitionsException.class, exception.getCause().getClass());
+ }
+ }
+ }
+
+ @Test
+ public void testDescribeQuorumRequestToBrokers() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+
+ for (int i = 0; i < 3; i++) {
+ int brokerId = i;
+                TestUtils.waitForCondition(
+                    () -> cluster.brokers().get(brokerId).brokerState() == BrokerState.RUNNING,
+                    "Broker never started up"
+                );
+            }
+ try (Admin admin = createAdminClient(cluster, false)) {
+                QuorumInfo quorumInfo = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions()).quorumInfo().get();
+
+ Set<Integer> controllerIds = cluster.controllers().keySet();
+ Set<Integer> voterIds = quorumInfo.voters().stream()
+ .map(QuorumInfo.ReplicaState::replicaId)
+ .collect(Collectors.toSet());
+ assertEquals(controllerIds, voterIds);
+                assertTrue(controllerIds.contains(quorumInfo.leaderId()),
+                    "Leader ID " + quorumInfo.leaderId() + " was not a controller ID.");
+
+                AtomicReference<List<QuorumInfo.ReplicaState>> currentVotersRef = new AtomicReference<>();
+                TestUtils.waitForCondition(() -> {
+                    try {
+                        List<QuorumInfo.ReplicaState> voters = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions())
+                            .quorumInfo().get().voters();
+                        currentVotersRef.set(voters);
+                        return voters.stream().allMatch(voter ->
+                            voter.logEndOffset() > 0
+                            && voter.lastFetchTimestamp().isPresent()
+                            && voter.lastCaughtUpTimestamp().isPresent()
+                        );
+                    } catch (Exception e) {
+                        return false;
+                    }
+                }, () -> "At least one voter did not return the expected state within timeout. " +
+                    "The responses gathered for all the voters: " + currentVotersRef.get());
+
+                AtomicReference<List<QuorumInfo.ReplicaState>> currentObserversRef = new AtomicReference<>();
+                TestUtils.waitForCondition(() -> {
+                    try {
+                        List<QuorumInfo.ReplicaState> observers = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions())
+                            .quorumInfo().get().observers();
+                        currentObserversRef.set(observers);
+                        Set<Integer> brokerIds = cluster.brokers().keySet();
+                        Set<Integer> observerIds = observers.stream()
+                            .map(QuorumInfo.ReplicaState::replicaId)
+                            .collect(Collectors.toSet());
+
+                        boolean idsMatch = brokerIds.equals(observerIds);
+                        boolean stateValid = observers.stream().allMatch(observer ->
+                            observer.logEndOffset() > 0
+                            && observer.lastFetchTimestamp().isPresent()
+                            && observer.lastCaughtUpTimestamp().isPresent()
+                        );
+                        return idsMatch && stateValid;
+                    } catch (Exception e) {
+                        return false;
+                    }
+                }, () -> "At least one observer did not return the expected state within timeout. " +
+                    "The responses gathered for all the observers: " + currentObserversRef.get());
+ }
+ }
+ }
+
+ @Test
+ public void testDescribeQuorumRequestToControllers() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+
+ for (int i = 0; i < 3; i++) {
+ int brokerId = i;
+ TestUtils.waitForCondition(
Review Comment:
ditto
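
   As above, a sketch of the same suggested replacement, assuming `waitForReadyBrokers()` subsumes this per-broker `BrokerState.RUNNING` check:

   ```java
   cluster.format();
   cluster.startup();
   cluster.waitForReadyBrokers();  // hypothetical stand-in for the manual polling loop
   ```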
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]