Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 merged PR #15761: URL: https://github.com/apache/kafka/pull/15761 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1581724166 ## core/src/test/java/kafka/test/ClusterConfigTest.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +public class ClusterConfigTest { + +@Test +public void testClusterConfigBuilder() throws IOException { +File trustStoreFile = TestUtils.tempFile(); + +ClusterConfig clusterConfig = ClusterConfig.builder() +.setType(Type.KRAFT) +.setBrokers(1) +.setControllers(1) +.setName("builder-test") +.setAutoStart(true) +.setSecurityProtocol(SecurityProtocol.PLAINTEXT) +.setListenerName("EXTERNAL") +.setTrustStoreFile(trustStoreFile) +.setMetadataVersion(MetadataVersion.IBP_0_8_0) +.setServerProperties(Collections.singletonMap("broker", "broker_value")) 
+.setConsumerProperties(Collections.singletonMap("consumer", "consumer_value")) +.setProducerProperties(Collections.singletonMap("producer", "producer_value")) + .setAdminClientProperties(Collections.singletonMap("admin_client", "admin_client_value")) + .setSaslClientProperties(Collections.singletonMap("sasl_client", "sasl_client_value")) + .setSaslServerProperties(Collections.singletonMap("sasl_server", "sasl_server_value")) +.setPerBrokerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value"))) +.build(); + +Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType()); Review Comment: Sigh... But we may still overlook adding new fields to `equals()` and `hashCode()`. I'll update the test as you suggested in this comment. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1581723855 ## core/src/test/java/kafka/test/ClusterConfigTest.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +public class ClusterConfigTest { + +@Test +public void testClusterConfigBuilder() throws IOException { +File trustStoreFile = TestUtils.tempFile(); + +ClusterConfig clusterConfig = ClusterConfig.builder() +.setType(Type.KRAFT) +.setBrokers(1) +.setControllers(1) +.setName("builder-test") +.setAutoStart(true) +.setSecurityProtocol(SecurityProtocol.PLAINTEXT) +.setListenerName("EXTERNAL") +.setTrustStoreFile(trustStoreFile) +.setMetadataVersion(MetadataVersion.IBP_0_8_0) +.setServerProperties(Collections.singletonMap("broker", "broker_value")) 
+.setConsumerProperties(Collections.singletonMap("consumer", "consumer_value")) +.setProducerProperties(Collections.singletonMap("producer", "producer_value")) + .setAdminClientProperties(Collections.singletonMap("admin_client", "admin_client_value")) + .setSaslClientProperties(Collections.singletonMap("sasl_client", "sasl_client_value")) + .setSaslServerProperties(Collections.singletonMap("sasl_server", "sasl_server_value")) +.setPerBrokerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value"))) +.build(); + +Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType()); Review Comment: Sorry, I missed the important part of this test. I'm going to override `equals()` and `hashCode()` in `ClusterConfig` instead of using reflection. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1581597711 ## core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala: ## @@ -71,11 +72,16 @@ class KafkaServerKRaftRegistrationTest { val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3) // Enable migration configs and restart brokers - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() + val serverProperties = new java.util.HashMap[String, String]() Review Comment: ```java val serverProperties = new util.HashMap[String, String](zkCluster.config().serverProperties(zkCluster.config().serverProperties())) ``` ## core/src/test/java/kafka/test/ClusterConfigTest.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +public class ClusterConfigTest { + +@Test +public void testClusterConfigBuilder() throws IOException { +File trustStoreFile = TestUtils.tempFile(); + +ClusterConfig clusterConfig = ClusterConfig.builder() +.setType(Type.KRAFT) +.setBrokers(1) +.setControllers(1) +.setName("builder-test") +.setAutoStart(true) +.setSecurityProtocol(SecurityProtocol.PLAINTEXT) +.setListenerName("EXTERNAL") +.setTrustStoreFile(trustStoreFile) +.setMetadataVersion(MetadataVersion.IBP_0_8_0) +.setServerProperties(Collections.singletonMap("broker", "broker_value")) +.setConsumerProperties(Collections.singletonMap("consumer", "consumer_value")) +.setProducerProperties(Collections.singletonMap("producer", "producer_value")) + .setAdminClientProperties(Collections.singletonMap("admin_client", "admin_client_value")) + .setSaslClientProperties(Collections.singletonMap("sasl_client", "sasl_client_value")) + .setSaslServerProperties(Collections.singletonMap("sasl_server", "sasl_server_value")) +.setPerBrokerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value"))) +.build(); + +Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType()); Review Comment: Dear friend, this test can't protect us from overlooking the field copy since we don't check the field added in the future, right? Maybe we can use reflection to get all fields and then check all dynamically. 
```java private static Map fields(ClusterConfig config) { return Arrays.stream(config.getClass().getDeclaredFields()).collect(Collectors.toMap(Field::getName, f -> { f.setAccessible(true); return Assertions.assertDoesNotThrow(() -> f.get(config)); })); } @Test public void testClusterConfigBuilder() throws IOException { File trustStoreFile = TestUtils.tempFile(); ClusterConfig clusterConfig = ClusterConfig.builder() .setType(Type.KRAFT) .setBrokers(3) .setControllers(2) .setName("builder-test") .setAutoStart(true)
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1580549838 ## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ## @@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest { // Static methods can generate cluster configurations static void generate1(ClusterGenerator clusterGenerator) { - clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated Test").build()); +Map serverProperties = new HashMap<>(); +serverProperties.put("foo", "bar"); +clusterGenerator.accept(ClusterConfig.defaultBuilder() +.setName("Generated Test") +.setServerProperties(serverProperties) +.build()); } // BeforeEach run after class construction, but before cluster initialization and test invocation @BeforeEach public void beforeEach(ClusterConfig config) { Review Comment: filed https://issues.apache.org/jira/browse/KAFKA-16627 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1580544105 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -139,28 +151,38 @@ public Map nameTags() { return tags; } -public ClusterConfig copyOf() { -ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion); -copy.serverProperties.putAll(serverProperties); -copy.producerProperties.putAll(producerProperties); -copy.consumerProperties.putAll(consumerProperties); -copy.saslServerProperties.putAll(saslServerProperties); -copy.saslClientProperties.putAll(saslClientProperties); -perBrokerOverrideProperties.forEach((brokerId, props) -> { -Properties propsCopy = new Properties(); -propsCopy.putAll(props); -copy.perBrokerOverrideProperties.put(brokerId, propsCopy); -}); -return copy; +public static Builder defaultBuilder() { +return new Builder() +.setType(Type.ZK) +.setBrokers(1) +.setControllers(1) +.setAutoStart(true) +.setSecurityProtocol(SecurityProtocol.PLAINTEXT) +.setMetadataVersion(MetadataVersion.latestTesting()); } -public static Builder defaultClusterBuilder() { -return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); +public static Builder builder() { +return new Builder(); } -public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, - SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { -return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion); +public static Builder builder(ClusterConfig clusterConfig) { Review Comment: Roger that. Thanks for your patience -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1580542830 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -139,28 +151,38 @@ public Map nameTags() { return tags; } -public ClusterConfig copyOf() { -ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion); -copy.serverProperties.putAll(serverProperties); -copy.producerProperties.putAll(producerProperties); -copy.consumerProperties.putAll(consumerProperties); -copy.saslServerProperties.putAll(saslServerProperties); -copy.saslClientProperties.putAll(saslClientProperties); -perBrokerOverrideProperties.forEach((brokerId, props) -> { -Properties propsCopy = new Properties(); -propsCopy.putAll(props); -copy.perBrokerOverrideProperties.put(brokerId, propsCopy); -}); -return copy; +public static Builder defaultBuilder() { +return new Builder() +.setType(Type.ZK) +.setBrokers(1) +.setControllers(1) +.setAutoStart(true) +.setSecurityProtocol(SecurityProtocol.PLAINTEXT) +.setMetadataVersion(MetadataVersion.latestTesting()); } -public static Builder defaultClusterBuilder() { -return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); +public static Builder builder() { +return new Builder(); } -public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, - SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { -return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion); +public static Builder builder(ClusterConfig clusterConfig) { Review Comment: > Maybe we should directly remove the helper method here It seems to me this helper could be useful to the tests which have to update a part of configs. Hence, all we need to do is add test to make sure the clone works well :) -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1580538394 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -139,28 +151,38 @@ public Map nameTags() { return tags; } -public ClusterConfig copyOf() { -ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion); -copy.serverProperties.putAll(serverProperties); -copy.producerProperties.putAll(producerProperties); -copy.consumerProperties.putAll(consumerProperties); -copy.saslServerProperties.putAll(saslServerProperties); -copy.saslClientProperties.putAll(saslClientProperties); -perBrokerOverrideProperties.forEach((brokerId, props) -> { -Properties propsCopy = new Properties(); -propsCopy.putAll(props); -copy.perBrokerOverrideProperties.put(brokerId, propsCopy); -}); -return copy; +public static Builder defaultBuilder() { +return new Builder() +.setType(Type.ZK) +.setBrokers(1) +.setControllers(1) +.setAutoStart(true) +.setSecurityProtocol(SecurityProtocol.PLAINTEXT) +.setMetadataVersion(MetadataVersion.latestTesting()); } -public static Builder defaultClusterBuilder() { -return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); +public static Builder builder() { +return new Builder(); } -public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, - SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { -return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion); +public static Builder builder(ClusterConfig clusterConfig) { Review Comment: Got it. Maybe we should directly remove the helper method here. WDYT ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1580536170 ## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ## @@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest { // Static methods can generate cluster configurations static void generate1(ClusterGenerator clusterGenerator) { - clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated Test").build()); +Map serverProperties = new HashMap<>(); +serverProperties.put("foo", "bar"); +clusterGenerator.accept(ClusterConfig.defaultBuilder() +.setName("Generated Test") +.setServerProperties(serverProperties) +.build()); } // BeforeEach run after class construction, but before cluster initialization and test invocation @BeforeEach public void beforeEach(ClusterConfig config) { Review Comment: Could you please file a ticket for it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1580535934 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -139,28 +151,38 @@ public Map nameTags() { return tags; } -public ClusterConfig copyOf() { -ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion); -copy.serverProperties.putAll(serverProperties); -copy.producerProperties.putAll(producerProperties); -copy.consumerProperties.putAll(consumerProperties); -copy.saslServerProperties.putAll(saslServerProperties); -copy.saslClientProperties.putAll(saslClientProperties); -perBrokerOverrideProperties.forEach((brokerId, props) -> { -Properties propsCopy = new Properties(); -propsCopy.putAll(props); -copy.perBrokerOverrideProperties.put(brokerId, propsCopy); -}); -return copy; +public static Builder defaultBuilder() { +return new Builder() +.setType(Type.ZK) +.setBrokers(1) +.setControllers(1) +.setAutoStart(true) +.setSecurityProtocol(SecurityProtocol.PLAINTEXT) +.setMetadataVersion(MetadataVersion.latestTesting()); } -public static Builder defaultClusterBuilder() { -return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); +public static Builder builder() { +return new Builder(); } -public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, - SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { -return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion); +public static Builder builder(ClusterConfig clusterConfig) { Review Comment: > Did you mean we shouldn't send ClusterConfig as an argument to builder ? If so, I can remove this method (i.e. L168). But that means we have to explicitly write all setters if we want to copy a ClusterConfig to a brand new one. And I'm not sure why this is error-prone. 
I would appreciate it if you could share more 😃. That is a good helper method, but we could neglect to update this copy-all when adding a new field to `ClusterConfig` :( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1580532456 ## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ## @@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest { // Static methods can generate cluster configurations static void generate1(ClusterGenerator clusterGenerator) { - clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated Test").build()); +Map serverProperties = new HashMap<>(); +serverProperties.put("foo", "bar"); +clusterGenerator.accept(ClusterConfig.defaultBuilder() +.setName("Generated Test") +.setServerProperties(serverProperties) +.build()); } // BeforeEach run after class construction, but before cluster initialization and test invocation @BeforeEach public void beforeEach(ClusterConfig config) { Review Comment: Make sense to me. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1580532188 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -139,28 +151,38 @@ public Map nameTags() { return tags; } -public ClusterConfig copyOf() { -ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion); -copy.serverProperties.putAll(serverProperties); -copy.producerProperties.putAll(producerProperties); -copy.consumerProperties.putAll(consumerProperties); -copy.saslServerProperties.putAll(saslServerProperties); -copy.saslClientProperties.putAll(saslClientProperties); -perBrokerOverrideProperties.forEach((brokerId, props) -> { -Properties propsCopy = new Properties(); -propsCopy.putAll(props); -copy.perBrokerOverrideProperties.put(brokerId, propsCopy); -}); -return copy; +public static Builder defaultBuilder() { +return new Builder() +.setType(Type.ZK) +.setBrokers(1) +.setControllers(1) +.setAutoStart(true) +.setSecurityProtocol(SecurityProtocol.PLAINTEXT) +.setMetadataVersion(MetadataVersion.latestTesting()); } -public static Builder defaultClusterBuilder() { -return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); +public static Builder builder() { +return new Builder(); } -public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, - SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { -return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion); +public static Builder builder(ClusterConfig clusterConfig) { Review Comment: > That deep copy is error-prone. Did you mean we shouldn't send ClusterConfig as an argument to `builder` ? If so, I can remove this method (i.e. L168). But that means we have to explicitly write all setters if we want to copy a ClusterConfig to a brand new one. And I'm not sure why this is error-prone. 
I would appreciate it if you could share more 😃. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1580513112 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -57,16 +57,18 @@ import org.slf4j.{Logger, LoggerFactory} import java.util import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit} -import java.util.{Collections, Optional, Properties, UUID} +import java.util.{Collections, Optional, UUID} import scala.collection.Seq import scala.jdk.CollectionConverters._ object ZkMigrationIntegrationTest { - def addZkBrokerProps(props: Properties): Unit = { -props.setProperty("inter.broker.listener.name", "EXTERNAL") -props.setProperty("listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0") -props.setProperty("advertised.listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0") -props.setProperty("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") + def addZkBrokerProps(builder: ClusterConfig.Builder): Unit = { Review Comment: It seems this method is used by `zkClustersForAllMigrationVersions` only. We can merge them into one method. 
For example: ```scala val serverProperties = new util.HashMap[String, String]() serverProperties.put("inter.broker.listener.name", "EXTERNAL") serverProperties.put("listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0") serverProperties.put("advertised.listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0") serverProperties.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") clusterGenerator.accept(ClusterConfig.defaultBuilder() .setMetadataVersion(mv) .setBrokers(3) .setType(Type.ZK) .setServerProperties(serverProperties).build()) ``` ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -453,11 +464,16 @@ class ZkMigrationIntegrationTest { // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() + val serverProperties = new util.HashMap[String, String]() + serverProperties.putAll(zkCluster.config().serverProperties()) Review Comment: ditto ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -517,11 +533,16 @@ class ZkMigrationIntegrationTest { // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - 
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed + val serverProperties = new util.HashMap[String, String]() + serverProperties.putAll(zkCluster.config().serverProperties()) Review Comment: ditto ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -802,11 +838,16 @@ class ZkMigrationIntegrationTest { // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() + val serverProperties = new util.HashMap[String, String]() + serverProperties.putAll(zkCluster.config().serverProperties()) Review Comment: ditto ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -667,11 +693,16 @@ class ZkMigrationIntegrationTest { // Enable migration
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on PR #15761: URL: https://github.com/apache/kafka/pull/15761#issuecomment-2078268366 @brandboat could you please fix the conflicts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on PR #15761: URL: https://github.com/apache/kafka/pull/15761#issuecomment-2074923817 @brandboat Could you rebase the code to trigger QA again? I ran those tests locally, and they pass. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on PR #15761: URL: https://github.com/apache/kafka/pull/15761#issuecomment-2074282565 The failed tests seem related to this PR; I'll check them later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1577073007 ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -198,7 +177,7 @@ public BootstrapMetadata bootstrapMetadata() { return bootstrapMetadata; } -public NavigableMap brokerNodes() { +public Map brokerNodes() { Review Comment: I found all tests failed... Sorry for wasting your time, I'll be more careful :disappointed: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576836597 ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -198,7 +177,7 @@ public BootstrapMetadata bootstrapMetadata() { return bootstrapMetadata; } -public NavigableMap brokerNodes() { +public Map brokerNodes() { Review Comment: Please ignore this comment: we create all controllers before brokers, so the order of brokers does not matter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576631689 ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -59,87 +63,66 @@ public Builder setCombined(boolean combined) { } public Builder setNumControllerNodes(int numControllerNodes) { -if (numControllerNodes < 0) { -throw new RuntimeException("Invalid negative value for numControllerNodes"); -} - -while (controllerNodeBuilders.size() > numControllerNodes) { -controllerNodeBuilders.pollFirstEntry(); -} -while (controllerNodeBuilders.size() < numControllerNodes) { -int nextId = startControllerId(); -if (!controllerNodeBuilders.isEmpty()) { -nextId = controllerNodeBuilders.lastKey() + 1; -} -controllerNodeBuilders.put(nextId, -new ControllerNode.Builder(). -setId(nextId)); -} +this.numControllerNodes = numControllerNodes; return this; } public Builder setNumBrokerNodes(int numBrokerNodes) { -return setBrokerNodes(numBrokerNodes, 1); +this.numBrokerNodes = numBrokerNodes; +return this; +} + +public Builder setNumDisksPerBroker(int numDisksPerBroker) { +this.numDisksPerBroker = numDisksPerBroker; +return this; +} + +public Builder setPerBrokerProperties(Map> perBrokerProperties) { +this.perBrokerProperties = Collections.unmodifiableMap( +perBrokerProperties.entrySet().stream() +.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()); +return this; } -public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { +public TestKitNodes build() { +if (numControllerNodes < 0) { +throw new RuntimeException("Invalid negative value for numControllerNodes"); +} if (numBrokerNodes < 0) { throw new RuntimeException("Invalid negative value for numBrokerNodes"); } -if (disksPerBroker <= 0) { -throw new RuntimeException("Invalid value for disksPerBroker"); -} -while (brokerNodeBuilders.size() > numBrokerNodes) { -brokerNodeBuilders.pollFirstEntry(); -} -while (brokerNodeBuilders.size() < numBrokerNodes) { -int 
nextId = startBrokerId(); -if (!brokerNodeBuilders.isEmpty()) { -nextId = brokerNodeBuilders.lastKey() + 1; -} -BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() -.setId(nextId) -.setNumLogDirectories(disksPerBroker); -brokerNodeBuilders.put(nextId, brokerNodeBuilder); +if (numDisksPerBroker <= 0) { +throw new RuntimeException("Invalid value for numDisksPerBroker"); } -return this; -} -public TestKitNodes build() { String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); -try { -if (clusterId == null) { -clusterId = Uuid.randomUuid(); -} -TreeMap controllerNodes = new TreeMap<>(); -for (ControllerNode.Builder builder : controllerNodeBuilders.values()) { -ControllerNode node = builder. -build(baseDirectory, clusterId, brokerNodeBuilders.containsKey(builder.id())); -if (controllerNodes.put(node.id(), node) != null) { -throw new RuntimeException("Duplicate builder for controller " + node.id()); -} -} -TreeMap brokerNodes = new TreeMap<>(); -for (BrokerNode.Builder builder : brokerNodeBuilders.values()) { -BrokerNode node = builder. -build(baseDirectory, clusterId, controllerNodeBuilders.containsKey(builder.id())); -if (brokerNodes.put(node.id(), node) != null) { -throw new RuntimeException("Duplicate builder for broker " + node.id()); -} -} -return new TestKitNodes(baseDirectory, -clusterId, -bootstrapMetadata, -controllerNodes, -brokerNodes); -} catch (Exception e) { -try { -Files.delete(Paths.get(baseDirectory)); -} catch (Exception x) { -throw new RuntimeException("Failed to
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576531833 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -84,17 +125,38 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion()) } - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"), + @ClusterTests(Array( Review Comment: Already added the TODO in the top of this file. L30 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576492165 ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -66,17 +69,30 @@ public Builder setNumLogDirectories(int numLogDirectories) { return this; } -public BrokerNode build( -String baseDirectory, -Uuid clusterId, -boolean combined -) { +public Builder setClusterId(Uuid clusterId) { +this.clusterId = clusterId; +return this; +} + +public Builder setBaseDirectory(String baseDirectory) { +this.baseDirectory = baseDirectory; +return this; +} + +public Builder setCombined(boolean combined) { +this.combined = combined; +return this; +} + +public Builder setPropertyOverrides(Map propertyOverrides) { +this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(propertyOverrides)); +return this; +} + +public BrokerNode build() { if (id == -1) { throw new RuntimeException("You must set the node id."); } -if (incarnationId == null) { -incarnationId = Uuid.randomUuid(); -} List logDataDirectories = IntStream Review Comment: Could you add null check for `baseDirectory`? ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) { } public Builder setNumControllerNodes(int numControllerNodes) { -if (numControllerNodes < 0) { -throw new RuntimeException("Invalid negative value for numControllerNodes"); -} - -while (controllerNodeBuilders.size() > numControllerNodes) { -controllerNodeBuilders.pollFirstEntry(); -} -while (controllerNodeBuilders.size() < numControllerNodes) { -int nextId = startControllerId(); -if (!controllerNodeBuilders.isEmpty()) { -nextId = controllerNodeBuilders.lastKey() + 1; -} -controllerNodeBuilders.put(nextId, -new ControllerNode.Builder(). 
-setId(nextId)); -} +this.numControllerNodes = numControllerNodes; return this; } public Builder setNumBrokerNodes(int numBrokerNodes) { -return setBrokerNodes(numBrokerNodes, 1); +this.numBrokerNodes = numBrokerNodes; +return this; +} + +public Builder setNumDisksPerBroker(int numDisksPerBroker) { +this.numDisksPerBroker = numDisksPerBroker; +return this; +} + +public Builder setPerBrokerProperties(Map> perBrokerProperties) { +this.perBrokerProperties = Collections.unmodifiableMap( +perBrokerProperties.entrySet().stream() +.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()); +return this; } -public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { +public TestKitNodes build() { +if (numControllerNodes < 0) { +throw new RuntimeException("Invalid negative value for numControllerNodes"); +} if (numBrokerNodes < 0) { throw new RuntimeException("Invalid negative value for numBrokerNodes"); } -if (disksPerBroker <= 0) { -throw new RuntimeException("Invalid value for disksPerBroker"); -} -while (brokerNodeBuilders.size() > numBrokerNodes) { -brokerNodeBuilders.pollFirstEntry(); +if (numDisksPerBroker <= 0) { +throw new RuntimeException("Invalid value for numDisksPerBroker"); } -while (brokerNodeBuilders.size() < numBrokerNodes) { -int nextId = startBrokerId(); -if (!brokerNodeBuilders.isEmpty()) { -nextId = brokerNodeBuilders.lastKey() + 1; -} -BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() -.setId(nextId) -.setNumLogDirectories(disksPerBroker); -brokerNodeBuilders.put(nextId, brokerNodeBuilder); -} -return this; -} -public TestKitNodes build() { String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); Review Comment: We don't need to delete `baseDirectory` since `TestUtils.tempDirectory()` will delete the return folder when terminating. ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -167,11 +164,1
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576432963 ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -121,16 +146,16 @@ public BrokerNode build( private final boolean combined; private final Map propertyOverrides; -BrokerNode( +private BrokerNode( Uuid incarnationId, MetaPropertiesEnsemble initialMetaPropertiesEnsemble, boolean combined, Map propertyOverrides ) { -this.incarnationId = incarnationId; -this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble; Review Comment: I made `logDataDirectories()` return Set instead of List to get rid of the redundant ArrayList wrapping. ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -121,16 +146,16 @@ public BrokerNode build( private final boolean combined; private final Map propertyOverrides; -BrokerNode( +private BrokerNode( Uuid incarnationId, MetaPropertiesEnsemble initialMetaPropertiesEnsemble, boolean combined, Map propertyOverrides ) { -this.incarnationId = incarnationId; -this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble; Review Comment: I made `logDataDirectories()` return `Set` instead of `List` to get rid of the redundant ArrayList wrapping. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576155040 ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -167,11 +186,11 @@ private TestKitNodes( NavigableMap controllerNodes, NavigableMap brokerNodes ) { -this.baseDirectory = baseDirectory; -this.clusterId = clusterId; -this.bootstrapMetadata = bootstrapMetadata; -this.controllerNodes = controllerNodes; -this.brokerNodes = brokerNodes; +this.baseDirectory = Objects.requireNonNull(baseDirectory); +this.clusterId = Objects.requireNonNull(clusterId); +this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata); +this.controllerNodes = Objects.requireNonNull(controllerNodes); Review Comment: Please follow the rule of this PR - be a immutable object ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -121,16 +146,16 @@ public BrokerNode build( private final boolean combined; private final Map propertyOverrides; -BrokerNode( +private BrokerNode( Uuid incarnationId, MetaPropertiesEnsemble initialMetaPropertiesEnsemble, boolean combined, Map propertyOverrides ) { -this.incarnationId = incarnationId; -this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble; +this.incarnationId = Objects.requireNonNull(incarnationId); Review Comment: this is not used so it is fine to remove it ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -97,9 +107,12 @@ public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { if (!brokerNodeBuilders.isEmpty()) { nextId = brokerNodeBuilders.lastKey() + 1; } -BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() Review Comment: Can we evaluate those settings in `build` method? That can simplify code since we don't need to revert the changes (for example, the number of brokers). 
## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -121,16 +146,16 @@ public BrokerNode build( private final boolean combined; private final Map propertyOverrides; -BrokerNode( +private BrokerNode( Uuid incarnationId, MetaPropertiesEnsemble initialMetaPropertiesEnsemble, boolean combined, Map propertyOverrides ) { -this.incarnationId = incarnationId; -this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble; Review Comment: `logDataDirectories` should return immutable collection -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on PR #15761: URL: https://github.com/apache/kafka/pull/15761#issuecomment-2072088191 > @brandboat Sorry that I leave more comments below. please take a look. thanks Thank you for your patience! I appreciate your input. Already addressed all comments as above. Please take another look when you are available :smiley: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576056664 ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -66,11 +74,27 @@ public Builder setNumLogDirectories(int numLogDirectories) { return this; } -public BrokerNode build( -String baseDirectory, -Uuid clusterId, -boolean combined -) { +public Builder setClusterId(Uuid clusterId) { +this.clusterId = clusterId; +return this; +} + +public Builder setBaseDirectory(String baseDirectory) { +this.baseDirectory = baseDirectory; +return this; +} + +public Builder setCombined(boolean combined) { +this.combined = combined; +return this; +} + +public Builder setPropertyOverrides(Map propertyOverrides) { +this.propertyOverrides = Collections.unmodifiableMap(propertyOverrides); Review Comment: Thanks, I'll address this in other builders also. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1575290259 ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -66,11 +74,27 @@ public Builder setNumLogDirectories(int numLogDirectories) { return this; } -public BrokerNode build( -String baseDirectory, -Uuid clusterId, -boolean combined -) { +public Builder setClusterId(Uuid clusterId) { +this.clusterId = clusterId; +return this; +} + +public Builder setBaseDirectory(String baseDirectory) { +this.baseDirectory = baseDirectory; +return this; +} + +public Builder setCombined(boolean combined) { +this.combined = combined; +return this; +} + +public Builder setPropertyOverrides(Map propertyOverrides) { +this.propertyOverrides = Collections.unmodifiableMap(propertyOverrides); Review Comment: Maybe we need to do a copy in order to avoid changes produced from outer. ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -97,14 +96,31 @@ public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { if (!brokerNodeBuilders.isEmpty()) { nextId = brokerNodeBuilders.lastKey() + 1; } -BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() +BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder() .setId(nextId) .setNumLogDirectories(disksPerBroker); brokerNodeBuilders.put(nextId, brokerNodeBuilder); } return this; } +/** + * Set per broker properties overrides, this setter must be invoked after setBrokerNodes which + * setup broker id and broker builder. 
+ * @param perBrokerPropertiesOverrides properties to override in each broker + * @return Builder + */ +public Builder setPerBrokerPropertiesOverrides(Map> perBrokerPropertiesOverrides) { +perBrokerPropertiesOverrides.forEach((brokerId, properties) -> { +if (!brokerNodeBuilders.containsKey(brokerId)) { +throw new RuntimeException("Broker id " + brokerId + " does not exist"); +} +Map propertiesOverride = new HashMap<>(properties); Review Comment: the deep copy should be addressed by `setPropertyOverrides` ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -97,14 +96,32 @@ public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { if (!brokerNodeBuilders.isEmpty()) { nextId = brokerNodeBuilders.lastKey() + 1; } -BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() +BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder() .setId(nextId) .setNumLogDirectories(disksPerBroker); brokerNodeBuilders.put(nextId, brokerNodeBuilder); } return this; } +/** + * Set per broker properties overrides, this setter must be invoked after setBrokerNodes which Review Comment: It seems `TestKitNodes` need to be refactor also. In short, it should verify/apply all settings when building. ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -126,8 +138,12 @@ public MetadataVersion metadataVersion() { return metadataVersion; } -public Properties brokerServerProperties(int brokerId) { -return perBrokerOverrideProperties.computeIfAbsent(brokerId, __ -> new Properties()); +public Map brokerServerProperties(int brokerId) { Review Comment: This helper is used by `ZkClusterInvocationContext` only. Maybe we can remove this one to simplify `ClusterConfig` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1575101365 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -173,63 +199,104 @@ public static class Builder { private String listenerName; private File trustStoreFile; private MetadataVersion metadataVersion; +private Map serverProperties = Collections.unmodifiableMap(new HashMap<>()); Review Comment: Yes. The code here is redundant. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1575053494 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -173,63 +199,104 @@ public static class Builder { private String listenerName; private File trustStoreFile; private MetadataVersion metadataVersion; +private Map serverProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map producerProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map consumerProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map adminClientProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map saslServerProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map saslClientProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map> perBrokerOverrideProperties = Collections.unmodifiableMap(new HashMap<>()); -Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { -this.type = type; -this.brokers = brokers; -this.controllers = controllers; -this.autoStart = autoStart; -this.securityProtocol = securityProtocol; -this.metadataVersion = metadataVersion; -} +private Builder() {} -public Builder type(Type type) { +public Builder setType(Type type) { this.type = type; return this; } -public Builder brokers(int brokers) { +public Builder setBrokers(int brokers) { this.brokers = brokers; return this; } -public Builder controllers(int controllers) { +public Builder setControllers(int controllers) { this.controllers = controllers; return this; } -public Builder name(String name) { +public Builder setName(String name) { this.name = name; return this; } -public Builder autoStart(boolean autoStart) { +public Builder setAutoStart(boolean autoStart) { this.autoStart = autoStart; return this; } -public Builder securityProtocol(SecurityProtocol securityProtocol) { +public Builder setSecurityProtocol(SecurityProtocol 
securityProtocol) { this.securityProtocol = securityProtocol; return this; } -public Builder listenerName(String listenerName) { +public Builder setListenerName(String listenerName) { this.listenerName = listenerName; return this; } -public Builder trustStoreFile(File trustStoreFile) { +public Builder setTrustStoreFile(File trustStoreFile) { this.trustStoreFile = trustStoreFile; return this; } -public Builder metadataVersion(MetadataVersion metadataVersion) { +public Builder setMetadataVersion(MetadataVersion metadataVersion) { this.metadataVersion = metadataVersion; return this; } +public Builder setServerProperties(Map serverProperties) { +this.serverProperties = Collections.unmodifiableMap(serverProperties); +return this; +} + +public Builder setConsumerProperties(Map consumerProperties) { +this.consumerProperties = Collections.unmodifiableMap(consumerProperties); +return this; +} + +public Builder setProducerProperties(Map producerProperties) { +this.producerProperties = Collections.unmodifiableMap(producerProperties); +return this; +} + +public Builder setAdminClientProperties(Map adminClientProperties) { +this.adminClientProperties = Collections.unmodifiableMap(adminClientProperties); +return this; +} + +public Builder setSaslServerProperties(Map saslServerProperties) { +this.saslServerProperties = Collections.unmodifiableMap(saslServerProperties); +return this; +} + +public Builder setSaslClientProperties(Map saslClientProperties) { +this.saslClientProperties = Collections.unmodifiableMap(saslClientProperties); +return this; +} + +public Builder setPerBrokerProperties(Map> perBrokerOverrideProperties) { +this.perBrokerOverrideProperties = Collections.unmodifiableMap( Review Comment: ```java this.perBrokerOverrideProperties = Collections.unmodifiableMap( perBrokerOverrideProperties.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()); ``` ## core/s
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1574974316 ## core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala: ## @@ -18,41 +18,58 @@ package kafka.server import java.net.Socket import java.util.Collections - import kafka.api.{KafkaSasl, SaslSetup} -import kafka.test.annotation.{ClusterTest, Type} +import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms} +import kafka.test.annotation.{ClusterTemplate, Type} import kafka.test.junit.ClusterTestExtensions -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance} import kafka.utils.JaasTestUtils +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse} import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.server.config.KafkaSecurityConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{AfterEach, BeforeEach} import scala.jdk.CollectionConverters._ +object SaslApiVersionsRequestTest { + val kafkaClientSaslMechanism = "PLAIN" + val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN") + val controlPlaneListenerName = "CONTROL_PLANE" + val securityProtocol = SecurityProtocol.SASL_PLAINTEXT + + def saslApiVersionsRequestClusterConfig(clusterGenerator: ClusterGenerator): Unit = { +clusterGenerator.accept(ClusterConfig.defaultBuilder + .securityProtocol(securityProtocol) + .`type`(Type.ZK) + .putSaslServerProperty(KafkaSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism) + 
.putSaslServerProperty(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(",")) + .putSaslClientProperty(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism) + // Configure control plane listener to make sure we have separate listeners for testing. + .putServerProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListenerName) + .putServerProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol") + .putServerProperty("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") + .putServerProperty(KafkaConfig.AdvertisedListenersProp, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") + .build()) + } +} @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - - val kafkaClientSaslMechanism = "PLAIN" - val kafkaServerSaslMechanisms = List("PLAIN") - private var sasl: SaslSetup = _ @BeforeEach - def setupSasl(config: ClusterConfig): Unit = { + def setupSasl(): Unit = { sasl = new SaslSetup() {} Review Comment: Currently, no. Since we clear `java.security.auth.login.config` in [QuorumTestHarness#tearDown](https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala#L440). So if we initialize sasl in `@BeforeAll`, first test passed, second one raise error as below. ```text Could not find a 'KafkaServer' or 'control_plane.KafkaServer' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set ``` This would require some effort if we want to make this happen. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573858499 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -67,13 +69,16 @@ public class ClusterConfig { this.listenerName = listenerName; this.trustStoreFile = trustStoreFile; this.metadataVersion = metadataVersion; -this.serverProperties = copyOf(serverProperties); -this.producerProperties = copyOf(producerProperties); -this.consumerProperties = copyOf(consumerProperties); -this.adminClientProperties = copyOf(adminClientProperties); -this.saslServerProperties = copyOf(saslServerProperties); -this.saslClientProperties = copyOf(saslClientProperties); -perBrokerOverrideProperties.forEach((brokerId, props) -> this.perBrokerOverrideProperties.put(brokerId, copyOf(props))); +this.serverProperties = Collections.unmodifiableMap(serverProperties); +this.producerProperties = Collections.unmodifiableMap(producerProperties); +this.consumerProperties = Collections.unmodifiableMap(consumerProperties); +this.adminClientProperties = Collections.unmodifiableMap(adminClientProperties); +this.saslServerProperties = Collections.unmodifiableMap(saslServerProperties); +this.saslClientProperties = Collections.unmodifiableMap(saslClientProperties); +this.perBrokerOverrideProperties = Collections.unmodifiableMap( Review Comment: Got it, thanks for explanation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573857883 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -67,13 +69,16 @@ public class ClusterConfig { this.listenerName = listenerName; this.trustStoreFile = trustStoreFile; this.metadataVersion = metadataVersion; -this.serverProperties = copyOf(serverProperties); -this.producerProperties = copyOf(producerProperties); -this.consumerProperties = copyOf(consumerProperties); -this.adminClientProperties = copyOf(adminClientProperties); -this.saslServerProperties = copyOf(saslServerProperties); -this.saslClientProperties = copyOf(saslClientProperties); -perBrokerOverrideProperties.forEach((brokerId, props) -> this.perBrokerOverrideProperties.put(brokerId, copyOf(props))); +this.serverProperties = Collections.unmodifiableMap(serverProperties); +this.producerProperties = Collections.unmodifiableMap(producerProperties); +this.consumerProperties = Collections.unmodifiableMap(consumerProperties); +this.adminClientProperties = Collections.unmodifiableMap(adminClientProperties); +this.saslServerProperties = Collections.unmodifiableMap(saslServerProperties); +this.saslClientProperties = Collections.unmodifiableMap(saslClientProperties); +this.perBrokerOverrideProperties = Collections.unmodifiableMap( Review Comment: The purpose of this PR is to make `ClusterConfig` immutable. However, `ClusterConfig` has only an immutable "view" of `serverProperties`, `producerProperties`, etc. This means `ClusterConfig` is still a mutable object, since callers can change the inner variables of `ClusterConfig` by updating `ClusterConfig#Builder`. For example, a user can call `putSaslServerProperty` to change the `saslServerProperties` of `ClusterConfig`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573855899 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -67,13 +69,16 @@ public class ClusterConfig { this.listenerName = listenerName; this.trustStoreFile = trustStoreFile; this.metadataVersion = metadataVersion; -this.serverProperties = copyOf(serverProperties); -this.producerProperties = copyOf(producerProperties); -this.consumerProperties = copyOf(consumerProperties); -this.adminClientProperties = copyOf(adminClientProperties); -this.saslServerProperties = copyOf(saslServerProperties); -this.saslClientProperties = copyOf(saslClientProperties); -perBrokerOverrideProperties.forEach((brokerId, props) -> this.perBrokerOverrideProperties.put(brokerId, copyOf(props))); +this.serverProperties = Collections.unmodifiableMap(serverProperties); +this.producerProperties = Collections.unmodifiableMap(producerProperties); +this.consumerProperties = Collections.unmodifiableMap(consumerProperties); +this.adminClientProperties = Collections.unmodifiableMap(adminClientProperties); +this.saslServerProperties = Collections.unmodifiableMap(saslServerProperties); +this.saslClientProperties = Collections.unmodifiableMap(saslClientProperties); +this.perBrokerOverrideProperties = Collections.unmodifiableMap( Review Comment: Pardon me, I'm not quite clear about this comment. Could you explain more? :smiley: For convenience, this PR adds something like `public Builder putServerProperty(String key, String value)` for each configuration, such as `serverProperties` and `consumerProperties`, so we don't need to do a deep copy in `ClusterConfig.Builder`. Or did you mean we need to use `setServerProperty(Map serverProperties)` instead of `putServerProperty(String key, String value)`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573852618 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -17,48 +17,78 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.ClusterInstance import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - @BeforeEach - def setup(config: ClusterConfig): Unit = { -super.brokerPropertyOverrides(config.serverProperties()) - } - - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + @ClusterTests(Array( +new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( Review Comment: Filed https://issues.apache.org/jira/browse/KAFKA-16595 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -17,48 +17,78 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.ClusterInstance import 
org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - @BeforeEach - def setup(config: ClusterConfig): Unit = { -super.brokerPropertyOverrides(config.serverProperties()) - } - - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + @ClusterTests(Array( +new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( Review Comment: gentle ping @chia7712 , Filed https://issues.apache.org/jira/browse/KAFKA-16595 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573851263 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -17,48 +17,78 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.ClusterInstance import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - @BeforeEach - def setup(config: ClusterConfig): Unit = { -super.brokerPropertyOverrides(config.serverProperties()) - } - - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + @ClusterTests(Array( +new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( Review Comment: OK, let's discuss this in that JIRA. What I'm wondering here is how to simplify the duplicated `ClusterConfigProperty` entries. Adding `ClusterTemplate` to `ClusterTests` is one way, but maybe there are other solutions. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573850549 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -17,48 +17,78 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.ClusterInstance import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - @BeforeEach - def setup(config: ClusterConfig): Unit = { -super.brokerPropertyOverrides(config.serverProperties()) - } - - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + @ClusterTests(Array( +new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( Review Comment: > Currently we can't add ClusterTemplate in ClusterTests. Should we do that is this PR ? Could you file a Jira for that? Also, we can leave a TODO here -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573849975 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -17,48 +17,78 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.ClusterInstance import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - @BeforeEach - def setup(config: ClusterConfig): Unit = { -super.brokerPropertyOverrides(config.serverProperties()) - } - - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + @ClusterTests(Array( +new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( Review Comment: Currently we can't add `ClusterTemplate` in `ClusterTests`. Should we do that in this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573839545 ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -290,7 +287,7 @@ public void waitForReadyBrokers() throws InterruptedException { } @Override -public void rollingBrokerRestart() { +public void rollingBrokerRestart(Optional clusterConfig) { Review Comment: As not all implementations support this method, we should remove it from the interface. Callers can use `getUnderlying` to get the ZK instance and call that method. ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -211,13 +186,36 @@ public static class Builder { private String listenerName; private File trustStoreFile; private MetadataVersion metadataVersion; -private Properties serverProperties = new Properties(); -private Properties producerProperties = new Properties(); -private Properties consumerProperties = new Properties(); -private Properties adminClientProperties = new Properties(); -private Properties saslServerProperties = new Properties(); -private Properties saslClientProperties = new Properties(); -private final Map perBrokerOverrideProperties = new HashMap<>(); +private Map serverProperties = new HashMap<>(); +private Map producerProperties = new HashMap<>(); +private Map consumerProperties = new HashMap<>(); +private Map adminClientProperties = new HashMap<>(); +private Map saslServerProperties = new HashMap<>(); +private Map saslClientProperties = new HashMap<>(); +private Map> perBrokerOverrideProperties = new HashMap<>(); + +Builder() {} + +Builder(ClusterConfig clusterConfig) { +this.type = clusterConfig.type; +this.brokers = clusterConfig.brokers; +this.controllers = clusterConfig.controllers; +this.name = clusterConfig.name; +this.autoStart = clusterConfig.autoStart; +this.securityProtocol = clusterConfig.securityProtocol; +this.listenerName = clusterConfig.listenerName; +this.trustStoreFile = clusterConfig.trustStoreFile; +this.metadataVersion =
clusterConfig.metadataVersion; +this.serverProperties = new HashMap<>(clusterConfig.serverProperties); +this.producerProperties = new HashMap<>(clusterConfig.producerProperties); +this.consumerProperties = new HashMap<>(clusterConfig.consumerProperties); +this.adminClientProperties = new HashMap<>(clusterConfig.adminClientProperties); +this.saslServerProperties = new HashMap<>(clusterConfig.saslServerProperties); +this.saslClientProperties = new HashMap<>(clusterConfig.saslClientProperties); +Map> perBrokerOverrideProps = new HashMap<>(); +clusterConfig.perBrokerOverrideProperties.forEach((k, v) -> perBrokerOverrideProps.put(k, new HashMap<>(v))); +this.perBrokerOverrideProperties = perBrokerOverrideProps; Review Comment: ```java this.perBrokerOverrideProperties = clusterConfig.perBrokerOverrideProperties.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashMap<>(e.getValue()))); ``` ## core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala: ## @@ -18,41 +18,58 @@ package kafka.server import java.net.Socket import java.util.Collections - import kafka.api.{KafkaSasl, SaslSetup} -import kafka.test.annotation.{ClusterTest, Type} +import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms} +import kafka.test.annotation.{ClusterTemplate, Type} import kafka.test.junit.ClusterTestExtensions -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance} import kafka.utils.JaasTestUtils +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse} import org.apache.kafka.common.security.auth.SecurityProtocol +import
org.apache.kafka.server.config.KafkaSecurityConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{AfterEach, BeforeEach} import scala.jdk.CollectionConverters._ +object SaslApiVersionsRequestTest { + val kafkaClientSaslMechanism = "PLAIN" + val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN") + val controlPlaneListenerName = "CONTROL_PLANE" + val securityProtocol = SecurityProto