chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573452044


##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +159,46 @@ public Map<String, String> nameTags() {
         return tags;
     }
 
-    public ClusterConfig copyOf() {
-        ClusterConfig copy = new ClusterConfig(type, brokers, controllers, 
name, autoStart, securityProtocol, listenerName, trustStoreFile, 
metadataVersion);
-        copy.serverProperties.putAll(serverProperties);
-        copy.producerProperties.putAll(producerProperties);
-        copy.consumerProperties.putAll(consumerProperties);
-        copy.saslServerProperties.putAll(saslServerProperties);
-        copy.saslClientProperties.putAll(saslClientProperties);
-        perBrokerOverrideProperties.forEach((brokerId, props) -> {
-            Properties propsCopy = new Properties();
-            propsCopy.putAll(props);
-            copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
-        });
-        return copy;
+    public static Builder defaultClusterBuilder() {
+        return new Builder()
+                .type(Type.ZK)
+                .brokers(1)
+                .controllers(1)
+                .autoStart(true)
+                .securityProtocol(SecurityProtocol.PLAINTEXT)
+                .metadataVersion(MetadataVersion.latestTesting());
     }
 
-    public static Builder defaultClusterBuilder() {
-        return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+    public static Builder clusterBuilder() {

Review Comment:
   It seems to me `builder` is good enough.



##########
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java:
##########
@@ -349,4 +277,86 @@ public Stream<KafkaServer> servers() {
             return 
JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
         }
     }
+
+    private static class ClusterConfigurableIntegrationHarness extends 
IntegrationTestHarness {
+        private ClusterConfig clusterConfig;
+
+        public ClusterConfigurableIntegrationHarness(ClusterConfig 
clusterConfig) {

Review Comment:
   `public` -> `private`



##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +159,46 @@ public Map<String, String> nameTags() {
         return tags;
     }
 
-    public ClusterConfig copyOf() {
-        ClusterConfig copy = new ClusterConfig(type, brokers, controllers, 
name, autoStart, securityProtocol, listenerName, trustStoreFile, 
metadataVersion);
-        copy.serverProperties.putAll(serverProperties);
-        copy.producerProperties.putAll(producerProperties);
-        copy.consumerProperties.putAll(consumerProperties);
-        copy.saslServerProperties.putAll(saslServerProperties);
-        copy.saslClientProperties.putAll(saslClientProperties);
-        perBrokerOverrideProperties.forEach((brokerId, props) -> {
-            Properties propsCopy = new Properties();
-            propsCopy.putAll(props);
-            copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
-        });
-        return copy;
+    public static Builder defaultClusterBuilder() {
+        return new Builder()
+                .type(Type.ZK)
+                .brokers(1)
+                .controllers(1)
+                .autoStart(true)
+                .securityProtocol(SecurityProtocol.PLAINTEXT)
+                .metadataVersion(MetadataVersion.latestTesting());
     }
 
-    public static Builder defaultClusterBuilder() {
-        return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+    public static Builder clusterBuilder() {
+        return new Builder();
     }
 
-    public static Builder clusterBuilder(Type type, int brokers, int 
controllers, boolean autoStart,
-                                         SecurityProtocol securityProtocol, 
MetadataVersion metadataVersion) {
-        return new Builder(type, brokers, controllers, autoStart, 
securityProtocol, metadataVersion);
+    public static Builder clusterBuilder(ClusterConfig clusterConfig) {
+        ClusterConfig.Builder builder = new Builder()
+                .type(clusterConfig.type)
+                .brokers(clusterConfig.brokers)
+                .controllers(clusterConfig.controllers)
+                .name(clusterConfig.name)
+                .autoStart(clusterConfig.autoStart)
+                .securityProtocol(clusterConfig.securityProtocol)
+                .listenerName(clusterConfig.listenerName)
+                .trustStoreFile(clusterConfig.trustStoreFile)
+                .metadataVersion(clusterConfig.metadataVersion);
+        builder.serverProperties = copyOf(clusterConfig.serverProperties);
+        builder.producerProperties = copyOf(clusterConfig.producerProperties);
+        builder.consumerProperties = copyOf(clusterConfig.consumerProperties);
+        builder.adminClientProperties = 
copyOf(clusterConfig.adminClientProperties);
+        builder.saslServerProperties = 
copyOf(clusterConfig.saslServerProperties);
+        builder.saslClientProperties = 
copyOf(clusterConfig.saslClientProperties);
+        clusterConfig.perBrokerOverrideProperties.forEach((brokerId, props) ->
+                builder.perBrokerOverrideProperties.put(brokerId, 
copyOf(props)));
+        return builder;
+    }
+
+    private static Properties copyOf(Properties properties) {

Review Comment:
   Instead of creating a copy, we can change the type of the inner variable 
from `Properties` to `Map` and return an unmodifiable view.



##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +159,46 @@ public Map<String, String> nameTags() {
         return tags;
     }
 
-    public ClusterConfig copyOf() {
-        ClusterConfig copy = new ClusterConfig(type, brokers, controllers, 
name, autoStart, securityProtocol, listenerName, trustStoreFile, 
metadataVersion);
-        copy.serverProperties.putAll(serverProperties);
-        copy.producerProperties.putAll(producerProperties);
-        copy.consumerProperties.putAll(consumerProperties);
-        copy.saslServerProperties.putAll(saslServerProperties);
-        copy.saslClientProperties.putAll(saslClientProperties);
-        perBrokerOverrideProperties.forEach((brokerId, props) -> {
-            Properties propsCopy = new Properties();
-            propsCopy.putAll(props);
-            copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
-        });
-        return copy;
+    public static Builder defaultClusterBuilder() {
+        return new Builder()
+                .type(Type.ZK)
+                .brokers(1)
+                .controllers(1)
+                .autoStart(true)
+                .securityProtocol(SecurityProtocol.PLAINTEXT)
+                .metadataVersion(MetadataVersion.latestTesting());
     }
 
-    public static Builder defaultClusterBuilder() {
-        return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+    public static Builder clusterBuilder() {
+        return new Builder();
     }
 
-    public static Builder clusterBuilder(Type type, int brokers, int 
controllers, boolean autoStart,
-                                         SecurityProtocol securityProtocol, 
MetadataVersion metadataVersion) {
-        return new Builder(type, brokers, controllers, autoStart, 
securityProtocol, metadataVersion);
+    public static Builder clusterBuilder(ClusterConfig clusterConfig) {

Review Comment:
   Could we add a constructor to `Builder` that accepts a `ClusterConfig` and 
uses it to initialize the builder?



##########
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##########
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-    super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties 
= Array(
-    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-    new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "true"),
+  @ClusterTests(Array(
+    new ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   Pardon me, why do we need those changes?



##########
core/src/test/java/kafka/testkit/BrokerNode.java:
##########
@@ -66,11 +68,27 @@ public Builder setNumLogDirectories(int numLogDirectories) {
             return this;
         }
 
-        public BrokerNode build(
-            String baseDirectory,
-            Uuid clusterId,
-            boolean combined
-        ) {
+        public Builder setClusterId(Uuid clusterId) {
+            this.clusterId = clusterId;
+            return this;
+        }
+
+        public Builder setBaseDirectory(String baseDirectory) {
+            this.baseDirectory = baseDirectory;
+            return this;
+        }
+
+        public Builder setCombined(boolean combined) {
+            this.combined = combined;
+            return this;
+        }
+
+        public Builder setPropertyOverrides(Map<String, String> 
propertyOverrides) {
+            this.propertyOverrides = propertyOverrides;

Review Comment:
   We should make a deep copy to guard against later modifications by the caller.



##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +159,46 @@ public Map<String, String> nameTags() {
         return tags;
     }
 
-    public ClusterConfig copyOf() {
-        ClusterConfig copy = new ClusterConfig(type, brokers, controllers, 
name, autoStart, securityProtocol, listenerName, trustStoreFile, 
metadataVersion);
-        copy.serverProperties.putAll(serverProperties);
-        copy.producerProperties.putAll(producerProperties);
-        copy.consumerProperties.putAll(consumerProperties);
-        copy.saslServerProperties.putAll(saslServerProperties);
-        copy.saslClientProperties.putAll(saslClientProperties);
-        perBrokerOverrideProperties.forEach((brokerId, props) -> {
-            Properties propsCopy = new Properties();
-            propsCopy.putAll(props);
-            copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
-        });
-        return copy;
+    public static Builder defaultClusterBuilder() {
+        return new Builder()
+                .type(Type.ZK)
+                .brokers(1)
+                .controllers(1)
+                .autoStart(true)
+                .securityProtocol(SecurityProtocol.PLAINTEXT)
+                .metadataVersion(MetadataVersion.latestTesting());
     }
 
-    public static Builder defaultClusterBuilder() {
-        return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+    public static Builder clusterBuilder() {
+        return new Builder();
     }
 
-    public static Builder clusterBuilder(Type type, int brokers, int 
controllers, boolean autoStart,
-                                         SecurityProtocol securityProtocol, 
MetadataVersion metadataVersion) {
-        return new Builder(type, brokers, controllers, autoStart, 
securityProtocol, metadataVersion);
+    public static Builder clusterBuilder(ClusterConfig clusterConfig) {
+        ClusterConfig.Builder builder = new Builder()
+                .type(clusterConfig.type)
+                .brokers(clusterConfig.brokers)
+                .controllers(clusterConfig.controllers)
+                .name(clusterConfig.name)
+                .autoStart(clusterConfig.autoStart)
+                .securityProtocol(clusterConfig.securityProtocol)
+                .listenerName(clusterConfig.listenerName)
+                .trustStoreFile(clusterConfig.trustStoreFile)
+                .metadataVersion(clusterConfig.metadataVersion);
+        builder.serverProperties = copyOf(clusterConfig.serverProperties);

Review Comment:
   Why don't we use the setters here?



##########
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java:
##########
@@ -96,83 +98,7 @@ public List<Extension> getAdditionalExtensions() {
                 // have run. This allows tests to set up external dependencies 
like ZK, MiniKDC, etc.
                 // However, since we cannot create this instance until we are 
inside the test invocation, we have
                 // to use a container class (AtomicReference) to provide this 
cluster object to the test itself
-
-                // This is what tests normally extend from to start a cluster, 
here we create it anonymously and

Review Comment:
   Could we keep this comment?



##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +159,46 @@ public Map<String, String> nameTags() {
         return tags;
     }
 
-    public ClusterConfig copyOf() {
-        ClusterConfig copy = new ClusterConfig(type, brokers, controllers, 
name, autoStart, securityProtocol, listenerName, trustStoreFile, 
metadataVersion);
-        copy.serverProperties.putAll(serverProperties);
-        copy.producerProperties.putAll(producerProperties);
-        copy.consumerProperties.putAll(consumerProperties);
-        copy.saslServerProperties.putAll(saslServerProperties);
-        copy.saslClientProperties.putAll(saslClientProperties);
-        perBrokerOverrideProperties.forEach((brokerId, props) -> {
-            Properties propsCopy = new Properties();
-            propsCopy.putAll(props);
-            copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
-        });
-        return copy;
+    public static Builder defaultClusterBuilder() {
+        return new Builder()
+                .type(Type.ZK)
+                .brokers(1)
+                .controllers(1)
+                .autoStart(true)
+                .securityProtocol(SecurityProtocol.PLAINTEXT)
+                .metadataVersion(MetadataVersion.latestTesting());
     }
 
-    public static Builder defaultClusterBuilder() {
-        return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+    public static Builder clusterBuilder() {

Review Comment:
   BTW, could we apply this pattern to `BrokerNode` and `ControllerNode` as 
well, so that we encourage users to create builders via a static method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to