jsancio commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2738100509


##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -178,6 +178,20 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     closeSasl()
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testNonDefaultKControllerDynamicConfiguration(groupProtocol: String): Unit = {
+    val props = new Properties
+    props.put(ServerConfigs.NUM_IO_THREADS_CONFIG, "9")
+    reconfigureServers(props, perBrokerConfig = false, (ServerConfigs.NUM_IO_THREADS_CONFIG, "9"))
+
+    val controller = controllerServer
+    TestUtils.retry(60000) {
+      assertNotNull(controller.controllerApisHandlerPool)
+      assertEquals(9, controller.controllerApisHandlerPool.threadPoolSize.get())
+    }
+  }
+

Review Comment:
   What is this testing? Why is this part of this PR/change?



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -19,65 +19,33 @@
 
 import org.apache.kafka.metadata.util.BatchFileReader;
 import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
-import org.apache.kafka.metadata.util.BatchFileWriter;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
 
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
-
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 
 /**
- * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.checkpoint" is used and the
- * format is the same as a KRaft snapshot.
+ * Abstraction for reading controller bootstrap metadata from disk.
  */
-public class BootstrapDirectory {
-    public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";
-
-    private final String directoryPath;
+public interface BootstrapDirectory {
 
     /**
-     * Create a new BootstrapDirectory object.
+     * Read the bootstrap metadata from the configured location.
      *
-     * @param directoryPath     The path to the directory with the bootstrap file.
+     * @return the loaded {@link BootstrapMetadata}
+     * @throws Exception if the metadata cannot be read
      */
-    public BootstrapDirectory(
-        String directoryPath
-    ) {
-        this.directoryPath = Objects.requireNonNull(directoryPath);
-    }
-
-    public BootstrapMetadata read() throws Exception {
-        Path path = Paths.get(directoryPath);
-        if (!Files.isDirectory(path)) {
-            if (Files.exists(path)) {
-                throw new RuntimeException("Path " + directoryPath + " exists, but is not " +
-                        "a directory.");
-            } else {
-                throw new RuntimeException("No such directory as " + directoryPath);
-            }
-        }
-        Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME);
-        if (!Files.exists(binaryBootstrapPath)) {
-            return readFromConfiguration();
-        } else {
-            return readFromBinaryFile(binaryBootstrapPath.toString());
-        }
-    }
-
-    BootstrapMetadata readFromConfiguration() {
-        return BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default bootstrap");
-    }
+    BootstrapMetadata read() throws Exception;
 
-    BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception {
+    /**
+     * Read bootstrap metadata from the given binary file path.
+     *
+     * @param binaryPath the path to the binary bootstrap file
+     * @return the loaded {@link BootstrapMetadata}
+     * @throws Exception if the metadata cannot be read
+     */
+    default BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception {

Review Comment:
   Can this be done with a static method in a utility package? Again, it is odd 
to have an interface that abstracts the construction of `BootstrapMetadata` 
and, at the same time, defines a method that constructs a bootstrap metadata 
which all of the implementations use.
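
   A minimal sketch of the static-utility alternative raised in this comment. 
The names `BootstrapSource`, `BootstrapFiles` and `FileBootstrapSource` are 
hypothetical, and a plain `String` stands in for `BootstrapMetadata` so that 
the sketch runs without Kafka on the classpath. It is an illustration of the 
shape of the refactoring, not the code in the PR.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Objects;

// Hypothetical interface: it only abstracts where the bootstrap data comes from.
interface BootstrapSource {
    String read() throws IOException;
}

// Hypothetical utility class holding the shared file-reading logic as a static
// method, instead of a default method on the interface.
final class BootstrapFiles {
    private BootstrapFiles() { }

    // A String stands in for BootstrapMetadata (assumption made for this sketch).
    static String readFromBinaryFile(Path binaryPath) throws IOException {
        if (!Files.isRegularFile(binaryPath)) {
            throw new NoSuchFileException(binaryPath.toString());
        }
        return new String(Files.readAllBytes(binaryPath), StandardCharsets.UTF_8);
    }
}

// An implementation decides which file to read and delegates the reading.
final class FileBootstrapSource implements BootstrapSource {
    private final Path path;

    FileBootstrapSource(Path path) {
        this.path = Objects.requireNonNull(path);
    }

    @Override
    public String read() throws IOException {
        return BootstrapFiles.readFromBinaryFile(path);
    }
}
```

   With this split, the interface carries no construction logic of its own, 
and each implementation only chooses the path it passes to the utility.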



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/TestBootstrapDirectory.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import java.io.FileNotFoundException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
+
+/**
+ * Test-only implementation that reads bootstrap metadata from the metadata partition snapshot.
+ */
+public class TestBootstrapDirectory implements BootstrapDirectory {

Review Comment:
   To me that sounds like a minor distinction that could be supported by 
`LegacyBootstrapDirectory`. What do you think, @mannoopj?



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -1174,35 +1174,6 @@ class KRaftClusterTest {
     }
   }
 
-  @Test
-  def testStartupWithNonDefaultKControllerDynamicConfiguration(): Unit = {

Review Comment:
   Was this moved to `DynamicBrokerReconfigurationTest`? Why is that? 
`QuorumTestHarness` was developed to migrate existing tests from ZK to KRaft. 
New tests should use the `KafkaClusterTestKit` utility.



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/TestBootstrapDirectory.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import java.io.FileNotFoundException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
+
+/**
+ * Test-only implementation that reads bootstrap metadata from the metadata partition snapshot.
+ */
+public class TestBootstrapDirectory implements BootstrapDirectory {
+    private static final String BINARY_BOOTSTRAP_CHECKPOINT_FILENAME = "00000000000000000000-0000000000.checkpoint";

Review Comment:
   I still don't get it. Again, to me it looks like the file name used could 
be parametrized.
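
   A minimal sketch of the parametrization suggested here, assuming the two 
variants differ only in the file name. `BootstrapFileLocator` is a 
hypothetical name; the two file names are the ones that appear in the diffs 
above. If the variants also differ in the directory they read from, that 
would need a second parameter.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;

// Hypothetical class: one implementation covers both layouts by taking the
// bootstrap file name as a constructor parameter.
final class BootstrapFileLocator {
    static final String LEGACY_FILENAME = "bootstrap.checkpoint";
    static final String SNAPSHOT_FILENAME = "00000000000000000000-0000000000.checkpoint";

    private final String directoryPath;
    private final String fileName;

    BootstrapFileLocator(String directoryPath, String fileName) {
        this.directoryPath = Objects.requireNonNull(directoryPath);
        this.fileName = Objects.requireNonNull(fileName);
    }

    // Resolves the bootstrap file inside the configured directory.
    Path bootstrapFile() {
        return Paths.get(directoryPath, fileName);
    }
}
```

   A test would then construct the locator with `SNAPSHOT_FILENAME` rather 
than requiring a separate test-only class.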



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1436,7 +1446,7 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
     /**
      * The bootstrap metadata to use for initialization if needed.
      */
-    private final BootstrapMetadata bootstrapMetadata;
+    private volatile BootstrapMetadata bootstrapMetadata;

Review Comment:
   Let's document why volatile is needed and why volatile is enough to be 
correct.
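
   A minimal sketch of the kind of field documentation being requested. 
`BootstrapHolder` is hypothetical and a `String` stands in for 
`BootstrapMetadata`. The conditions listed in the comment are one situation 
in which `volatile` alone is sufficient; whether they actually hold in 
`QuorumController` is what the documentation would need to establish.

```java
import java.util.Objects;

// Hypothetical holder class used only to illustrate the field comment.
final class BootstrapHolder {
    /**
     * volatile is needed because the field is written by one thread and read
     * by others without a shared lock, so readers must see the latest write.
     *
     * volatile is enough (in this sketch) because:
     *  1. the referenced value is immutable, and
     *  2. every write replaces the whole reference; there is no
     *     read-modify-write sequence that would need atomicity.
     */
    private volatile String bootstrap; // String stands in for BootstrapMetadata

    BootstrapHolder(String initial) {
        this.bootstrap = Objects.requireNonNull(initial);
    }

    // Publishes a new value; visible to all subsequent readers.
    void replace(String next) {
        this.bootstrap = Objects.requireNonNull(next);
    }

    String current() {
        return bootstrap;
    }
}
```

   If either condition does not hold, for example if the value is updated 
based on its previous contents, `volatile` would not be sufficient and a lock 
or an `AtomicReference` would be the usual alternative.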



##########
raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java:
##########
@@ -57,6 +57,12 @@ public interface SnapshotReader<T> extends AutoCloseable, Iterator<Batch<T>> {
      */
     long lastContainedLogTimestamp();
 
+    /**
+     * Returns true if the snapshot has been committed.
+     * Uncommitted bootstrap snapshots return false.
+     */
+    boolean isCommittedSnapshot();

Review Comment:
   Thanks. I think that returning an uncommitted snapshot through 
`RaftListener.handleLoadSnapshot` has a high probability of the 
user/application handling it wrong. Instead of changing the semantics of 
`handleLoadSnapshot` to return uncommitted snapshots, I think that we should 
add another callback to `RaftListener`, for example 
`handleLoadBootstrap(SnapshotReader<T>)`. This makes the semantic difference 
between the two snapshots clearer to the user.
   
   If we don't do this, all KRaft applications need to remember to check 
whether the returned snapshot is uncommitted or committed, which is not clear 
from the `RaftListener` interface or the documentation of that interface.
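
   A minimal sketch of the two-callback shape proposed in this comment. 
`SketchSnapshotReader`, `SketchListener` and `RecordingListener` are 
simplified, hypothetical stand-ins so the example runs without Kafka; only 
the callback names `handleLoadSnapshot` and `handleLoadBootstrap` come from 
the discussion above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for SnapshotReader<T>; the real interface has more methods.
interface SketchSnapshotReader<T> {
    List<T> records();
}

// Simplified stand-in for the listener interface under discussion.
interface SketchListener<T> {
    // Invoked only for committed snapshots, so no committed/uncommitted check is needed.
    void handleLoadSnapshot(SketchSnapshotReader<T> reader);

    // Proposed separate callback for the uncommitted bootstrap snapshot.
    void handleLoadBootstrap(SketchSnapshotReader<T> reader);
}

// Example application listener that keeps the two kinds of state apart.
final class RecordingListener implements SketchListener<String> {
    final List<String> committed = new ArrayList<>();
    final List<String> bootstrap = new ArrayList<>();

    @Override
    public void handleLoadSnapshot(SketchSnapshotReader<String> reader) {
        committed.addAll(reader.records());
    }

    @Override
    public void handleLoadBootstrap(SketchSnapshotReader<String> reader) {
        bootstrap.addAll(reader.records());
    }
}
```

   Because the distinction is encoded in which method is called, an 
implementation cannot forget to check a flag such as `isCommittedSnapshot()`.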



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -123,6 +123,7 @@ object StorageTool extends Logging {
     val formatter = new Formatter().
       setPrintStream(printStream).
       setNodeId(config.nodeId).
+      setWriteBootstrapSnapshot(config.processRoles.contains(ProcessRole.ControllerRole)).

Review Comment:
   I think I am fine with this change. We should document it in KIP-1170.


