chia7712 commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1590605162


##########
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##########
@@ -143,11 +143,12 @@ private void processClusterTest(ExtensionContext context, 
ClusterTest annot, Clu
         Type type = annot.clusterType() == Type.DEFAULT ? 
defaults.clusterType() : annot.clusterType();
 
         Map<String, String> serverProperties = new HashMap<>();
+        Map<Integer, Map<String, String>> perServerProperties = new 
HashMap<>();

Review Comment:
   Maybe we can rewrite them by lambda. 
   
   ```java
           Map<String, String> serverProperties = 
Stream.concat(Arrays.stream(defaults.serverProperties()), 
Arrays.stream(annot.serverProperties()))
                   .filter(e -> e.id() == -1)
                   .collect(Collectors.toMap(ClusterConfigProperty::key, 
ClusterConfigProperty::value, (a, b) -> b));
   
           Map<Integer, Map<String, String>> perServerProperties = 
Stream.concat(Arrays.stream(defaults.serverProperties()), 
Arrays.stream(annot.serverProperties()))
                   .filter(e -> e.id() != -1)
                   .collect(Collectors.groupingBy(ClusterConfigProperty::id, 
Collectors.mapping(Function.identity(),
                           Collectors.toMap(ClusterConfigProperty::key, 
ClusterConfigProperty::value, (a, b) -> b))));
   ```



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -105,17 +109,27 @@ public TestKitNodes build() {
             List<Integer> controllerNodeIds = 
IntStream.range(startControllerId(), startControllerId() + numControllerNodes)
                 .boxed()
                 .collect(Collectors.toList());
-            List<Integer> brokerNodeIds = IntStream.range(startBrokerId(), 
startBrokerId() + numBrokerNodes)
+            List<Integer> brokerNodeIds = IntStream.range(BROKER_ID_OFFSET, 
BROKER_ID_OFFSET + numBrokerNodes)
                 .boxed()
                 .collect(Collectors.toList());
 
+            Set<Integer> unknownIds = perServerProperties.keySet().stream()

Review Comment:
   We can convert the `Integer` to `String` here. The following error message 
can use `String.join` to simplify code.



##########
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##########
@@ -80,30 +84,55 @@ public void testClusterTemplate() {
     @ClusterTests({
         @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "bar"),
-            @ClusterConfigProperty(key = "spam", value = "eggs")
+            @ClusterConfigProperty(key = "spam", value = "eggs"),
+            @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // 
this one will be ignored as there is no broker id is 86400
         }),
         @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
-            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value")
+            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value"),
+            @ClusterConfigProperty(id = 0, key = "queued.max.requests", value 
= "200"),
+            @ClusterConfigProperty(id = 3000, key = "queued.max.requests", 
value = "300")
         }),
         @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
-            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value")
+            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value"),
+            @ClusterConfigProperty(id = 0, key = "queued.max.requests", value 
= "200")
         })
     })
-    public void testClusterTests() {
-        if 
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) {
+    public void testClusterTests() throws ExecutionException, 
InterruptedException {
+        if (!clusterInstance.isKRaftTest()) {
             Assertions.assertEquals("bar", 
clusterInstance.config().serverProperties().get("foo"));
             Assertions.assertEquals("eggs", 
clusterInstance.config().serverProperties().get("spam"));
             Assertions.assertEquals("default.value", 
clusterInstance.config().serverProperties().get("default.key"));
-        } else if 
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) {
+
+            try (Admin admin = clusterInstance.createAdminClient()) {
+                ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, "0");
+                Map<ConfigResource, Config> configs = 
admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+                Assertions.assertEquals(1, configs.size());
+                Assertions.assertEquals("100", 
configs.get(configResource).get("queued.max.requests").value());

Review Comment:
   Could you please add comments for those assert?



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -140,15 +154,11 @@ public TestKitNodes build() {
                 brokerNodes);
         }
 
-        private int startBrokerId() {
-            return 0;
-        }
-
         private int startControllerId() {

Review Comment:
   Could we inline this function? For example: `int controllerId = combined ? 
BROKER_ID_OFFSET : BROKER_ID_OFFSET + CONTROLLER_ID_OFFSET;`



##########
core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java:
##########
@@ -27,6 +27,27 @@
 @Target({ElementType.ANNOTATION_TYPE})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface ClusterConfigProperty {
+    /**
+     * The config applies to the controller/broker with specified id. Default 
is -1, indicating the property applied to
+     * all controller/broker servers. Note that the "controller" here refers 
to the KRaft quorum controller.
+     * The id can vary depending on the different {@link 
kafka.test.annotation.Type}.
+     * <ul>
+     *  <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts 
from
+     *  {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0} and increases by 
1
+     *  with each additional broker, and there is no controller server under 
this mode. </li>
+     *  <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id 
starts from
+     *  {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}, the controller 
id
+     *  starts from {@link kafka.testkit.TestKitNodes#CONTROLLER_ID_OFFSET 
3000}
+     *  and increases by 1 with each addition broker/controller.</li>
+     *  <li> Under {@link kafka.test.annotation.Type#CO_KRAFT}, the broker id 
and controller id both start from
+     *  {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}
+     *  and increases by 1 with each additional broker/controller.</li>
+     * </ul>
+     *
+     * If the id doesn't correspond to any broker/controller server, throw 
RuntimeException

Review Comment:
   It seems to me `IllegalArgumentException` is more suitable since that is 
caused by "illegal argument"



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -105,17 +109,27 @@ public TestKitNodes build() {
             List<Integer> controllerNodeIds = 
IntStream.range(startControllerId(), startControllerId() + numControllerNodes)
                 .boxed()
                 .collect(Collectors.toList());
-            List<Integer> brokerNodeIds = IntStream.range(startBrokerId(), 
startBrokerId() + numBrokerNodes)
+            List<Integer> brokerNodeIds = IntStream.range(BROKER_ID_OFFSET, 
BROKER_ID_OFFSET + numBrokerNodes)
                 .boxed()
                 .collect(Collectors.toList());
 
+            Set<Integer> unknownIds = perServerProperties.keySet().stream()
+                    .filter(id -> !controllerNodeIds.contains(id))
+                    .filter(id -> !brokerNodeIds.contains(id))
+                    .collect(Collectors.toSet());
+            if (!unknownIds.isEmpty()) {
+                throw new RuntimeException(String.format("Unknown server id %s 
in perServerProperties",

Review Comment:
   Could you please add "existent id" to the error message?



##########
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##########
@@ -80,30 +84,55 @@ public void testClusterTemplate() {
     @ClusterTests({
         @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "bar"),
-            @ClusterConfigProperty(key = "spam", value = "eggs")
+            @ClusterConfigProperty(key = "spam", value = "eggs"),
+            @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // 
this one will be ignored as there is no broker id is 86400
         }),
         @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
-            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value")
+            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value"),
+            @ClusterConfigProperty(id = 0, key = "queued.max.requests", value 
= "200"),
+            @ClusterConfigProperty(id = 3000, key = "queued.max.requests", 
value = "300")
         }),
         @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
-            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value")
+            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value"),
+            @ClusterConfigProperty(id = 0, key = "queued.max.requests", value 
= "200")
         })
     })
-    public void testClusterTests() {
-        if 
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) {
+    public void testClusterTests() throws ExecutionException, 
InterruptedException {
+        if (!clusterInstance.isKRaftTest()) {
             Assertions.assertEquals("bar", 
clusterInstance.config().serverProperties().get("foo"));
             Assertions.assertEquals("eggs", 
clusterInstance.config().serverProperties().get("spam"));
             Assertions.assertEquals("default.value", 
clusterInstance.config().serverProperties().get("default.key"));
-        } else if 
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) {
+
+            try (Admin admin = clusterInstance.createAdminClient()) {
+                ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, "0");
+                Map<ConfigResource, Config> configs = 
admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+                Assertions.assertEquals(1, configs.size());
+                Assertions.assertEquals("100", 
configs.get(configResource).get("queued.max.requests").value());
+            }
+        } else {
             Assertions.assertEquals("baz", 
clusterInstance.config().serverProperties().get("foo"));
             Assertions.assertEquals("eggz", 
clusterInstance.config().serverProperties().get("spam"));
             Assertions.assertEquals("overwrite.value", 
clusterInstance.config().serverProperties().get("default.key"));
-        } else {
-            Assertions.fail("Unknown cluster type " + 
clusterInstance.clusterType());
+
+            try (Admin admin = clusterInstance.createAdminClient()) {
+                ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, "0");
+                Map<ConfigResource, Config> configs = 
admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+                Assertions.assertEquals(1, configs.size());
+                Assertions.assertEquals("200", 
configs.get(configResource).get("queued.max.requests").value());
+            }
+            if (clusterInstance.config().clusterType().equals(Type.KRAFT)) {

Review Comment:
   `clusterInstance.config().clusterType() == Type.KRAFT`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to