mimaison commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1626218724


##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -45,89 +45,117 @@ import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
-  def main(args: Array[String]): Unit = {
-    try {
-      val namespace = parseArguments(args)
-      val command = namespace.getString("command")
-      val config = Option(namespace.getString("config")).flatMap(
-        p => Some(new KafkaConfig(Utils.loadProps(p))))
-      command match {
-        case "info" =>
-          val directories = configToLogDirectories(config.get)
-          val selfManagedMode = configToSelfManagedMode(config.get)
-          Exit.exit(infoCommand(System.out, selfManagedMode, directories))
-
-        case "format" =>
-          val directories = configToLogDirectories(config.get)
-          val clusterId = namespace.getString("cluster_id")
-          val metaProperties = new MetaProperties.Builder().
-            setVersion(MetaPropertiesVersion.V1).
-            setClusterId(clusterId).
-            setNodeId(config.get.nodeId).
-            build()
-          val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
-          val specifiedFeatures: util.List[String] = namespace.getList("feature")
-          val releaseVersionFlagSpecified = namespace.getString("release_version") != null
-          if (releaseVersionFlagSpecified && specifiedFeatures != null) {
-            throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
-          }
-          val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
-          val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
-            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          validateMetadataVersion(metadataVersion, config)
-          // Get all other features, validate, and create records for them
-          // Use latest default for features if --release-version is not specified
-          generateFeatureRecords(
-            metadataRecords,
-            metadataVersion,
-            featureNamesAndLevelsMap,
-            Features.PRODUCTION_FEATURES.asScala.toList,
-            config.get.unstableFeatureVersionsEnabled,
-            releaseVersionFlagSpecified
-          )
-          getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
-            if (!metadataVersion.isScramSupported) {
-              throw new TerseFailure(s"SCRAM is only supported in metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.")
-            }
-            for (record <- userScramCredentialRecords) {
-              metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
-            }
-          })
-
-          val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command")
-          val ignoreFormatted = namespace.getBoolean("ignore_formatted")
-          if (!configToSelfManagedMode(config.get)) {
-            throw new TerseFailure("The kafka configuration file appears to be for " +
-              "a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
-          }
-          Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
-                                  metadataVersion,ignoreFormatted))
 
-        case "random-uuid" =>
-          System.out.println(Uuid.randomUuid)
-          Exit.exit(0)
+  /**
+   * Executes the command according to the given arguments and returns the appropriate exit code.
+   * @param args The command line arguments
+   * @return     The exit code
+   */
+  def runMain(args: Array[String]): Int = {

Review Comment:
   In other tools, the method that is called by `main()` is typically called 
`execute()`. I think it would make sense to use that name here too.
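   For example (just a sketch, keeping whatever error handling `main()` already does in this PR):
   ```scala
   def main(args: Array[String]): Unit = {
     try {
       Exit.exit(execute(args))  // execute() being what this PR currently calls runMain()
     } catch {
       case e: TerseFailure =>
         System.err.println(e.getMessage)
         Exit.exit(1)
     }
   }
   ```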



##########
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##########
@@ -656,5 +657,47 @@ Found problem:
       assertEquals(1, exitStatus)
     }
   }
+
+  @Test
+  def testFormatValidatesConfigForMetadataVersion(): Unit = {
+    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null)))
+    val args = Array("format",
+      "-c", "dummy.properties",
+      "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
+      "--release-version", MetadataVersion.LATEST_PRODUCTION.toString)
+    val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config)
+    Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION)
+    assertEquals(0, exitCode)
+  }
+
+  private def createPropsFile(properties: Properties): String = {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = Files.newOutputStream(propsFile.toPath)
+    try {
+      properties.store(propsStream, "config.props")
+    } finally {
+      propsStream.close()
+    }
+    propsFile.toPath.toString
+  }
+
+  @Test
+  def testJbodSupportValidation(): Unit = {
+    def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = {
+      val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount)
+      properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)
+      StorageTool.runMain(Array("format",

Review Comment:
   We can get rid of `createPropsFile()` and reuse 
`TestUtils.tempPropertiesFile()`
   ```suggestion
   val configFile = TestUtils.tempPropertiesFile(properties.asInstanceOf[util.Map[String, String]].asScala)
   StorageTool.runMain(Array("format",
           "-c", configFile.getAbsolutePath,
           "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
           "--release-version", metadataVersion.toString))
   ```
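   With that, `formatWith` would end up roughly like this (sketch only, assuming the rest of the helper stays as in this PR and that `tempPropertiesFile` takes a Scala map as in the suggestion above):
   ```scala
   def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = {
     val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount)
     properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)
     // tempPropertiesFile expects a Scala map, hence the cast/asScala on the Properties
     val configFile = TestUtils.tempPropertiesFile(properties.asInstanceOf[util.Map[String, String]].asScala)
     StorageTool.runMain(Array("format",
       "-c", configFile.getAbsolutePath,
       "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
       "--release-version", metadataVersion.toString))
   }
   ```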



##########
core/src/main/java/kafka/server/MetadataVersionConfigValidator.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+
+public class MetadataVersionConfigValidator implements MetadataPublisher {

Review Comment:
   Sorry I'm not super familiar yet with this. Can you explain why we would 
catch the configuration error here instead of in KafkaConfig?
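   My (possibly wrong) understanding is that the difference is timing: KafkaConfig is only checked against a metadata version when the broker starts or the storage is formatted, whereas a MetadataPublisher also sees metadata.version changes while the broker is running, so a later feature upgrade/downgrade would still be caught. Is that the intent here, or could the check live entirely in KafkaConfig?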
   



##########
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##########
@@ -656,5 +657,47 @@ Found problem:
       assertEquals(1, exitStatus)
     }
   }
+
+  @Test
+  def testFormatValidatesConfigForMetadataVersion(): Unit = {
+    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null)))
+    val args = Array("format",
+      "-c", "dummy.properties",
+      "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
+      "--release-version", MetadataVersion.LATEST_PRODUCTION.toString)
+    val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config)
+    Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION)
+    assertEquals(0, exitCode)
+  }
+
+  private def createPropsFile(properties: Properties): String = {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = Files.newOutputStream(propsFile.toPath)
+    try {
+      properties.store(propsStream, "config.props")
+    } finally {
+      propsStream.close()
+    }
+    propsFile.toPath.toString
+  }
+
+  @Test
+  def testJbodSupportValidation(): Unit = {
+    def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = {
+      val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount)
+      properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)

Review Comment:
   It looks like we're missing a test that checks the case where the inter-broker protocol version is set?
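   Something like the following, perhaps (rough sketch only; `IBP_3_7_IV2` is the first JBOD-capable version as far as I remember, and the exact assertions depend on how `runMain` surfaces the validation failure, exit code vs `TerseFailure`):
   ```scala
   @Test
   def testJbodSupportValidationWithInterBrokerProtocolVersionSet(): Unit = {
     // Same setup, but keep inter.broker.protocol.version in the config so the tool
     // derives the metadata version from it instead of from --release-version.
     def formatWith(logDirCount: Int, ibp: MetadataVersion): Integer = {
       val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount)
       properties.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp.version)
       val configFile = TestUtils.tempPropertiesFile(properties.asInstanceOf[util.Map[String, String]].asScala)
       StorageTool.runMain(Array("format",
         "-c", configFile.getAbsolutePath,
         "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
     }

     // Multiple log dirs need JBOD support (3.7+), so an older IBP should be rejected
     // while a JBOD-capable one should format cleanly. Adjust expectations as appropriate.
     assertEquals(0, formatWith(2, MetadataVersion.IBP_3_7_IV2))
     assertNotEquals(0, formatWith(2, MetadataVersion.IBP_3_6_IV2))
   }
   ```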



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -45,89 +45,117 @@ import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
-  def main(args: Array[String]): Unit = {
-    try {
-      val namespace = parseArguments(args)
-      val command = namespace.getString("command")
-      val config = Option(namespace.getString("config")).flatMap(
-        p => Some(new KafkaConfig(Utils.loadProps(p))))
-      command match {
-        case "info" =>
-          val directories = configToLogDirectories(config.get)
-          val selfManagedMode = configToSelfManagedMode(config.get)
-          Exit.exit(infoCommand(System.out, selfManagedMode, directories))
-
-        case "format" =>
-          val directories = configToLogDirectories(config.get)
-          val clusterId = namespace.getString("cluster_id")
-          val metaProperties = new MetaProperties.Builder().
-            setVersion(MetaPropertiesVersion.V1).
-            setClusterId(clusterId).
-            setNodeId(config.get.nodeId).
-            build()
-          val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
-          val specifiedFeatures: util.List[String] = namespace.getList("feature")
-          val releaseVersionFlagSpecified = namespace.getString("release_version") != null
-          if (releaseVersionFlagSpecified && specifiedFeatures != null) {
-            throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
-          }
-          val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
-          val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
-            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          validateMetadataVersion(metadataVersion, config)
-          // Get all other features, validate, and create records for them
-          // Use latest default for features if --release-version is not specified
-          generateFeatureRecords(
-            metadataRecords,
-            metadataVersion,
-            featureNamesAndLevelsMap,
-            Features.PRODUCTION_FEATURES.asScala.toList,
-            config.get.unstableFeatureVersionsEnabled,
-            releaseVersionFlagSpecified
-          )
-          getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
-            if (!metadataVersion.isScramSupported) {
-              throw new TerseFailure(s"SCRAM is only supported in metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.")
-            }
-            for (record <- userScramCredentialRecords) {
-              metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
-            }
-          })
-
-          val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command")
-          val ignoreFormatted = namespace.getBoolean("ignore_formatted")
-          if (!configToSelfManagedMode(config.get)) {
-            throw new TerseFailure("The kafka configuration file appears to be for " +
-              "a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
-          }
-          Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
-                                  metadataVersion,ignoreFormatted))
 
-        case "random-uuid" =>
-          System.out.println(Uuid.randomUuid)
-          Exit.exit(0)
+  /**
+   * Executes the command according to the given arguments and returns the appropriate exit code.
+   * @param args The command line arguments
+   * @return     The exit code
+   */
+  def runMain(args: Array[String]): Int = {
+    val namespace = parseArguments(args)
+    val command = namespace.getString("command")
+    val config = Option(namespace.getString("config")).flatMap(
+      p => Some(new KafkaConfig(Utils.loadProps(p))))
+    command match {
+      case "info" =>
+        val directories = configToLogDirectories(config.get)
+        val selfManagedMode = configToSelfManagedMode(config.get)
+        infoCommand(System.out, selfManagedMode, directories)
+
+      case "format" =>
+        runFormatCommand(namespace, config.get)
+
+      case "random-uuid" =>
+        System.out.println(Uuid.randomUuid)
+        0
+      case _ =>
+        throw new RuntimeException(s"Unknown command $command")
+    }
+  }
 
-        case _ =>
-          throw new RuntimeException(s"Unknown command $command")
-      }
+  def main(args: Array[String]): Unit = {

Review Comment:
   Can we move `main()` to the top?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
