chia7712 commented on code in PR #21175:
URL: https://github.com/apache/kafka/pull/21175#discussion_r2640453233


##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -644,6 +674,539 @@ private Admin createAdminClient(KafkaClusterTestKit cluster, boolean usingBootst
         return cluster.admin(Map.of(AdminClientConfig.CLIENT_ID_CONFIG, 
this.getClass().getName()), usingBootstrapControllers);
     }
 
+    @Test
+    public void testCreateClusterAndPerformReassignment() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(4)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+
+            try (Admin admin = cluster.admin()) {
+                // Create the topic.
+                Map<Integer, List<Integer>> assignments = Map.of(
+                    0, List.of(0, 1, 2),
+                    1, List.of(1, 2, 3),
+                    2, List.of(2, 3, 0),
+                    3, List.of(3, 2, 1)
+                );
+
+                CreateTopicsResult createTopicResult = 
admin.createTopics(List.of(
+                    new NewTopic("foo", assignments)));
+                createTopicResult.all().get();
+                waitForTopicListing(admin, List.of("foo"), List.of());
+
+                // Start some reassignments.
+                assertEquals(Map.of(), 
admin.listPartitionReassignments().reassignments().get());
+                Map<TopicPartition, Optional<NewPartitionReassignment>> 
reassignments = Map.of(
+                    new TopicPartition("foo", 0), Optional.of(new 
NewPartitionReassignment(List.of(2, 1, 0))),
+                    new TopicPartition("foo", 1), Optional.of(new 
NewPartitionReassignment(List.of(0, 1, 2))),
+                    new TopicPartition("foo", 2), Optional.of(new 
NewPartitionReassignment(List.of(2, 3))),
+                    new TopicPartition("foo", 3), Optional.of(new 
NewPartitionReassignment(List.of(3, 2, 0, 1)))
+                );
+                admin.alterPartitionReassignments(reassignments).all().get();
+                TestUtils.waitForCondition(
+                    () -> 
admin.listPartitionReassignments().reassignments().get().isEmpty(),
+                    "The reassignment never completed."
+                );
+                AtomicReference<List<List<Integer>>> currentMapping = new 
AtomicReference<>(List.of());
+                List<List<Integer>> expectedMapping = List.of(
+                    List.of(2, 1, 0),
+                    List.of(0, 1, 2),
+                    List.of(2, 3),
+                    List.of(3, 2, 0, 1)
+                );
+                TestUtils.waitForCondition(() -> {
+                    Map<String, TopicDescription> topicInfoMap = 
admin.describeTopics(Set.of("foo")).allTopicNames().get();
+                    if (topicInfoMap.containsKey("foo")) {
+                        
currentMapping.set(translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions()));
+                        return expectedMapping.equals(currentMapping.get());
+                    } else {
+                        return false;
+                    }
+                }, () -> "Timed out waiting for replica assignments for topic foo. " +
+                    "Wanted: " + expectedMapping + ". Got: " + currentMapping.get());
+
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> 
checkReplicaManager(
+                    cluster,
+                    Map.of(
+                        0, List.of(true, true, false, true),
+                        1, List.of(true, true, false, true),
+                        2, List.of(true, true, true, true),
+                        3, List.of(false, false, true, true)
+                    )
+                ));
+            }
+        }
+    }
+
+    private void checkReplicaManager(KafkaClusterTestKit cluster, Map<Integer, 
List<Boolean>> expectedHosting) {
+        for (Map.Entry<Integer, List<Boolean>> entry : 
expectedHosting.entrySet()) {
+            int brokerId = entry.getKey();
+            List<Boolean> partitionsIsHosted = entry.getValue();
+            var broker = cluster.brokers().get(brokerId);
+
+            for (int partitionId = 0; partitionId < partitionsIsHosted.size(); 
partitionId++) {
+                boolean isHosted = partitionsIsHosted.get(partitionId);
+                TopicPartition topicPartition = new TopicPartition("foo", 
partitionId);
+                var partition = 
broker.replicaManager().getPartition(topicPartition);
+                if (isHosted) {
+                    
assertNotEquals(kafka.server.HostedPartition.None$.MODULE$, partition, 
"topicPartition = " + topicPartition);
+                } else {
+                    assertEquals(kafka.server.HostedPartition.None$.MODULE$, 
partition, "topicPartition = " + topicPartition);
+                }
+            }
+        }
+    }
+
+    private List<List<Integer>> 
translatePartitionInfoToSeq(List<TopicPartitionInfo> partitions) {
+        return partitions.stream()
+            .map(partition -> partition.replicas().stream()
+                .map(Node::id)
+                .collect(Collectors.toList()))
+            .collect(Collectors.toList());
+    }
+
+    @Test
+    public void testIncrementalAlterConfigs() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+
+            try (Admin admin = cluster.admin()) {
+                Map<ConfigResource, Collection<AlterConfigOp>> brokerConfigs = 
Map.of(
+                    new ConfigResource(Type.BROKER, ""),
+                    List.of(
+                        new AlterConfigOp(new ConfigEntry("log.roll.ms", 
"1234567"), AlterConfigOp.OpType.SET),
+                        new AlterConfigOp(new 
ConfigEntry("max.connections.per.ip", "60"), AlterConfigOp.OpType.SET)
+                    )
+                );
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, 
brokerConfigs));
+
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.BROKER, ""), Map.of(
+                        "log.roll.ms", "1234567",
+                        "max.connections.per.ip", "60",
+                        "min.insync.replicas", "1"
+                    )), true);
+
+                admin.createTopics(List.of(
+                    new NewTopic("foo", 2, (short) 3),
+                    new NewTopic("bar", 2, (short) 3)
+                )).all().get();
+                waitForAllPartitions(cluster, "foo", 2);
+                waitForAllPartitions(cluster, "bar", 2);
+
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.TOPIC, "bar"), Map.of()
+                ), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_CONFIG, "Unknown topic config name: not.a.real.topic.config"),
+                    new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The topic 'baz' does not exist.")),
+                    incrementalAlter(admin, Map.of(
+                        new ConfigResource(Type.TOPIC, "foo"),
+                        List.of(new AlterConfigOp(new 
ConfigEntry("segment.jitter.ms", "345"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "bar"),
+                        List.of(new AlterConfigOp(new 
ConfigEntry("not.a.real.topic.config", "789"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "baz"),
+                        List.of(new AlterConfigOp(new 
ConfigEntry("segment.jitter.ms", "678"), AlterConfigOp.OpType.SET))
+                    )));
+
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.TOPIC, "foo"), 
Map.of("segment.jitter.ms", "345")
+                ), false);
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, 
Map.of(
+                    new ConfigResource(Type.BROKER, "2"),
+                    List.of(new AlterConfigOp(new 
ConfigEntry("max.connections.per.ip", "7"), AlterConfigOp.OpType.SET))
+                )));
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.BROKER, "2"), 
Map.of("max.connections.per.ip", "7")
+                ), false);
+            }
+        }
+    }
+
+    private void waitForAllPartitions(KafkaClusterTestKit cluster, String 
topic, int expectedNumPartitions)
+        throws InterruptedException {
+        TestUtils.waitForCondition(() -> 
cluster.brokers().values().stream().allMatch(broker -> {
+            Optional<Integer> numPartitionsOpt = 
broker.metadataCache().numPartitions(topic);
+            if (expectedNumPartitions == 0) {
+                return numPartitionsOpt.isEmpty();
+            } else {
+                return numPartitionsOpt.isPresent() && numPartitionsOpt.get() 
== expectedNumPartitions;
+            }
+        }), 60000L, "Topic [" + topic + "] metadata not propagated after 60000 ms");
+    }
+
+    private List<ApiError> incrementalAlter(Admin admin, Map<ConfigResource, 
Collection<AlterConfigOp>> changes) {
+        Map<ConfigResource, KafkaFuture<Void>> values = 
admin.incrementalAlterConfigs(changes).values();
+        return changes.keySet().stream().map(resource -> {
+            try {

Review Comment:
   ```java
               try {
                   values.get(resource).get();
                   return ApiError.NONE;
               } catch (Throwable t) {
                   return ApiError.fromThrowable(t);
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to