chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573452044
########## core/src/test/java/kafka/test/ClusterConfig.java: ########## @@ -139,28 +159,46 @@ public Map<String, String> nameTags() { return tags; } - public ClusterConfig copyOf() { - ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion); - copy.serverProperties.putAll(serverProperties); - copy.producerProperties.putAll(producerProperties); - copy.consumerProperties.putAll(consumerProperties); - copy.saslServerProperties.putAll(saslServerProperties); - copy.saslClientProperties.putAll(saslClientProperties); - perBrokerOverrideProperties.forEach((brokerId, props) -> { - Properties propsCopy = new Properties(); - propsCopy.putAll(props); - copy.perBrokerOverrideProperties.put(brokerId, propsCopy); - }); - return copy; + public static Builder defaultClusterBuilder() { + return new Builder() + .type(Type.ZK) + .brokers(1) + .controllers(1) + .autoStart(true) + .securityProtocol(SecurityProtocol.PLAINTEXT) + .metadataVersion(MetadataVersion.latestTesting()); } - public static Builder defaultClusterBuilder() { - return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); + public static Builder clusterBuilder() { Review Comment: It seems to me `builder` is good enough. ########## core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java: ########## @@ -349,4 +277,86 @@ public Stream<KafkaServer> servers() { return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream(); } } + + private static class ClusterConfigurableIntegrationHarness extends IntegrationTestHarness { + private ClusterConfig clusterConfig; + + public ClusterConfigurableIntegrationHarness(ClusterConfig clusterConfig) { Review Comment: `public` -> `private` ########## core/src/test/java/kafka/test/ClusterConfig.java: ########## @@ -139,28 +159,46 @@ public Map<String, String> nameTags() { return tags; } - public ClusterConfig copyOf() { - ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion); - copy.serverProperties.putAll(serverProperties); - copy.producerProperties.putAll(producerProperties); - copy.consumerProperties.putAll(consumerProperties); - copy.saslServerProperties.putAll(saslServerProperties); - copy.saslClientProperties.putAll(saslClientProperties); - perBrokerOverrideProperties.forEach((brokerId, props) -> { - Properties propsCopy = new Properties(); - propsCopy.putAll(props); - copy.perBrokerOverrideProperties.put(brokerId, propsCopy); - }); - return copy; + public static Builder defaultClusterBuilder() { + return new Builder() + .type(Type.ZK) + .brokers(1) + .controllers(1) + .autoStart(true) + .securityProtocol(SecurityProtocol.PLAINTEXT) + .metadataVersion(MetadataVersion.latestTesting()); } - public static Builder defaultClusterBuilder() { - return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); + public static Builder clusterBuilder() { + return new Builder(); } - public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, - SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { - return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion); + public static Builder clusterBuilder(ClusterConfig clusterConfig) { + ClusterConfig.Builder builder = new Builder() + .type(clusterConfig.type) + .brokers(clusterConfig.brokers) + .controllers(clusterConfig.controllers) + .name(clusterConfig.name) + .autoStart(clusterConfig.autoStart) + .securityProtocol(clusterConfig.securityProtocol) + .listenerName(clusterConfig.listenerName) + .trustStoreFile(clusterConfig.trustStoreFile) + .metadataVersion(clusterConfig.metadataVersion); + builder.serverProperties = copyOf(clusterConfig.serverProperties); + builder.producerProperties = copyOf(clusterConfig.producerProperties); + builder.consumerProperties = copyOf(clusterConfig.consumerProperties); + builder.adminClientProperties = copyOf(clusterConfig.adminClientProperties); + builder.saslServerProperties = copyOf(clusterConfig.saslServerProperties); + builder.saslClientProperties = copyOf(clusterConfig.saslClientProperties); + clusterConfig.perBrokerOverrideProperties.forEach((brokerId, props) -> + builder.perBrokerOverrideProperties.put(brokerId, copyOf(props))); + return builder; + } + + private static Properties copyOf(Properties properties) { Review Comment: Instead of creating copy, we can change the type of inner variable from `Properties` to `Map` to return unmodifiable view. ########## core/src/test/java/kafka/test/ClusterConfig.java: ########## @@ -139,28 +159,46 @@ public Map<String, String> nameTags() { return tags; } - public ClusterConfig copyOf() { - ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion); - copy.serverProperties.putAll(serverProperties); - copy.producerProperties.putAll(producerProperties); - copy.consumerProperties.putAll(consumerProperties); - copy.saslServerProperties.putAll(saslServerProperties); - copy.saslClientProperties.putAll(saslClientProperties); - perBrokerOverrideProperties.forEach((brokerId, props) -> { - Properties propsCopy = new Properties(); - propsCopy.putAll(props); - copy.perBrokerOverrideProperties.put(brokerId, propsCopy); - }); - return copy; + public static Builder defaultClusterBuilder() { + return new Builder() + .type(Type.ZK) + .brokers(1) + .controllers(1) + .autoStart(true) + .securityProtocol(SecurityProtocol.PLAINTEXT) + .metadataVersion(MetadataVersion.latestTesting()); } - public static Builder defaultClusterBuilder() { - return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); + public static Builder clusterBuilder() { + return new Builder(); } - public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, - SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { - return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion); + public static Builder clusterBuilder(ClusterConfig clusterConfig) { Review Comment: Could we add a constructor to Builder to accept a `ClusterConfig` to initialize a builder? ########## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ########## @@ -17,48 +17,78 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.ClusterInstance import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - @BeforeEach - def setup(config: ClusterConfig): Unit = { - super.brokerPropertyOverrides(config.serverProperties()) - } - - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( - new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), - new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + @ClusterTests(Array( + new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( Review Comment: Pardon me. Why we need those changes? ########## core/src/test/java/kafka/testkit/BrokerNode.java: ########## @@ -66,11 +68,27 @@ public Builder setNumLogDirectories(int numLogDirectories) { return this; } - public BrokerNode build( - String baseDirectory, - Uuid clusterId, - boolean combined - ) { + public Builder setClusterId(Uuid clusterId) { + this.clusterId = clusterId; + return this; + } + + public Builder setBaseDirectory(String baseDirectory) { + this.baseDirectory = baseDirectory; + return this; + } + + public Builder setCombined(boolean combined) { + this.combined = combined; + return this; + } + + public Builder setPropertyOverrides(Map<String, String> propertyOverrides) { + this.propertyOverrides = propertyOverrides; Review Comment: We should do a deep copy to avoid modification from caller in the future. ########## core/src/test/java/kafka/test/ClusterConfig.java: ########## @@ -139,28 +159,46 @@ public Map<String, String> nameTags() { return tags; } - public ClusterConfig copyOf() { - ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion); - copy.serverProperties.putAll(serverProperties); - copy.producerProperties.putAll(producerProperties); - copy.consumerProperties.putAll(consumerProperties); - copy.saslServerProperties.putAll(saslServerProperties); - copy.saslClientProperties.putAll(saslClientProperties); - perBrokerOverrideProperties.forEach((brokerId, props) -> { - Properties propsCopy = new Properties(); - propsCopy.putAll(props); - copy.perBrokerOverrideProperties.put(brokerId, propsCopy); - }); - return copy; + public static Builder defaultClusterBuilder() { + return new Builder() + .type(Type.ZK) + .brokers(1) + .controllers(1) + .autoStart(true) + .securityProtocol(SecurityProtocol.PLAINTEXT) + .metadataVersion(MetadataVersion.latestTesting()); } - public static Builder defaultClusterBuilder() { - return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); + public static Builder clusterBuilder() { + return new Builder(); } - public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, - SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { - return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion); + public static Builder clusterBuilder(ClusterConfig clusterConfig) { + ClusterConfig.Builder builder = new Builder() + .type(clusterConfig.type) + .brokers(clusterConfig.brokers) + .controllers(clusterConfig.controllers) + .name(clusterConfig.name) + .autoStart(clusterConfig.autoStart) + .securityProtocol(clusterConfig.securityProtocol) + .listenerName(clusterConfig.listenerName) + .trustStoreFile(clusterConfig.trustStoreFile) + .metadataVersion(clusterConfig.metadataVersion); + builder.serverProperties = copyOf(clusterConfig.serverProperties); Review Comment: why we don't use setter here? ########## core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java: ########## @@ -96,83 +98,7 @@ public List<Extension> getAdditionalExtensions() { // have run. This allows tests to set up external dependencies like ZK, MiniKDC, etc. // However, since we cannot create this instance until we are inside the test invocation, we have // to use a container class (AtomicReference) to provide this cluster object to the test itself - - // This is what tests normally extend from to start a cluster, here we create it anonymously and Review Comment: Could we keep this comment? ########## core/src/test/java/kafka/test/ClusterConfig.java: ########## @@ -139,28 +159,46 @@ public Map<String, String> nameTags() { return tags; } - public ClusterConfig copyOf() { - ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion); - copy.serverProperties.putAll(serverProperties); - copy.producerProperties.putAll(producerProperties); - copy.consumerProperties.putAll(consumerProperties); - copy.saslServerProperties.putAll(saslServerProperties); - copy.saslClientProperties.putAll(saslClientProperties); - perBrokerOverrideProperties.forEach((brokerId, props) -> { - Properties propsCopy = new Properties(); - propsCopy.putAll(props); - copy.perBrokerOverrideProperties.put(brokerId, propsCopy); - }); - return copy; + public static Builder defaultClusterBuilder() { + return new Builder() + .type(Type.ZK) + .brokers(1) + .controllers(1) + .autoStart(true) + .securityProtocol(SecurityProtocol.PLAINTEXT) + .metadataVersion(MetadataVersion.latestTesting()); } - public static Builder defaultClusterBuilder() { - return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); + public static Builder clusterBuilder() { Review Comment: BTW, could we apply this pattern to `BrokerNode` and `ControllerNode` that we encourage users to create builder by static method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org