Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-27 Thread via GitHub


chia7712 merged PR #15761:
URL: https://github.com/apache/kafka/pull/15761


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-26 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1581724166


##
core/src/test/java/kafka/test/ClusterConfigTest.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
+public class ClusterConfigTest {
+
+@Test
+public void testClusterConfigBuilder() throws IOException {
+File trustStoreFile = TestUtils.tempFile();
+
+ClusterConfig clusterConfig = ClusterConfig.builder()
+.setType(Type.KRAFT)
+.setBrokers(1)
+.setControllers(1)
+.setName("builder-test")
+.setAutoStart(true)
+.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+.setListenerName("EXTERNAL")
+.setTrustStoreFile(trustStoreFile)
+.setMetadataVersion(MetadataVersion.IBP_0_8_0)
+.setServerProperties(Collections.singletonMap("broker", 
"broker_value"))
+.setConsumerProperties(Collections.singletonMap("consumer", 
"consumer_value"))
+.setProducerProperties(Collections.singletonMap("producer", 
"producer_value"))
+
.setAdminClientProperties(Collections.singletonMap("admin_client", 
"admin_client_value"))
+
.setSaslClientProperties(Collections.singletonMap("sasl_client", 
"sasl_client_value"))
+
.setSaslServerProperties(Collections.singletonMap("sasl_server", 
"sasl_server_value"))
+.setPerBrokerProperties(Collections.singletonMap(0, 
Collections.singletonMap("broker_0", "broker_0_value")))
+.build();
+
+Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType());

Review Comment:
   Sigh... But we may still overlook adding fields in `equals()` and `hashCode()`. 
I'll address the test as you provided in this comment. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-26 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1581723855


##
core/src/test/java/kafka/test/ClusterConfigTest.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
+public class ClusterConfigTest {
+
+@Test
+public void testClusterConfigBuilder() throws IOException {
+File trustStoreFile = TestUtils.tempFile();
+
+ClusterConfig clusterConfig = ClusterConfig.builder()
+.setType(Type.KRAFT)
+.setBrokers(1)
+.setControllers(1)
+.setName("builder-test")
+.setAutoStart(true)
+.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+.setListenerName("EXTERNAL")
+.setTrustStoreFile(trustStoreFile)
+.setMetadataVersion(MetadataVersion.IBP_0_8_0)
+.setServerProperties(Collections.singletonMap("broker", 
"broker_value"))
+.setConsumerProperties(Collections.singletonMap("consumer", 
"consumer_value"))
+.setProducerProperties(Collections.singletonMap("producer", 
"producer_value"))
+
.setAdminClientProperties(Collections.singletonMap("admin_client", 
"admin_client_value"))
+
.setSaslClientProperties(Collections.singletonMap("sasl_client", 
"sasl_client_value"))
+
.setSaslServerProperties(Collections.singletonMap("sasl_server", 
"sasl_server_value"))
+.setPerBrokerProperties(Collections.singletonMap(0, 
Collections.singletonMap("broker_0", "broker_0_value")))
+.build();
+
+Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType());

Review Comment:
   Sorry, I missed the important part in this test. I'm going to override 
`equals()` and `hashCode()` in ClusterConfig instead of using the reflection approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1581597711


##
core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala:
##
@@ -71,11 +72,16 @@ class KafkaServerKRaftRegistrationTest {
   val readyFuture = 
kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
 
   // Enable migration configs and restart brokers
-  
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-  
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-  
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-  
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-  zkCluster.rollingBrokerRestart()
+  val serverProperties = new java.util.HashMap[String, String]()

Review Comment:
   ```java
   val serverProperties = new util.HashMap[String, 
String](zkCluster.config().serverProperties())
   
   ```



##
core/src/test/java/kafka/test/ClusterConfigTest.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
+public class ClusterConfigTest {
+
+@Test
+public void testClusterConfigBuilder() throws IOException {
+File trustStoreFile = TestUtils.tempFile();
+
+ClusterConfig clusterConfig = ClusterConfig.builder()
+.setType(Type.KRAFT)
+.setBrokers(1)
+.setControllers(1)
+.setName("builder-test")
+.setAutoStart(true)
+.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+.setListenerName("EXTERNAL")
+.setTrustStoreFile(trustStoreFile)
+.setMetadataVersion(MetadataVersion.IBP_0_8_0)
+.setServerProperties(Collections.singletonMap("broker", 
"broker_value"))
+.setConsumerProperties(Collections.singletonMap("consumer", 
"consumer_value"))
+.setProducerProperties(Collections.singletonMap("producer", 
"producer_value"))
+
.setAdminClientProperties(Collections.singletonMap("admin_client", 
"admin_client_value"))
+
.setSaslClientProperties(Collections.singletonMap("sasl_client", 
"sasl_client_value"))
+
.setSaslServerProperties(Collections.singletonMap("sasl_server", 
"sasl_server_value"))
+.setPerBrokerProperties(Collections.singletonMap(0, 
Collections.singletonMap("broker_0", "broker_0_value")))
+.build();
+
+Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType());

Review Comment:
   Dear friend, this test can't protect us from overlooking the field copy 
since we don't check the fields added in the future, right? Maybe we can use 
reflection to get all fields and then check them all dynamically.
   ```java
   private static Map fields(ClusterConfig config) {
   return 
Arrays.stream(config.getClass().getDeclaredFields()).collect(Collectors.toMap(Field::getName,
 f -> {
   f.setAccessible(true);
   return Assertions.assertDoesNotThrow(() -> f.get(config));
   }));
   }
   
   @Test
   public void testClusterConfigBuilder() throws IOException {
   File trustStoreFile = TestUtils.tempFile();
   
   ClusterConfig clusterConfig = ClusterConfig.builder()
   .setType(Type.KRAFT)
   .setBrokers(3)
   .setControllers(2)
   .setName("builder-test")
   .setAutoStart(true)
  

Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-25 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1580549838


##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest {
 
 // Static methods can generate cluster configurations
 static void generate1(ClusterGenerator clusterGenerator) {
-
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated 
Test").build());
+Map serverProperties = new HashMap<>();
+serverProperties.put("foo", "bar");
+clusterGenerator.accept(ClusterConfig.defaultBuilder()
+.setName("Generated Test")
+.setServerProperties(serverProperties)
+.build());
 }
 
 // BeforeEach run after class construction, but before cluster 
initialization and test invocation
 @BeforeEach
 public void beforeEach(ClusterConfig config) {

Review Comment:
   filed https://issues.apache.org/jira/browse/KAFKA-16627



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-25 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1580544105


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -139,28 +151,38 @@ public Map nameTags() {
 return tags;
 }
 
-public ClusterConfig copyOf() {
-ClusterConfig copy = new ClusterConfig(type, brokers, controllers, 
name, autoStart, securityProtocol, listenerName, trustStoreFile, 
metadataVersion);
-copy.serverProperties.putAll(serverProperties);
-copy.producerProperties.putAll(producerProperties);
-copy.consumerProperties.putAll(consumerProperties);
-copy.saslServerProperties.putAll(saslServerProperties);
-copy.saslClientProperties.putAll(saslClientProperties);
-perBrokerOverrideProperties.forEach((brokerId, props) -> {
-Properties propsCopy = new Properties();
-propsCopy.putAll(props);
-copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
-});
-return copy;
+public static Builder defaultBuilder() {
+return new Builder()
+.setType(Type.ZK)
+.setBrokers(1)
+.setControllers(1)
+.setAutoStart(true)
+.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+.setMetadataVersion(MetadataVersion.latestTesting());
 }
 
-public static Builder defaultClusterBuilder() {
-return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+public static Builder builder() {
+return new Builder();
 }
 
-public static Builder clusterBuilder(Type type, int brokers, int 
controllers, boolean autoStart,
- SecurityProtocol securityProtocol, 
MetadataVersion metadataVersion) {
-return new Builder(type, brokers, controllers, autoStart, 
securityProtocol, metadataVersion);
+public static Builder builder(ClusterConfig clusterConfig) {

Review Comment:
   Roger that. Thanks for your patience



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1580542830


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -139,28 +151,38 @@ public Map nameTags() {
 return tags;
 }
 
-public ClusterConfig copyOf() {
-ClusterConfig copy = new ClusterConfig(type, brokers, controllers, 
name, autoStart, securityProtocol, listenerName, trustStoreFile, 
metadataVersion);
-copy.serverProperties.putAll(serverProperties);
-copy.producerProperties.putAll(producerProperties);
-copy.consumerProperties.putAll(consumerProperties);
-copy.saslServerProperties.putAll(saslServerProperties);
-copy.saslClientProperties.putAll(saslClientProperties);
-perBrokerOverrideProperties.forEach((brokerId, props) -> {
-Properties propsCopy = new Properties();
-propsCopy.putAll(props);
-copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
-});
-return copy;
+public static Builder defaultBuilder() {
+return new Builder()
+.setType(Type.ZK)
+.setBrokers(1)
+.setControllers(1)
+.setAutoStart(true)
+.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+.setMetadataVersion(MetadataVersion.latestTesting());
 }
 
-public static Builder defaultClusterBuilder() {
-return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+public static Builder builder() {
+return new Builder();
 }
 
-public static Builder clusterBuilder(Type type, int brokers, int 
controllers, boolean autoStart,
- SecurityProtocol securityProtocol, 
MetadataVersion metadataVersion) {
-return new Builder(type, brokers, controllers, autoStart, 
securityProtocol, metadataVersion);
+public static Builder builder(ClusterConfig clusterConfig) {

Review Comment:
   > Maybe we should directly remove the helper method here
   
   It seems to me this helper could be useful to the tests which have to update 
a part of the configs. Hence, all we need to do is add a test to make sure the clone 
works well :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-25 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1580538394


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -139,28 +151,38 @@ public Map nameTags() {
 return tags;
 }
 
-public ClusterConfig copyOf() {
-ClusterConfig copy = new ClusterConfig(type, brokers, controllers, 
name, autoStart, securityProtocol, listenerName, trustStoreFile, 
metadataVersion);
-copy.serverProperties.putAll(serverProperties);
-copy.producerProperties.putAll(producerProperties);
-copy.consumerProperties.putAll(consumerProperties);
-copy.saslServerProperties.putAll(saslServerProperties);
-copy.saslClientProperties.putAll(saslClientProperties);
-perBrokerOverrideProperties.forEach((brokerId, props) -> {
-Properties propsCopy = new Properties();
-propsCopy.putAll(props);
-copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
-});
-return copy;
+public static Builder defaultBuilder() {
+return new Builder()
+.setType(Type.ZK)
+.setBrokers(1)
+.setControllers(1)
+.setAutoStart(true)
+.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+.setMetadataVersion(MetadataVersion.latestTesting());
 }
 
-public static Builder defaultClusterBuilder() {
-return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+public static Builder builder() {
+return new Builder();
 }
 
-public static Builder clusterBuilder(Type type, int brokers, int 
controllers, boolean autoStart,
- SecurityProtocol securityProtocol, 
MetadataVersion metadataVersion) {
-return new Builder(type, brokers, controllers, autoStart, 
securityProtocol, metadataVersion);
+public static Builder builder(ClusterConfig clusterConfig) {

Review Comment:
   Got it. Maybe we should directly remove the helper method here. WDYT ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1580536170


##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest {
 
 // Static methods can generate cluster configurations
 static void generate1(ClusterGenerator clusterGenerator) {
-
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated 
Test").build());
+Map serverProperties = new HashMap<>();
+serverProperties.put("foo", "bar");
+clusterGenerator.accept(ClusterConfig.defaultBuilder()
+.setName("Generated Test")
+.setServerProperties(serverProperties)
+.build());
 }
 
 // BeforeEach run after class construction, but before cluster 
initialization and test invocation
 @BeforeEach
 public void beforeEach(ClusterConfig config) {

Review Comment:
   Could you please file a ticket for it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1580535934


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -139,28 +151,38 @@ public Map nameTags() {
 return tags;
 }
 
-public ClusterConfig copyOf() {
-ClusterConfig copy = new ClusterConfig(type, brokers, controllers, 
name, autoStart, securityProtocol, listenerName, trustStoreFile, 
metadataVersion);
-copy.serverProperties.putAll(serverProperties);
-copy.producerProperties.putAll(producerProperties);
-copy.consumerProperties.putAll(consumerProperties);
-copy.saslServerProperties.putAll(saslServerProperties);
-copy.saslClientProperties.putAll(saslClientProperties);
-perBrokerOverrideProperties.forEach((brokerId, props) -> {
-Properties propsCopy = new Properties();
-propsCopy.putAll(props);
-copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
-});
-return copy;
+public static Builder defaultBuilder() {
+return new Builder()
+.setType(Type.ZK)
+.setBrokers(1)
+.setControllers(1)
+.setAutoStart(true)
+.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+.setMetadataVersion(MetadataVersion.latestTesting());
 }
 
-public static Builder defaultClusterBuilder() {
-return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+public static Builder builder() {
+return new Builder();
 }
 
-public static Builder clusterBuilder(Type type, int brokers, int 
controllers, boolean autoStart,
- SecurityProtocol securityProtocol, 
MetadataVersion metadataVersion) {
-return new Builder(type, brokers, controllers, autoStart, 
securityProtocol, metadataVersion);
+public static Builder builder(ClusterConfig clusterConfig) {

Review Comment:
   > Did you mean we shouldn't send ClusterConfig as an argument to builder ? 
If so, I can remove this method (i.e. L168). But that means we have to 
explicitly write all setters if we want to copy a ClusterConfig to a brand new 
one.
   And I'm not sure why this is error-prone. I would appreciate it if you could 
share more 😃 .
   
   that is a good helper method but we could neglect this copy-all when adding a 
new field to `ClusterConfig` :(
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-25 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1580532456


##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest {
 
 // Static methods can generate cluster configurations
 static void generate1(ClusterGenerator clusterGenerator) {
-
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated 
Test").build());
+Map serverProperties = new HashMap<>();
+serverProperties.put("foo", "bar");
+clusterGenerator.accept(ClusterConfig.defaultBuilder()
+.setName("Generated Test")
+.setServerProperties(serverProperties)
+.build());
 }
 
 // BeforeEach run after class construction, but before cluster 
initialization and test invocation
 @BeforeEach
 public void beforeEach(ClusterConfig config) {

Review Comment:
   Make sense to me. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-25 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1580532188


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -139,28 +151,38 @@ public Map nameTags() {
 return tags;
 }
 
-public ClusterConfig copyOf() {
-ClusterConfig copy = new ClusterConfig(type, brokers, controllers, 
name, autoStart, securityProtocol, listenerName, trustStoreFile, 
metadataVersion);
-copy.serverProperties.putAll(serverProperties);
-copy.producerProperties.putAll(producerProperties);
-copy.consumerProperties.putAll(consumerProperties);
-copy.saslServerProperties.putAll(saslServerProperties);
-copy.saslClientProperties.putAll(saslClientProperties);
-perBrokerOverrideProperties.forEach((brokerId, props) -> {
-Properties propsCopy = new Properties();
-propsCopy.putAll(props);
-copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
-});
-return copy;
+public static Builder defaultBuilder() {
+return new Builder()
+.setType(Type.ZK)
+.setBrokers(1)
+.setControllers(1)
+.setAutoStart(true)
+.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+.setMetadataVersion(MetadataVersion.latestTesting());
 }
 
-public static Builder defaultClusterBuilder() {
-return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+public static Builder builder() {
+return new Builder();
 }
 
-public static Builder clusterBuilder(Type type, int brokers, int 
controllers, boolean autoStart,
- SecurityProtocol securityProtocol, 
MetadataVersion metadataVersion) {
-return new Builder(type, brokers, controllers, autoStart, 
securityProtocol, metadataVersion);
+public static Builder builder(ClusterConfig clusterConfig) {

Review Comment:
   > That deep copy is error-prone.
   
   Did you mean we shouldn't send ClusterConfig as an argument to `builder` ? 
If so, I can remove this method (i.e. L168). But that means we have to 
explicitly write all setters if we want to copy a ClusterConfig to a brand new 
one. 
   
   And I'm not sure why this is error-prone. I would appreciate it if you could 
share more 😃 .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1580513112


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -57,16 +57,18 @@ import org.slf4j.{Logger, LoggerFactory}
 
 import java.util
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
-import java.util.{Collections, Optional, Properties, UUID}
+import java.util.{Collections, Optional, UUID}
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 object ZkMigrationIntegrationTest {
-  def addZkBrokerProps(props: Properties): Unit = {
-props.setProperty("inter.broker.listener.name", "EXTERNAL")
-props.setProperty("listeners", 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
-props.setProperty("advertised.listeners", 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
-props.setProperty("listener.security.protocol.map", 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+  def addZkBrokerProps(builder: ClusterConfig.Builder): Unit = {

Review Comment:
   It seems this method is used by `zkClustersForAllMigrationVersions` only. We 
can merge them into one method. For example:
   ```scala
 val serverProperties = new util.HashMap[String, String]()
 serverProperties.put("inter.broker.listener.name", "EXTERNAL")
 serverProperties.put("listeners", 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
 serverProperties.put("advertised.listeners", 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
 serverProperties.put("listener.security.protocol.map", 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
 clusterGenerator.accept(ClusterConfig.defaultBuilder()
   .setMetadataVersion(mv)
   .setBrokers(3)
   .setType(Type.ZK)
   .setServerProperties(serverProperties).build())
   ```



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -453,11 +464,16 @@ class ZkMigrationIntegrationTest {
 
   // Enable migration configs and restart brokers
   log.info("Restart brokers in migration mode")
-  
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-  
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-  
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-  
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-  zkCluster.rollingBrokerRestart()
+  val serverProperties = new util.HashMap[String, String]()
+  serverProperties.putAll(zkCluster.config().serverProperties())

Review Comment:
   ditto



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -517,11 +533,16 @@ class ZkMigrationIntegrationTest {
 
   // Enable migration configs and restart brokers
   log.info("Restart brokers in migration mode")
-  
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-  
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-  
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-  
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-  zkCluster.rollingBrokerRestart() // This would throw if authorizers 
weren't allowed
+  val serverProperties = new util.HashMap[String, String]()
+  serverProperties.putAll(zkCluster.config().serverProperties())

Review Comment:
   ditto



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -802,11 +838,16 @@ class ZkMigrationIntegrationTest {
 
   // Enable migration configs and restart brokers
   log.info("Restart brokers in migration mode")
-  
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-  
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-  
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-  
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-  zkCluster.rollingBrokerRestart()
+  val serverProperties = new util.HashMap[String, String]()
+  serverProperties.putAll(zkCluster.config().serverProperties())

Review Comment:
   ditto



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -667,11 +693,16 @@ class ZkMigrationIntegrationTest {
 
   // Enable migration

Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on PR #15761:
URL: https://github.com/apache/kafka/pull/15761#issuecomment-2078268366

   @brandboat could you please fix the conflicts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-24 Thread via GitHub


chia7712 commented on PR #15761:
URL: https://github.com/apache/kafka/pull/15761#issuecomment-2074923817

   @brandboat Could you rebase the code to trigger QA again? I ran those tests on 
my local machine. They pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-24 Thread via GitHub


brandboat commented on PR #15761:
URL: https://github.com/apache/kafka/pull/15761#issuecomment-2074282565

   The failed tests seem related to this PR; I'll check them later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1577073007


##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -198,7 +177,7 @@ public BootstrapMetadata bootstrapMetadata() {
 return bootstrapMetadata;
 }
 
-public NavigableMap brokerNodes() {
+public Map brokerNodes() {

Review Comment:
   I found all tests  failed... Sorry for wasting your time, I'll be more 
careful :disappointed: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576836597


##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -198,7 +177,7 @@ public BootstrapMetadata bootstrapMetadata() {
 return bootstrapMetadata;
 }
 
-public NavigableMap brokerNodes() {
+public Map brokerNodes() {

Review Comment:
   please ignore this comment since we create all controllers before brokers, 
and hence the order of brokers does not matter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576631689


##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -59,87 +63,66 @@ public Builder setCombined(boolean combined) {
 }
 
 public Builder setNumControllerNodes(int numControllerNodes) {
-if (numControllerNodes < 0) {
-throw new RuntimeException("Invalid negative value for 
numControllerNodes");
-}
-
-while (controllerNodeBuilders.size() > numControllerNodes) {
-controllerNodeBuilders.pollFirstEntry();
-}
-while (controllerNodeBuilders.size() < numControllerNodes) {
-int nextId = startControllerId();
-if (!controllerNodeBuilders.isEmpty()) {
-nextId = controllerNodeBuilders.lastKey() + 1;
-}
-controllerNodeBuilders.put(nextId,
-new ControllerNode.Builder().
-setId(nextId));
-}
+this.numControllerNodes = numControllerNodes;
 return this;
 }
 
 public Builder setNumBrokerNodes(int numBrokerNodes) {
-return setBrokerNodes(numBrokerNodes, 1);
+this.numBrokerNodes = numBrokerNodes;
+return this;
+}
+
+public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+this.numDisksPerBroker = numDisksPerBroker;
+return this;
+}
+
+public Builder setPerBrokerProperties(Map> perBrokerProperties) {
+this.perBrokerProperties = Collections.unmodifiableMap(
+perBrokerProperties.entrySet().stream()
+.collect(Collectors.toMap(Map.Entry::getKey, e -> 
Collections.unmodifiableMap(new HashMap<>(e.getValue());
+return this;
 }
 
-public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+public TestKitNodes build() {
+if (numControllerNodes < 0) {
+throw new RuntimeException("Invalid negative value for 
numControllerNodes");
+}
 if (numBrokerNodes < 0) {
 throw new RuntimeException("Invalid negative value for 
numBrokerNodes");
 }
-if (disksPerBroker <= 0) {
-throw new RuntimeException("Invalid value for disksPerBroker");
-}
-while (brokerNodeBuilders.size() > numBrokerNodes) {
-brokerNodeBuilders.pollFirstEntry();
-}
-while (brokerNodeBuilders.size() < numBrokerNodes) {
-int nextId = startBrokerId();
-if (!brokerNodeBuilders.isEmpty()) {
-nextId = brokerNodeBuilders.lastKey() + 1;
-}
-BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
-.setId(nextId)
-.setNumLogDirectories(disksPerBroker);
-brokerNodeBuilders.put(nextId, brokerNodeBuilder);
+if (numDisksPerBroker <= 0) {
+throw new RuntimeException("Invalid value for 
numDisksPerBroker");
 }
-return this;
-}
 
-public TestKitNodes build() {
 String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
-try {
-if (clusterId == null) {
-clusterId = Uuid.randomUuid();
-}
-TreeMap controllerNodes = new 
TreeMap<>();
-for (ControllerNode.Builder builder : 
controllerNodeBuilders.values()) {
-ControllerNode node = builder.
-build(baseDirectory, clusterId, 
brokerNodeBuilders.containsKey(builder.id()));
-if (controllerNodes.put(node.id(), node) != null) {
-throw new RuntimeException("Duplicate builder for 
controller " + node.id());
-}
-}
-TreeMap brokerNodes = new TreeMap<>();
-for (BrokerNode.Builder builder : brokerNodeBuilders.values()) 
{
-BrokerNode node = builder.
-build(baseDirectory, clusterId, 
controllerNodeBuilders.containsKey(builder.id()));
-if (brokerNodes.put(node.id(), node) != null) {
-throw new RuntimeException("Duplicate builder for 
broker " + node.id());
-}
-}
-return new TestKitNodes(baseDirectory,
-clusterId,
-bootstrapMetadata,
-controllerNodes,
-brokerNodes);
-} catch (Exception e) {
-try {
-Files.delete(Paths.get(baseDirectory));
-} catch (Exception x) {
-throw new RuntimeException("Failed to

Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576531833


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -84,17 +125,38 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
 assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
   }
 
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "false"),
+  @ClusterTests(Array(

Review Comment:
   Already added the TODO in the top of this file. L30



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576492165


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -66,17 +69,30 @@ public Builder setNumLogDirectories(int numLogDirectories) {
 return this;
 }
 
-public BrokerNode build(
-String baseDirectory,
-Uuid clusterId,
-boolean combined
-) {
+public Builder setClusterId(Uuid clusterId) {
+this.clusterId = clusterId;
+return this;
+}
+
+public Builder setBaseDirectory(String baseDirectory) {
+this.baseDirectory = baseDirectory;
+return this;
+}
+
+public Builder setCombined(boolean combined) {
+this.combined = combined;
+return this;
+}
+
+public Builder setPropertyOverrides(Map 
propertyOverrides) {
+this.propertyOverrides = Collections.unmodifiableMap(new 
HashMap<>(propertyOverrides));
+return this;
+}
+
+public BrokerNode build() {
 if (id == -1) {
 throw new RuntimeException("You must set the node id.");
 }
-if (incarnationId == null) {
-incarnationId = Uuid.randomUuid();
-}
 List logDataDirectories = IntStream

Review Comment:
   Could you add null check for `baseDirectory`?



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) {
 }
 
 public Builder setNumControllerNodes(int numControllerNodes) {
-if (numControllerNodes < 0) {
-throw new RuntimeException("Invalid negative value for 
numControllerNodes");
-}
-
-while (controllerNodeBuilders.size() > numControllerNodes) {
-controllerNodeBuilders.pollFirstEntry();
-}
-while (controllerNodeBuilders.size() < numControllerNodes) {
-int nextId = startControllerId();
-if (!controllerNodeBuilders.isEmpty()) {
-nextId = controllerNodeBuilders.lastKey() + 1;
-}
-controllerNodeBuilders.put(nextId,
-new ControllerNode.Builder().
-setId(nextId));
-}
+this.numControllerNodes = numControllerNodes;
 return this;
 }
 
 public Builder setNumBrokerNodes(int numBrokerNodes) {
-return setBrokerNodes(numBrokerNodes, 1);
+this.numBrokerNodes = numBrokerNodes;
+return this;
+}
+
+public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+this.numDisksPerBroker = numDisksPerBroker;
+return this;
+}
+
+public Builder setPerBrokerProperties(Map> perBrokerProperties) {
+this.perBrokerProperties = Collections.unmodifiableMap(
+perBrokerProperties.entrySet().stream()
+.collect(Collectors.toMap(Map.Entry::getKey, e -> 
Collections.unmodifiableMap(new HashMap<>(e.getValue());
+return this;
 }
 
-public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+public TestKitNodes build() {
+if (numControllerNodes < 0) {
+throw new RuntimeException("Invalid negative value for 
numControllerNodes");
+}
 if (numBrokerNodes < 0) {
 throw new RuntimeException("Invalid negative value for 
numBrokerNodes");
 }
-if (disksPerBroker <= 0) {
-throw new RuntimeException("Invalid value for disksPerBroker");
-}
-while (brokerNodeBuilders.size() > numBrokerNodes) {
-brokerNodeBuilders.pollFirstEntry();
+if (numDisksPerBroker <= 0) {
+throw new RuntimeException("Invalid value for 
numDisksPerBroker");
 }
-while (brokerNodeBuilders.size() < numBrokerNodes) {
-int nextId = startBrokerId();
-if (!brokerNodeBuilders.isEmpty()) {
-nextId = brokerNodeBuilders.lastKey() + 1;
-}
-BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
-.setId(nextId)
-.setNumLogDirectories(disksPerBroker);
-brokerNodeBuilders.put(nextId, brokerNodeBuilder);
-}
-return this;
-}
 
-public TestKitNodes build() {
 String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();

Review Comment:
   We don't need to delete `baseDirectory` since `TestUtils.tempDirectory()` 
will delete the return folder when terminating.



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -167,11 +164,1

Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576432963


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -121,16 +146,16 @@ public BrokerNode build(
 private final boolean combined;
 private final Map propertyOverrides;
 
-BrokerNode(
+private BrokerNode(
 Uuid incarnationId,
 MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
 boolean combined,
 Map propertyOverrides
 ) {
-this.incarnationId = incarnationId;
-this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble;

Review Comment:
   I make `logDataDirectories()` return Set instead List to get 
rid of the redundant arraylist wrapping.



##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -121,16 +146,16 @@ public BrokerNode build(
 private final boolean combined;
 private final Map propertyOverrides;
 
-BrokerNode(
+private BrokerNode(
 Uuid incarnationId,
 MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
 boolean combined,
 Map propertyOverrides
 ) {
-this.incarnationId = incarnationId;
-this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble;

Review Comment:
   I make `logDataDirectories()` return `Set` instead `List` to 
get rid of the redundant arraylist wrapping.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576155040


##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -167,11 +186,11 @@ private TestKitNodes(
 NavigableMap controllerNodes,
 NavigableMap brokerNodes
 ) {
-this.baseDirectory = baseDirectory;
-this.clusterId = clusterId;
-this.bootstrapMetadata = bootstrapMetadata;
-this.controllerNodes = controllerNodes;
-this.brokerNodes = brokerNodes;
+this.baseDirectory = Objects.requireNonNull(baseDirectory);
+this.clusterId = Objects.requireNonNull(clusterId);
+this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata);
+this.controllerNodes = Objects.requireNonNull(controllerNodes);

Review Comment:
   Please follow the rule of this PR - be a immutable object



##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -121,16 +146,16 @@ public BrokerNode build(
 private final boolean combined;
 private final Map propertyOverrides;
 
-BrokerNode(
+private BrokerNode(
 Uuid incarnationId,
 MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
 boolean combined,
 Map propertyOverrides
 ) {
-this.incarnationId = incarnationId;
-this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble;
+this.incarnationId = Objects.requireNonNull(incarnationId);

Review Comment:
   this is not used so it is fine to remove it



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -97,9 +107,12 @@ public Builder setBrokerNodes(int numBrokerNodes, int 
disksPerBroker) {
 if (!brokerNodeBuilders.isEmpty()) {
 nextId = brokerNodeBuilders.lastKey() + 1;
 }
-BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()

Review Comment:
   Can we evaluate those settings in `build` method? That can simplify code 
since we don't need to revert the changes (for example, the number of brokers).



##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -121,16 +146,16 @@ public BrokerNode build(
 private final boolean combined;
 private final Map propertyOverrides;
 
-BrokerNode(
+private BrokerNode(
 Uuid incarnationId,
 MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
 boolean combined,
 Map propertyOverrides
 ) {
-this.incarnationId = incarnationId;
-this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble;

Review Comment:
   `logDataDirectories` should return immutable collection



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on PR #15761:
URL: https://github.com/apache/kafka/pull/15761#issuecomment-2072088191

   > @brandboat Sorry that I leave more comments below. please take a look. 
thanks
   
   Thank you for your patience! I appreciate your input. Already addressed all 
comments as above. Please take another look when you are available :smiley: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576056664


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -66,11 +74,27 @@ public Builder setNumLogDirectories(int numLogDirectories) {
 return this;
 }
 
-public BrokerNode build(
-String baseDirectory,
-Uuid clusterId,
-boolean combined
-) {
+public Builder setClusterId(Uuid clusterId) {
+this.clusterId = clusterId;
+return this;
+}
+
+public Builder setBaseDirectory(String baseDirectory) {
+this.baseDirectory = baseDirectory;
+return this;
+}
+
+public Builder setCombined(boolean combined) {
+this.combined = combined;
+return this;
+}
+
+public Builder setPropertyOverrides(Map 
propertyOverrides) {
+this.propertyOverrides = 
Collections.unmodifiableMap(propertyOverrides);

Review Comment:
   Thanks, I'll address this in other builders also.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1575290259


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -66,11 +74,27 @@ public Builder setNumLogDirectories(int numLogDirectories) {
 return this;
 }
 
-public BrokerNode build(
-String baseDirectory,
-Uuid clusterId,
-boolean combined
-) {
+public Builder setClusterId(Uuid clusterId) {
+this.clusterId = clusterId;
+return this;
+}
+
+public Builder setBaseDirectory(String baseDirectory) {
+this.baseDirectory = baseDirectory;
+return this;
+}
+
+public Builder setCombined(boolean combined) {
+this.combined = combined;
+return this;
+}
+
+public Builder setPropertyOverrides(Map 
propertyOverrides) {
+this.propertyOverrides = 
Collections.unmodifiableMap(propertyOverrides);

Review Comment:
   Maybe we need to make a copy in order to avoid changes made from outside.



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -97,14 +96,31 @@ public Builder setBrokerNodes(int numBrokerNodes, int 
disksPerBroker) {
 if (!brokerNodeBuilders.isEmpty()) {
 nextId = brokerNodeBuilders.lastKey() + 1;
 }
-BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
+BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder()
 .setId(nextId)
 .setNumLogDirectories(disksPerBroker);
 brokerNodeBuilders.put(nextId, brokerNodeBuilder);
 }
 return this;
 }
 
+/**
+ * Set per broker properties overrides, this setter must be invoked 
after setBrokerNodes which
+ * setup broker id and broker builder.
+ * @param perBrokerPropertiesOverrides properties to override in each 
broker
+ * @return Builder
+ */
+public Builder setPerBrokerPropertiesOverrides(Map> perBrokerPropertiesOverrides) {
+perBrokerPropertiesOverrides.forEach((brokerId, properties) -> {
+if (!brokerNodeBuilders.containsKey(brokerId)) {
+throw new RuntimeException("Broker id " + brokerId + " 
does not exist");
+}
+Map propertiesOverride = new 
HashMap<>(properties);

Review Comment:
   the deep copy should be addressed by `setPropertyOverrides`



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -97,14 +96,32 @@ public Builder setBrokerNodes(int numBrokerNodes, int 
disksPerBroker) {
 if (!brokerNodeBuilders.isEmpty()) {
 nextId = brokerNodeBuilders.lastKey() + 1;
 }
-BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
+BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder()
 .setId(nextId)
 .setNumLogDirectories(disksPerBroker);
 brokerNodeBuilders.put(nextId, brokerNodeBuilder);
 }
 return this;
 }
 
+/**
+ * Set per broker properties overrides, this setter must be invoked 
after setBrokerNodes which

Review Comment:
   It seems `TestKitNodes` needs to be refactored as well. In short, it should 
verify/apply all settings when building.



##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -126,8 +138,12 @@ public MetadataVersion metadataVersion() {
 return metadataVersion;
 }
 
-public Properties brokerServerProperties(int brokerId) {
-return perBrokerOverrideProperties.computeIfAbsent(brokerId, __ -> new 
Properties());
+public Map brokerServerProperties(int brokerId) {

Review Comment:
   This helper is used by `ZkClusterInvocationContext` only. Maybe we can 
remove this one to simplify `ClusterConfig`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-22 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1575101365


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -173,63 +199,104 @@ public static class Builder {
 private String listenerName;
 private File trustStoreFile;
 private MetadataVersion metadataVersion;
+private Map serverProperties = 
Collections.unmodifiableMap(new HashMap<>());

Review Comment:
   Yes. The code here is redundant. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1575053494


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -173,63 +199,104 @@ public static class Builder {
 private String listenerName;
 private File trustStoreFile;
 private MetadataVersion metadataVersion;
+private Map serverProperties = 
Collections.unmodifiableMap(new HashMap<>());
+private Map producerProperties = 
Collections.unmodifiableMap(new HashMap<>());
+private Map consumerProperties = 
Collections.unmodifiableMap(new HashMap<>());
+private Map adminClientProperties = 
Collections.unmodifiableMap(new HashMap<>());
+private Map saslServerProperties = 
Collections.unmodifiableMap(new HashMap<>());
+private Map saslClientProperties = 
Collections.unmodifiableMap(new HashMap<>());
+private Map> perBrokerOverrideProperties 
= Collections.unmodifiableMap(new HashMap<>());
 
-Builder(Type type, int brokers, int controllers, boolean autoStart, 
SecurityProtocol securityProtocol, MetadataVersion metadataVersion) {
-this.type = type;
-this.brokers = brokers;
-this.controllers = controllers;
-this.autoStart = autoStart;
-this.securityProtocol = securityProtocol;
-this.metadataVersion = metadataVersion;
-}
+private Builder() {}
 
-public Builder type(Type type) {
+public Builder setType(Type type) {
 this.type = type;
 return this;
 }
 
-public Builder brokers(int brokers) {
+public Builder setBrokers(int brokers) {
 this.brokers = brokers;
 return this;
 }
 
-public Builder controllers(int controllers) {
+public Builder setControllers(int controllers) {
 this.controllers = controllers;
 return this;
 }
 
-public Builder name(String name) {
+public Builder setName(String name) {
 this.name = name;
 return this;
 }
 
-public Builder autoStart(boolean autoStart) {
+public Builder setAutoStart(boolean autoStart) {
 this.autoStart = autoStart;
 return this;
 }
 
-public Builder securityProtocol(SecurityProtocol securityProtocol) {
+public Builder setSecurityProtocol(SecurityProtocol securityProtocol) {
 this.securityProtocol = securityProtocol;
 return this;
 }
 
-public Builder listenerName(String listenerName) {
+public Builder setListenerName(String listenerName) {
 this.listenerName = listenerName;
 return this;
 }
 
-public Builder trustStoreFile(File trustStoreFile) {
+public Builder setTrustStoreFile(File trustStoreFile) {
 this.trustStoreFile = trustStoreFile;
 return this;
 }
 
-public Builder metadataVersion(MetadataVersion metadataVersion) {
+public Builder setMetadataVersion(MetadataVersion metadataVersion) {
 this.metadataVersion = metadataVersion;
 return this;
 }
 
+public Builder setServerProperties(Map 
serverProperties) {
+this.serverProperties = 
Collections.unmodifiableMap(serverProperties);
+return this;
+}
+
+public Builder setConsumerProperties(Map 
consumerProperties) {
+this.consumerProperties = 
Collections.unmodifiableMap(consumerProperties);
+return this;
+}
+
+public Builder setProducerProperties(Map 
producerProperties) {
+this.producerProperties = 
Collections.unmodifiableMap(producerProperties);
+return this;
+}
+
+public Builder setAdminClientProperties(Map 
adminClientProperties) {
+this.adminClientProperties = 
Collections.unmodifiableMap(adminClientProperties);
+return this;
+}
+
+public Builder setSaslServerProperties(Map 
saslServerProperties) {
+this.saslServerProperties = 
Collections.unmodifiableMap(saslServerProperties);
+return this;
+}
+
+public Builder setSaslClientProperties(Map 
saslClientProperties) {
+this.saslClientProperties = 
Collections.unmodifiableMap(saslClientProperties);
+return this;
+}
+
+public Builder setPerBrokerProperties(Map> perBrokerOverrideProperties) {
+this.perBrokerOverrideProperties = Collections.unmodifiableMap(

Review Comment:
   ```java
   this.perBrokerOverrideProperties = Collections.unmodifiableMap(
   perBrokerOverrideProperties.entrySet().stream()
   .collect(Collectors.toMap(Map.Entry::getKey, e 
-> Collections.unmodifiableMap(new HashMap<>(e.getValue());
   ```



##
core/s

Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-22 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1574974316


##
core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala:
##
@@ -18,41 +18,58 @@ package kafka.server
 
 import java.net.Socket
 import java.util.Collections
-
 import kafka.api.{KafkaSasl, SaslSetup}
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, 
kafkaServerSaslMechanisms}
+import kafka.test.annotation.{ClusterTemplate, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
 import kafka.utils.JaasTestUtils
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, 
ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.KafkaSecurityConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 
 import scala.jdk.CollectionConverters._
 
+object SaslApiVersionsRequestTest {
+  val kafkaClientSaslMechanism = "PLAIN"
+  val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN")
+  val controlPlaneListenerName = "CONTROL_PLANE"
+  val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+
+  def saslApiVersionsRequestClusterConfig(clusterGenerator: ClusterGenerator): 
Unit = {
+clusterGenerator.accept(ClusterConfig.defaultBuilder
+  .securityProtocol(securityProtocol)
+  .`type`(Type.ZK)
+  
.putSaslServerProperty(KafkaSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
 kafkaClientSaslMechanism)
+  
.putSaslServerProperty(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
kafkaServerSaslMechanisms.mkString(","))
+  .putSaslClientProperty(SaslConfigs.SASL_MECHANISM, 
kafkaClientSaslMechanism)
+  // Configure control plane listener to make sure we have separate 
listeners for testing.
+  .putServerProperty(KafkaConfig.ControlPlaneListenerNameProp, 
controlPlaneListenerName)
+  .putServerProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
+  .putServerProperty("listeners", 
s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
+  .putServerProperty(KafkaConfig.AdvertisedListenersProp, 
s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
+  .build())
+  }
+}
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
-
-  val kafkaClientSaslMechanism = "PLAIN"
-  val kafkaServerSaslMechanisms = List("PLAIN")
-
   private var sasl: SaslSetup = _
 
   @BeforeEach
-  def setupSasl(config: ClusterConfig): Unit = {
+  def setupSasl(): Unit = {
 sasl = new SaslSetup() {}

Review Comment:
   Currently, no.  Since we clear `java.security.auth.login.config` in 
[QuorumTestHarness#tearDown](https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala#L440).
 So if we initialize sasl in `@BeforeAll`, first test passed, second one raise 
error as below. 
   ```text
   Could not find a 'KafkaServer' or 'control_plane.KafkaServer' entry in the 
JAAS configuration. System property 'java.security.auth.login.config' is not set
   ```
   This would require some effort if we want to make this happen.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573858499


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -67,13 +69,16 @@ public class ClusterConfig {
 this.listenerName = listenerName;
 this.trustStoreFile = trustStoreFile;
 this.metadataVersion = metadataVersion;
-this.serverProperties = copyOf(serverProperties);
-this.producerProperties = copyOf(producerProperties);
-this.consumerProperties = copyOf(consumerProperties);
-this.adminClientProperties = copyOf(adminClientProperties);
-this.saslServerProperties = copyOf(saslServerProperties);
-this.saslClientProperties = copyOf(saslClientProperties);
-perBrokerOverrideProperties.forEach((brokerId, props) -> 
this.perBrokerOverrideProperties.put(brokerId, copyOf(props)));
+this.serverProperties = Collections.unmodifiableMap(serverProperties);
+this.producerProperties = 
Collections.unmodifiableMap(producerProperties);
+this.consumerProperties = 
Collections.unmodifiableMap(consumerProperties);
+this.adminClientProperties = 
Collections.unmodifiableMap(adminClientProperties);
+this.saslServerProperties = 
Collections.unmodifiableMap(saslServerProperties);
+this.saslClientProperties = 
Collections.unmodifiableMap(saslClientProperties);
+this.perBrokerOverrideProperties = Collections.unmodifiableMap(

Review Comment:
   Got it, thanks for explanation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573857883


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -67,13 +69,16 @@ public class ClusterConfig {
 this.listenerName = listenerName;
 this.trustStoreFile = trustStoreFile;
 this.metadataVersion = metadataVersion;
-this.serverProperties = copyOf(serverProperties);
-this.producerProperties = copyOf(producerProperties);
-this.consumerProperties = copyOf(consumerProperties);
-this.adminClientProperties = copyOf(adminClientProperties);
-this.saslServerProperties = copyOf(saslServerProperties);
-this.saslClientProperties = copyOf(saslClientProperties);
-perBrokerOverrideProperties.forEach((brokerId, props) -> 
this.perBrokerOverrideProperties.put(brokerId, copyOf(props)));
+this.serverProperties = Collections.unmodifiableMap(serverProperties);
+this.producerProperties = 
Collections.unmodifiableMap(producerProperties);
+this.consumerProperties = 
Collections.unmodifiableMap(consumerProperties);
+this.adminClientProperties = 
Collections.unmodifiableMap(adminClientProperties);
+this.saslServerProperties = 
Collections.unmodifiableMap(saslServerProperties);
+this.saslClientProperties = 
Collections.unmodifiableMap(saslClientProperties);
+this.perBrokerOverrideProperties = Collections.unmodifiableMap(

Review Comment:
   The purpose of this PR is to make `ClusterConfig` immutable. However, 
`ClusterConfig` has only an immutable "view" of `serverProperties`, 
`producerProperties`, etc. It means `ClusterConfig` is still a mutable object, 
since callers can change the inner variables of `ClusterConfig` by updating 
`ClusterConfig#Builder`. For example, a user can call `putSaslServerProperty` to 
change `saslServerProperties` of `ClusterConfig`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573855899


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -67,13 +69,16 @@ public class ClusterConfig {
 this.listenerName = listenerName;
 this.trustStoreFile = trustStoreFile;
 this.metadataVersion = metadataVersion;
-this.serverProperties = copyOf(serverProperties);
-this.producerProperties = copyOf(producerProperties);
-this.consumerProperties = copyOf(consumerProperties);
-this.adminClientProperties = copyOf(adminClientProperties);
-this.saslServerProperties = copyOf(saslServerProperties);
-this.saslClientProperties = copyOf(saslClientProperties);
-perBrokerOverrideProperties.forEach((brokerId, props) -> 
this.perBrokerOverrideProperties.put(brokerId, copyOf(props)));
+this.serverProperties = Collections.unmodifiableMap(serverProperties);
+this.producerProperties = 
Collections.unmodifiableMap(producerProperties);
+this.consumerProperties = 
Collections.unmodifiableMap(consumerProperties);
+this.adminClientProperties = 
Collections.unmodifiableMap(adminClientProperties);
+this.saslServerProperties = 
Collections.unmodifiableMap(saslServerProperties);
+this.saslClientProperties = 
Collections.unmodifiableMap(saslClientProperties);
+this.perBrokerOverrideProperties = Collections.unmodifiableMap(

Review Comment:
   Pardon me, I'm not quite clear about this comment. Could you explain more? 
:smiley: 
   For convenience, this PR adds something like `public Builder 
putServerProperty(String key, String value)` for each configuration map, such as 
`serverProperties` and `consumerProperties`, so we don't need to do a deep copy in 
ClusterConfig.Builder. 
   Or did you mean we need to use `setServerProperty(Map<String, String> 
serverProperties)` instead of `putServerProperty(String key, String value)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573852618


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "true"),
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   Filed https://issues.apache.org/jira/browse/KAFKA-16595



##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "true"),
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   gentle ping @chia7712 , Filed 
https://issues.apache.org/jira/browse/KAFKA-16595



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573851263


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "true"),
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   OK, let's talk about this in that JIRA. What I've been wondering about here is 
how to simplify the duplicated ClusterConfigProperty entries. Adding 
`ClusterTemplate` to `ClusterTests` is one way, but maybe we can find other 
solutions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573850549


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "true"),
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   > Currently we can't add ClusterTemplate in ClusterTests. Should we do that 
in this PR ?
   
   Could you file a Jira for that? Also, we can leave a TODO here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573849975


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "true"),
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   Currently we can't add `ClusterTemplate` in `ClusterTests`. Should we do that 
in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573839545


##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -290,7 +287,7 @@ public void waitForReadyBrokers() throws 
InterruptedException {
 }
 
 @Override
-public void rollingBrokerRestart() {
+public void rollingBrokerRestart(Optional 
clusterConfig) {

Review Comment:
   As not all implementation support this method, we should remove it from 
interface. The callers can use `getUnderlying` to get zk instance and call that 
method



##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -211,13 +186,36 @@ public static class Builder {
 private String listenerName;
 private File trustStoreFile;
 private MetadataVersion metadataVersion;
-private Properties serverProperties = new Properties();
-private Properties producerProperties = new Properties();
-private Properties consumerProperties = new Properties();
-private Properties adminClientProperties = new Properties();
-private Properties saslServerProperties = new Properties();
-private Properties saslClientProperties = new Properties();
-private final Map perBrokerOverrideProperties = 
new HashMap<>();
+private Map serverProperties = new HashMap<>();
+private Map producerProperties = new HashMap<>();
+private Map consumerProperties = new HashMap<>();
+private Map adminClientProperties = new HashMap<>();
+private Map saslServerProperties = new HashMap<>();
+private Map saslClientProperties = new HashMap<>();
+private Map> perBrokerOverrideProperties 
= new HashMap<>();
+
+Builder() {}
+
+Builder(ClusterConfig clusterConfig) {
+this.type = clusterConfig.type;
+this.brokers = clusterConfig.brokers;
+this.controllers = clusterConfig.controllers;
+this.name = clusterConfig.name;
+this.autoStart = clusterConfig.autoStart;
+this.securityProtocol = clusterConfig.securityProtocol;
+this.listenerName = clusterConfig.listenerName;
+this.trustStoreFile = clusterConfig.trustStoreFile;
+this.metadataVersion = clusterConfig.metadataVersion;
+this.serverProperties = new 
HashMap<>(clusterConfig.serverProperties);
+this.producerProperties = new 
HashMap<>(clusterConfig.producerProperties);
+this.consumerProperties = new 
HashMap<>(clusterConfig.consumerProperties);
+this.adminClientProperties = new 
HashMap<>(clusterConfig.adminClientProperties);
+this.saslServerProperties = new 
HashMap<>(clusterConfig.saslServerProperties);
+this.saslClientProperties = new 
HashMap<>(clusterConfig.saslClientProperties);
+Map> perBrokerOverrideProps = new 
HashMap<>();
+clusterConfig.perBrokerOverrideProperties.forEach((k, v) -> 
perBrokerOverrideProps.put(k, new HashMap<>(v)));
+this.perBrokerOverrideProperties = perBrokerOverrideProps;

Review Comment:
   ```java
   this.perBrokerOverrideProperties = 
clusterConfig.perBrokerOverrideProperties.entrySet().stream()
   .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
HashMap<>(e.getValue())));
   ```



##
core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala:
##
@@ -18,41 +18,58 @@ package kafka.server
 
 import java.net.Socket
 import java.util.Collections
-
 import kafka.api.{KafkaSasl, SaslSetup}
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, 
kafkaServerSaslMechanisms}
+import kafka.test.annotation.{ClusterTemplate, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
 import kafka.utils.JaasTestUtils
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, 
ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.KafkaSecurityConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 
 import scala.jdk.CollectionConverters._
 
+object SaslApiVersionsRequestTest {
+  val kafkaClientSaslMechanism = "PLAIN"
+  val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN")
+  val controlPlaneListenerName = "CONTROL_PLANE"
+  val securityProtocol = SecurityProto