DL1231 commented on code in PR #21175:
URL: https://github.com/apache/kafka/pull/21175#discussion_r2638868939


##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -644,6 +674,544 @@ private Admin createAdminClient(KafkaClusterTestKit 
cluster, boolean usingBootst
         return cluster.admin(Map.of(AdminClientConfig.CLIENT_ID_CONFIG, 
this.getClass().getName()), usingBootstrapControllers);
     }
 
+    @Test
+    public void testCreateClusterAndPerformReassignment() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(4)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+
+            try (Admin admin = cluster.admin()) {
+                // Create the topic.
+                Map<Integer, List<Integer>> assignments = Map.of(
+                    0, List.of(0, 1, 2),
+                    1, List.of(1, 2, 3),
+                    2, List.of(2, 3, 0),
+                    3, List.of(3, 2, 1)
+                );
+
+                CreateTopicsResult createTopicResult = 
admin.createTopics(List.of(
+                    new NewTopic("foo", assignments)));
+                createTopicResult.all().get();
+                waitForTopicListing(admin, List.of("foo"), List.of());
+
+                // Start some reassignments.
+                assertEquals(Map.of(), 
admin.listPartitionReassignments().reassignments().get());
+                Map<TopicPartition, Optional<NewPartitionReassignment>> 
reassignments = Map.of(
+                    new TopicPartition("foo", 0), Optional.of(new 
NewPartitionReassignment(List.of(2, 1, 0))),
+                    new TopicPartition("foo", 1), Optional.of(new 
NewPartitionReassignment(List.of(0, 1, 2))),
+                    new TopicPartition("foo", 2), Optional.of(new 
NewPartitionReassignment(List.of(2, 3))),
+                    new TopicPartition("foo", 3), Optional.of(new 
NewPartitionReassignment(List.of(3, 2, 0, 1)))
+                );
+                admin.alterPartitionReassignments(reassignments).all().get();
+                TestUtils.waitForCondition(
+                    () -> 
admin.listPartitionReassignments().reassignments().get().isEmpty(),
+                    "The reassignment never completed."
+                );
+                AtomicReference<List<List<Integer>>> currentMapping = new 
AtomicReference<>(List.of());
+                List<List<Integer>> expectedMapping = List.of(
+                    List.of(2, 1, 0),
+                    List.of(0, 1, 2),
+                    List.of(2, 3),
+                    List.of(3, 2, 0, 1)
+                );
+                TestUtils.waitForCondition(() -> {
+                    Map<String, TopicDescription> topicInfoMap = 
admin.describeTopics(Set.of("foo")).allTopicNames().get();
+                    if (topicInfoMap.containsKey("foo")) {
+                        
currentMapping.set(translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions()));
+                        return expectedMapping.equals(currentMapping.get());
+                    } else {
+                        return false;
+                    }
+                }, () -> "Timed out waiting for replica assignments for topic 
foo. " +
+                    "Wanted: " + expectedMapping + ". Got: " + 
currentMapping.get());
+
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> 
checkReplicaManager(
+                    cluster,
+                    Map.of(
+                        0, List.of(true, true, false, true),
+                        1, List.of(true, true, false, true),
+                        2, List.of(true, true, true, true),
+                        3, List.of(false, false, true, true)
+                    )
+                ));
+            }
+        }
+    }
+
+    private void checkReplicaManager(KafkaClusterTestKit cluster, Map<Integer, 
List<Boolean>> expectedHosting) {
+        for (Map.Entry<Integer, List<Boolean>> entry : 
expectedHosting.entrySet()) {
+            int brokerId = entry.getKey();
+            List<Boolean> partitionsIsHosted = entry.getValue();
+            var broker = cluster.brokers().get(brokerId);
+
+            for (int partitionId = 0; partitionId < partitionsIsHosted.size(); 
partitionId++) {
+                boolean isHosted = partitionsIsHosted.get(partitionId);
+                TopicPartition topicPartition = new TopicPartition("foo", 
partitionId);
+                var partition = 
broker.replicaManager().getPartition(topicPartition);
+                if (isHosted) {
+                    
assertNotEquals(kafka.server.HostedPartition.None$.MODULE$, partition, 
"topicPartition = " + topicPartition);
+                } else {
+                    assertEquals(kafka.server.HostedPartition.None$.MODULE$, 
partition, "topicPartition = " + topicPartition);
+                }
+            }
+        }
+    }
+
+    private List<List<Integer>> 
translatePartitionInfoToSeq(List<TopicPartitionInfo> partitions) {
+        return partitions.stream()
+            .map(partition -> partition.replicas().stream()
+                .map(Node::id)
+                .collect(Collectors.toList()))
+            .collect(Collectors.toList());
+    }
+
+    @Test
+    public void testIncrementalAlterConfigs() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+
+            try (Admin admin = cluster.admin()) {
+                Map<ConfigResource, Collection<AlterConfigOp>> brokerConfigs = 
Map.of(
+                    new ConfigResource(Type.BROKER, ""),
+                    List.of(
+                        new AlterConfigOp(new ConfigEntry("log.roll.ms", 
"1234567"), AlterConfigOp.OpType.SET),
+                        new AlterConfigOp(new 
ConfigEntry("max.connections.per.ip", "60"), AlterConfigOp.OpType.SET)
+                    )
+                );
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, 
brokerConfigs));
+
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.BROKER, ""), Map.of(
+                        "log.roll.ms", "1234567",
+                        "max.connections.per.ip", "60",
+                        "min.insync.replicas", "1"
+                    )), true);
+
+                admin.createTopics(List.of(
+                    new NewTopic("foo", 2, (short) 3),
+                    new NewTopic("bar", 2, (short) 3)
+                )).all().get();
+                waitForAllPartitions(cluster, "foo", 2);
+                waitForAllPartitions(cluster, "bar", 2);
+
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.TOPIC, "bar"), Map.of()
+                ), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_CONFIG, "Unknown topic config 
name: not.a.real.topic.config"),
+                    new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The topic 
'baz' does not exist.")),
+                    incrementalAlter(admin, Map.of(
+                        new ConfigResource(Type.TOPIC, "foo"),
+                        List.of(new AlterConfigOp(new 
ConfigEntry("segment.jitter.ms", "345"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "bar"),
+                        List.of(new AlterConfigOp(new 
ConfigEntry("not.a.real.topic.config", "789"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "baz"),
+                        List.of(new AlterConfigOp(new 
ConfigEntry("segment.jitter.ms", "678"), AlterConfigOp.OpType.SET))
+                    )));
+
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.TOPIC, "foo"), 
Map.of("segment.jitter.ms", "345")
+                ), false);
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, 
Map.of(
+                    new ConfigResource(Type.BROKER, "2"),
+                    List.of(new AlterConfigOp(new 
ConfigEntry("max.connections.per.ip", "7"), AlterConfigOp.OpType.SET))
+                )));
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.BROKER, "2"), 
Map.of("max.connections.per.ip", "7")
+                ), false);
+            }
+        }
+    }
+
+    private void waitForAllPartitions(KafkaClusterTestKit cluster, String 
topic, int expectedNumPartitions)
+        throws InterruptedException {
+        TestUtils.waitForCondition(() -> 
cluster.brokers().values().stream().allMatch(broker -> {
+            Optional<Integer> numPartitionsOpt = 
broker.metadataCache().numPartitions(topic);
+            if (expectedNumPartitions == 0) {
+                return numPartitionsOpt.isEmpty();
+            } else {
+                return numPartitionsOpt.isPresent() && numPartitionsOpt.get() 
== expectedNumPartitions;
+            }
+        }), 60000L, "Topic [" + topic + "] metadata not propagated after 60000 
ms");
+    }
+
+    private List<ApiError> incrementalAlter(Admin admin, Map<ConfigResource, 
Collection<AlterConfigOp>> changes) {
+        Map<ConfigResource, KafkaFuture<Void>> values = 
admin.incrementalAlterConfigs(changes).values();
+        return changes.keySet().stream().map(resource -> {
+            try {
+                values.get(resource).get();
+                return ApiError.NONE;
+            } catch (ExecutionException e) {
+                return ApiError.fromThrowable(e.getCause());
+            } catch (Throwable t) {
+                return ApiError.fromThrowable(t);
+            }
+        }).collect(Collectors.toList());
+    }
+
+    private Map<ConfigResource, Map<String, String>> validateConfigs(
+        Admin admin,
+        Map<ConfigResource, Map<String, String>> expected,
+        boolean exhaustive
+    ) throws Exception {
+        Map<ConfigResource, Map<String, String>> results = new HashMap<>();
+        TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+            try {
+                var values = admin.describeConfigs(expected.keySet()).values();
+                results.clear();
+                assertEquals(expected.keySet(), values.keySet());
+                for (Map.Entry<ConfigResource, Map<String, String>> entry : 
expected.entrySet()) {
+                    ConfigResource resource = entry.getKey();
+                    Map<String, String> expectedPairs = entry.getValue();
+                    var config = values.get(resource).get();
+                    Map<String, String> actualMap = new TreeMap<>();
+                    Map<String, String> expectedMap = new TreeMap<>();
+                    config.entries().forEach(configEntry -> {
+                        actualMap.put(configEntry.name(), configEntry.value());
+                        if (!exhaustive) {
+                            expectedMap.put(configEntry.name(), 
configEntry.value());
+                        }
+                    });
+                    expectedMap.putAll(expectedPairs);
+                    assertEquals(expectedMap, actualMap);
+                    results.put(resource, actualMap);
+                }
+            } catch (Exception t) {
+                LOG.warn("Unable to describeConfigs({})", expected.keySet(), 
t);
+                throw t;
+            }
+        });
+        return results;
+    }
+
+    @Test
+    public void testSetLog4jConfigurations() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+
+            try (Admin admin = cluster.admin()) {
+                LOG.debug("setting log4j");
+                LOG_2.debug("setting log4j");
+
+                ConfigResource broker1 = new 
ConfigResource(Type.BROKER_LOGGER, "1");
+                ConfigResource broker2 = new 
ConfigResource(Type.BROKER_LOGGER, "2");
+                var initialLog4j = validateConfigs(admin, Map.of(broker1, 
Map.of()), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_REQUEST, "APPEND operation is 
not allowed for the BROKER_LOGGER resource")),
+                    incrementalAlter(admin, Map.of(
+                        broker1, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), 
"TRACE"), OpType.SET),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), 
"TRACE"), OpType.SET)),
+                        broker2, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), 
"TRACE"), OpType.APPEND),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), 
"TRACE"), OpType.APPEND)))
+                    )
+                );
+
+                validateConfigs(admin, Map.of(
+                    broker1, Map.of(
+                        LOG.getName(), "TRACE",
+                        LOG_2.getName(), "TRACE"
+                    )
+                ), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_REQUEST, "SUBTRACT operation 
is not allowed for the BROKER_LOGGER resource")),
+                    incrementalAlter(admin, Map.of(
+                        broker1, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), 
""), OpType.DELETE),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), 
""), OpType.DELETE)),
+                        broker2, List.of(
+                            new AlterConfigOp(new ConfigEntry(LOG.getName(), 
"TRACE"), OpType.SUBTRACT),
+                            new AlterConfigOp(new ConfigEntry(LOG_2.getName(), 
"TRACE"), OpType.SUBTRACT)))
+                    )
+                );
+
+                validateConfigs(admin, Map.of(
+                    broker1, Map.of(
+                        LOG.getName(), 
initialLog4j.get(broker1).get(LOG.getName()),
+                        LOG_2.getName(), 
initialLog4j.get(broker1).get(LOG_2.getName())
+                    )
+                ), false);
+            }
+        }
+    }
+
+    private void assertListEquals(List<ApiError> expected, List<ApiError> 
actual) {

Review Comment:
   The reason I've opted for the custom `assertListEquals` method is that the 
`incrementalAlterConfigs` API does not guarantee the order of the `ApiError` 
objects in its returned list.
   
   As you know, `assertIterableEquals` performs a strict comparison that 
requires the elements in both iterables to be in the exact same sequence.
   
   The current `assertListEquals` implementation correctly verifies that the 
actual list contains all the expected errors and no unexpected ones, but it 
does so without being sensitive to their order.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to