This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7553d3f562f KAFKA-14593: Move LeaderElectionCommand to tools (#13204)
7553d3f562f is described below

commit 7553d3f562f3af6c7f9b062b9220bcad80b00478
Author: Omnia G.H Ibrahim <o.g.h.ibra...@gmail.com>
AuthorDate: Tue Oct 3 10:59:56 2023 +0100

    KAFKA-14593: Move LeaderElectionCommand to tools (#13204)
    
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Federico Valeri <fedeval...@gmail.com>
---
 bin/kafka-leader-election.sh                       |   2 +-
 bin/windows/kafka-leader-election.bat              |   2 +-
 build.gradle                                       |   8 +-
 .../scala/kafka/admin/LeaderElectionCommand.scala  | 289 ----------------
 .../admin/LeaderElectionCommandErrorTest.scala     |  97 ------
 .../kafka/admin/LeaderElectionCommandTest.scala    | 270 ---------------
 settings.gradle                                    |   1 +
 .../apache/kafka/tools/LeaderElectionCommand.java  | 376 +++++++++++++++++++++
 .../tools/LeaderElectionCommandErrorTest.java      |  89 +++++
 .../kafka/tools/LeaderElectionCommandTest.java     | 298 ++++++++++++++++
 10 files changed, 770 insertions(+), 662 deletions(-)

diff --git a/bin/kafka-leader-election.sh b/bin/kafka-leader-election.sh
index 88baef398de..49b3cf5c61a 100755
--- a/bin/kafka-leader-election.sh
+++ b/bin/kafka-leader-election.sh
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.LeaderElectionCommand "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.LeaderElectionCommand "$@"
diff --git a/bin/windows/kafka-leader-election.bat b/bin/windows/kafka-leader-election.bat
index 0432a99b6e4..92e03dfc63f 100644
--- a/bin/windows/kafka-leader-election.bat
+++ b/bin/windows/kafka-leader-election.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 rem See the License for the specific language governing permissions and
 rem limitations under the License.
 
-"%~dp0kafka-run-class.bat" kafka.admin.LeaderElectionCommand %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.LeaderElectionCommand %*
diff --git a/build.gradle b/build.gradle
index ffdbbf1ace4..82530e05c70 100644
--- a/build.gradle
+++ b/build.gradle
@@ -873,7 +873,7 @@ project(':core') {
     implementation project(':server-common')
     implementation project(':group-coordinator')
     implementation project(':metadata')
-    implementation project(':storage:api')
+    implementation project(':storage:storage-api')
     implementation project(':tools:tools-api')
     implementation project(':raft')
     implementation project(':storage')
@@ -915,7 +915,7 @@ project(':core') {
     testImplementation project(':metadata').sourceSets.test.output
     testImplementation project(':raft').sourceSets.test.output
     testImplementation project(':server-common').sourceSets.test.output
-    testImplementation project(':storage:api').sourceSets.test.output
+    testImplementation project(':storage:storage-api').sourceSets.test.output
     testImplementation libs.bcpkix
     testImplementation libs.mockitoCore
     testImplementation(libs.apacheda) {
@@ -1638,7 +1638,7 @@ project(':server-common') {
   }
 }
 
-project(':storage:api') {
+project(':storage:storage-api') {
   archivesBaseName = "kafka-storage-api"
 
   dependencies {
@@ -1714,7 +1714,7 @@ project(':storage') {
   }
 
   dependencies {
-    implementation project(':storage:api')
+    implementation project(':storage:storage-api')
     implementation project(':server-common')
     implementation project(':clients')
     implementation libs.caffeine
diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
deleted file mode 100644
index 868c54916e9..00000000000
--- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import java.util.Properties
-import java.util.concurrent.ExecutionException
-import joptsimple.util.EnumConverter
-import kafka.utils.CoreUtils
-import kafka.utils.Implicits._
-import kafka.utils.Json
-import kafka.utils.Logging
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
-import org.apache.kafka.common.ElectionType
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.ClusterAuthorizationException
-import org.apache.kafka.common.errors.ElectionNotNeededException
-import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable
-import scala.concurrent.duration._
-
-object LeaderElectionCommand extends Logging {
-  def main(args: Array[String]): Unit = {
-    run(args, 30.second)
-  }
-
-  def run(args: Array[String], timeout: Duration): Unit = {
-    val commandOptions = new LeaderElectionCommandOptions(args)
-    CommandLineUtils.maybePrintHelpOrVersion(
-      commandOptions,
-      "This tool attempts to elect a new leader for a set of topic partitions. 
The type of elections supported are preferred replicas and unclean replicas."
-    )
-
-    validate(commandOptions)
-
-    val electionType = 
commandOptions.options.valueOf(commandOptions.electionType)
-
-    val jsonFileTopicPartitions = 
Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { 
path  =>
-      parseReplicaElectionData(Utils.readFileAsString(path))
-    }
-
-    val singleTopicPartition = (
-      Option(commandOptions.options.valueOf(commandOptions.topic)),
-      Option(commandOptions.options.valueOf(commandOptions.partition))
-    ) match {
-      case (Some(topic), Some(partition)) => Some(Set(new 
TopicPartition(topic, partition)))
-      case _ => None
-    }
-
-    /* Note: No need to look at --all-topic-partitions as we want this to be 
None if it is use.
-     * The validate function should be checking that this option is required 
if the --topic and --path-to-json-file
-     * are not specified.
-     */
-    val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)
-
-    val adminClient = {
-      val props = 
Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { 
config =>
-        Utils.loadProps(config)
-      }.getOrElse(new Properties())
-
-      props.setProperty(
-        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
-        commandOptions.options.valueOf(commandOptions.bootstrapServer)
-      )
-      props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 
timeout.toMillis.toString)
-      props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 
(timeout.toMillis / 2).toString)
-
-      Admin.create(props)
-    }
-
-    try {
-      electLeaders(adminClient, electionType, topicPartitions)
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  private[this] def parseReplicaElectionData(jsonString: String): 
Set[TopicPartition] = {
-    Json.parseFull(jsonString) match {
-      case Some(js) =>
-        js.asJsonObject.get("partitions") match {
-          case Some(partitionsList) =>
-            val partitionsRaw = 
partitionsList.asJsonArray.iterator.map(_.asJsonObject)
-            val partitions = partitionsRaw.map { p =>
-              val topic = p("topic").to[String]
-              val partition = p("partition").to[Int]
-              new TopicPartition(topic, partition)
-            }.toBuffer
-            val duplicatePartitions = CoreUtils.duplicates(partitions)
-            if (duplicatePartitions.nonEmpty) {
-              throw new AdminOperationException(
-                s"Replica election data contains duplicate partitions: 
${duplicatePartitions.mkString(",")}"
-              )
-            }
-            partitions.toSet
-          case None => throw new AdminOperationException("Replica election 
data is missing \"partitions\" field")
-        }
-      case None => throw new AdminOperationException("Replica election data is 
empty")
-    }
-  }
-
-  private[this] def electLeaders(
-    client: Admin,
-    electionType: ElectionType,
-    topicPartitions: Option[Set[TopicPartition]]
-  ): Unit = {
-    val electionResults = try {
-      val partitions = topicPartitions.map(_.asJava).orNull
-      debug(s"Calling AdminClient.electLeaders($electionType, $partitions)")
-      client.electLeaders(electionType, partitions).partitions.get.asScala
-    } catch {
-      case e: ExecutionException =>
-        e.getCause match {
-          case cause: TimeoutException =>
-            val message = "Timeout waiting for election results"
-            println(message)
-            throw new AdminCommandFailedException(message, cause)
-          case cause: ClusterAuthorizationException =>
-            val message = "Not authorized to perform leader election"
-            println(message)
-            throw new AdminCommandFailedException(message, cause)
-          case _ =>
-            throw e
-        }
-      case e: Throwable =>
-        println("Error while making request")
-        throw e
-    }
-
-    val succeeded = mutable.Set.empty[TopicPartition]
-    val noop = mutable.Set.empty[TopicPartition]
-    val failed = mutable.Map.empty[TopicPartition, Throwable]
-
-    electionResults.foreach[Unit] { case (topicPartition, error) =>
-      if (error.isPresent) {
-        error.get match {
-          case _: ElectionNotNeededException => noop += topicPartition
-          case _ => failed += topicPartition -> error.get
-        }
-      } else {
-        succeeded += topicPartition
-      }
-    }
-
-    if (succeeded.nonEmpty) {
-      val partitions = succeeded.mkString(", ")
-      println(s"Successfully completed leader election ($electionType) for 
partitions $partitions")
-    }
-
-    if (noop.nonEmpty) {
-      val partitions = noop.mkString(", ")
-      println(s"Valid replica already elected for partitions $partitions")
-    }
-
-    if (failed.nonEmpty) {
-      val rootException = new AdminCommandFailedException(s"${failed.size} 
replica(s) could not be elected")
-      failed.forKeyValue { (topicPartition, exception) =>
-        println(s"Error completing leader election ($electionType) for 
partition: $topicPartition: $exception")
-        rootException.addSuppressed(exception)
-      }
-      throw rootException
-    }
-  }
-
-  private[this] def validate(commandOptions: LeaderElectionCommandOptions): 
Unit = {
-    // required options: --bootstrap-server and --election-type
-    var missingOptions = List.empty[String]
-    if (!commandOptions.options.has(commandOptions.bootstrapServer)) {
-      missingOptions = commandOptions.bootstrapServer.options().get(0) :: 
missingOptions
-    }
-
-    if (!commandOptions.options.has(commandOptions.electionType)) {
-      missingOptions = commandOptions.electionType.options().get(0) :: 
missingOptions
-    }
-
-    if (missingOptions.nonEmpty) {
-      throw new AdminCommandFailedException(s"Missing required option(s): 
${missingOptions.mkString(", ")}")
-    }
-
-    // One and only one is required: --topic, --all-topic-partitions or 
--path-to-json-file
-    val mutuallyExclusiveOptions = Seq(
-      commandOptions.topic,
-      commandOptions.allTopicPartitions,
-      commandOptions.pathToJsonFile
-    )
-
-    mutuallyExclusiveOptions.count(commandOptions.options.has) match {
-      case 1 => // This is the only correct configuration, don't throw an 
exception
-      case _ =>
-        throw new AdminCommandFailedException(
-          "One and only one of the following options is required: " +
-          s"${mutuallyExclusiveOptions.map(_.options.get(0)).mkString(", ")}"
-        )
-    }
-
-    // --partition if and only if --topic is used
-    (
-      commandOptions.options.has(commandOptions.topic),
-      commandOptions.options.has(commandOptions.partition)
-    ) match {
-      case (true, false) =>
-        throw new AdminCommandFailedException(
-          s"Missing required option(s): 
${commandOptions.partition.options.get(0)}"
-        )
-      case (false, true) =>
-        throw new AdminCommandFailedException(
-          s"Option ${commandOptions.partition.options.get(0)} is only allowed 
if " +
-          s"${commandOptions.topic.options.get(0)} is used"
-        )
-      case _ => // Ignore; we have a valid configuration
-    }
-  }
-}
-
-private final class LeaderElectionCommandOptions(args: Array[String]) extends 
CommandDefaultOptions(args) {
-  val bootstrapServer = parser
-    .accepts(
-      "bootstrap-server",
-      "A hostname and port for the broker to connect to, in the form 
host:port. Multiple comma separated URLs can be given. REQUIRED.")
-    .withRequiredArg
-    .describedAs("host:port")
-    .ofType(classOf[String])
-  val adminClientConfig = parser
-    .accepts(
-      "admin.config",
-      "Configuration properties files to pass to the admin client")
-    .withRequiredArg
-    .describedAs("config file")
-    .ofType(classOf[String])
-
-  val pathToJsonFile = parser
-    .accepts(
-      "path-to-json-file",
-      "The JSON file with the list  of partition for which leader elections 
should be performed. This is an example format. 
\n{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": 
\"foobar\", \"partition\": 2}]\n}\nNot allowed if --all-topic-partitions or 
--topic flags are specified.")
-    .withRequiredArg
-    .describedAs("Path to JSON file")
-    .ofType(classOf[String])
-
-  val topic = parser
-    .accepts(
-      "topic",
-      "Name of topic for which to perform an election. Not allowed if 
--path-to-json-file or --all-topic-partitions is specified.")
-    .withRequiredArg
-    .describedAs("topic name")
-    .ofType(classOf[String])
-
-  val partition = parser
-    .accepts(
-      "partition",
-      "Partition id for which to perform an election. REQUIRED if --topic is 
specified.")
-    .withRequiredArg
-    .describedAs("partition id")
-    .ofType(classOf[Integer])
-
-  val allTopicPartitions = parser
-    .accepts(
-      "all-topic-partitions",
-      "Perform election on all of the eligible topic partitions based on the 
type of election (see the --election-type flag). Not allowed if --topic or 
--path-to-json-file is specified.")
-
-  val electionType = parser
-    .accepts(
-      "election-type",
-      "Type of election to attempt. Possible values are \"preferred\" for 
preferred leader election or \"unclean\" for unclean leader election. If 
preferred election is selection, the election is only performed if the current 
leader is not the preferred leader for the topic partition. If unclean election 
is selected, the election is only performed if there are no leader for the 
topic partition. REQUIRED.")
-    .withRequiredArg
-    .describedAs("election type")
-    .withValuesConvertedBy(ElectionTypeConverter)
-
-  options = parser.parse(args: _*)
-}
-
-final object ElectionTypeConverter extends 
EnumConverter[ElectionType](classOf[ElectionType]) { }
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
deleted file mode 100644
index 6d36120b136..00000000000
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.server.common.AdminCommandFailedException
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-import scala.concurrent.duration._
-
-/**
- * For some error cases, we can save a little build time by avoiding the 
overhead for
- * cluster creation and cleanup because the command is expected to fail 
immediately.
- */
-class LeaderElectionCommandErrorTest {
-
-  @Test
-  def testTopicWithoutPartition(): Unit = {
-    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", "nohost:9092",
-        "--election-type", "unclean",
-        "--topic", "some-topic"
-      )
-    ))
-    assertTrue(e.getMessage.startsWith("Missing required option(s)"))
-    assertTrue(e.getMessage.contains(" partition"))
-  }
-
-  @Test
-  def testPartitionWithoutTopic(): Unit = {
-    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", "nohost:9092",
-        "--election-type", "unclean",
-        "--all-topic-partitions",
-        "--partition", "0"
-      )
-    ))
-    assertEquals("Option partition is only allowed if topic is used", 
e.getMessage)
-  }
-
-  @Test
-  def testMissingElectionType(): Unit = {
-    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", "nohost:9092",
-        "--topic", "some-topic",
-        "--partition", "0"
-      )
-    ))
-    assertTrue(e.getMessage.startsWith("Missing required option(s)"))
-    assertTrue(e.getMessage.contains(" election-type"))
-  }
-
-  @Test
-  def testMissingTopicPartitionSelection(): Unit = {
-    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", "nohost:9092",
-        "--election-type", "preferred"
-      )
-    ))
-    assertTrue(e.getMessage.startsWith("One and only one of the following 
options is required: "))
-    assertTrue(e.getMessage.contains(" all-topic-partitions"))
-    assertTrue(e.getMessage.contains(" topic"))
-    assertTrue(e.getMessage.contains(" path-to-json-file"))
-  }
-
-  @Test
-  def testInvalidBroker(): Unit = {
-    val e = assertThrows(classOf[AdminCommandFailedException], () => 
LeaderElectionCommand.run(
-      Array(
-        "--bootstrap-server", "example.com:1234",
-        "--election-type", "unclean",
-        "--all-topic-partitions"
-      ),
-      1.seconds
-    ))
-    assertTrue(e.getCause.isInstanceOf[TimeoutException])
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
deleted file mode 100644
index ff6cd2cad60..00000000000
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Path}
-import kafka.server.IntegrationTestUtils.createTopic
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
-import kafka.test.junit.ClusterTestExtensions
-import kafka.test.{ClusterConfig, ClusterInstance}
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
-import org.apache.kafka.server.common.AdminCommandFailedException
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{BeforeEach, Tag}
-
-@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
-@Tag("integration")
-final class LeaderElectionCommandTest(cluster: ClusterInstance) {
-  import LeaderElectionCommandTest._
-
-  val broker1 = 0
-  val broker2 = 1
-  val broker3 = 2
-
-  @BeforeEach
-  def setup(clusterConfig: ClusterConfig): Unit = {
-    TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
-    clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
-    clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp, "true")
-    clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp, "1")
-    clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000")
-    clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
-  }
-
-  @ClusterTest
-  def testAllTopicPartition(): Unit = {
-    val topic = "unclean-topic"
-    val partition = 0
-    val assignment = Seq(broker2, broker3)
-
-    cluster.waitForReadyBrokers()
-    val client = cluster.createAdminClient()
-    createTopic(client, topic, Map(partition -> assignment))
-
-    val topicPartition = new TopicPartition(topic, partition)
-
-    TestUtils.assertLeader(client, topicPartition, broker2)
-    cluster.shutdownBroker(broker3)
-    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
-    cluster.shutdownBroker(broker2)
-    TestUtils.assertNoLeader(client, topicPartition)
-    cluster.startBroker(broker3)
-    TestUtils.waitForOnlineBroker(client, broker3)
-
-    LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", cluster.bootstrapServers(),
-        "--election-type", "unclean",
-        "--all-topic-partitions"
-      )
-    )
-
-    TestUtils.assertLeader(client, topicPartition, broker3)
-  }
-
-  @ClusterTest
-  def testTopicPartition(): Unit = {
-    val topic = "unclean-topic"
-    val partition = 0
-    val assignment = Seq(broker2, broker3)
-
-    cluster.waitForReadyBrokers()
-    val client = cluster.createAdminClient()
-    createTopic(client, topic, Map(partition -> assignment))
-
-    val topicPartition = new TopicPartition(topic, partition)
-
-    TestUtils.assertLeader(client, topicPartition, broker2)
-
-    cluster.shutdownBroker(broker3)
-    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
-    cluster.shutdownBroker(broker2)
-    TestUtils.assertNoLeader(client, topicPartition)
-    cluster.startBroker(broker3)
-    TestUtils.waitForOnlineBroker(client, broker3)
-
-    LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", cluster.bootstrapServers(),
-        "--election-type", "unclean",
-        "--topic", topic,
-        "--partition", partition.toString
-      )
-    )
-
-    TestUtils.assertLeader(client, topicPartition, broker3)
-  }
-
-  @ClusterTest
-  def testPathToJsonFile(): Unit = {
-    val topic = "unclean-topic"
-    val partition = 0
-    val assignment = Seq(broker2, broker3)
-
-    cluster.waitForReadyBrokers()
-    val client = cluster.createAdminClient()
-    createTopic(client, topic, Map(partition -> assignment))
-
-    val topicPartition = new TopicPartition(topic, partition)
-
-    TestUtils.assertLeader(client, topicPartition, broker2)
-
-    cluster.shutdownBroker(broker3)
-    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
-    cluster.shutdownBroker(broker2)
-    TestUtils.assertNoLeader(client, topicPartition)
-    cluster.startBroker(broker3)
-    TestUtils.waitForOnlineBroker(client, broker3)
-
-    val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
-
-    LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", cluster.bootstrapServers(),
-        "--election-type", "unclean",
-        "--path-to-json-file", topicPartitionPath.toString
-      )
-    )
-
-    TestUtils.assertLeader(client, topicPartition, broker3)
-  }
-
-  @ClusterTest
-  def testPreferredReplicaElection(): Unit = {
-    val topic = "preferred-topic"
-    val partition = 0
-    val assignment = Seq(broker2, broker3)
-
-    cluster.waitForReadyBrokers()
-    val client = cluster.createAdminClient()
-    createTopic(client, topic, Map(partition -> assignment))
-
-    val topicPartition = new TopicPartition(topic, partition)
-
-    TestUtils.assertLeader(client, topicPartition, broker2)
-
-    cluster.shutdownBroker(broker2)
-    TestUtils.assertLeader(client, topicPartition, broker3)
-    cluster.startBroker(broker2)
-    TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
-
-    LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", cluster.bootstrapServers(),
-        "--election-type", "preferred",
-        "--all-topic-partitions"
-      )
-    )
-
-    TestUtils.assertLeader(client, topicPartition, broker2)
-  }
-
-  @ClusterTest
-  def testTopicDoesNotExist(): Unit = {
-    val e = assertThrows(classOf[AdminCommandFailedException], () => 
LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", cluster.bootstrapServers(),
-        "--election-type", "preferred",
-        "--topic", "unknown-topic-name",
-        "--partition", "0"
-      )
-    ))
-    
assertTrue(e.getSuppressed()(0).isInstanceOf[UnknownTopicOrPartitionException])
-  }
-
-  @ClusterTest
-  def testElectionResultOutput(): Unit = {
-    val topic = "non-preferred-topic"
-    val partition0 = 0
-    val partition1 = 1
-    val assignment0 = Seq(broker2, broker3)
-    val assignment1 = Seq(broker3, broker2)
-
-    cluster.waitForReadyBrokers()
-    val client = cluster.createAdminClient()
-    createTopic(client, topic, Map(
-      partition0 -> assignment0,
-      partition1 -> assignment1
-    ))
-
-    val topicPartition0 = new TopicPartition(topic, partition0)
-    val topicPartition1 = new TopicPartition(topic, partition1)
-
-    TestUtils.assertLeader(client, topicPartition0, broker2)
-    TestUtils.assertLeader(client, topicPartition1, broker3)
-
-    cluster.shutdownBroker(broker2)
-    TestUtils.assertLeader(client, topicPartition0, broker3)
-    cluster.startBroker(broker2)
-    TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2))
-    TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2))
-
-    val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, 
topicPartition1))
-    val output = TestUtils.grabConsoleOutput(
-      LeaderElectionCommand.main(
-        Array(
-          "--bootstrap-server", cluster.bootstrapServers(),
-          "--election-type", "preferred",
-          "--path-to-json-file", topicPartitionPath.toString
-        )
-      )
-    )
-
-    val electionResultOutputIter = output.split("\n").iterator
-
-    assertTrue(electionResultOutputIter.hasNext)
-    val firstLine = electionResultOutputIter.next()
-    assertTrue(firstLine.contains(s"Successfully completed leader election 
(PREFERRED) for partitions $topicPartition0"),
-    s"Unexpected output: $firstLine")
-
-    assertTrue(electionResultOutputIter.hasNext)
-    val secondLine = electionResultOutputIter.next()
-    assertTrue(secondLine.contains(s"Valid replica already elected for 
partitions $topicPartition1"),
-    s"Unexpected output: $secondLine")
-  }
-}
-
-object LeaderElectionCommandTest {
-  def createConfig(servers: Seq[KafkaServer]): Map[String, Object] = {
-    Map(
-      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers(servers),
-      AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG -> "20000",
-      AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "10000"
-    )
-  }
-
-  def bootstrapServers(servers: Seq[KafkaServer]): String = {
-    TestUtils.plaintextBootstrapServers(servers)
-  }
-
-  def tempTopicPartitionFile(partitions: Set[TopicPartition]): Path = {
-    val file = TestUtils.tempFile("leader-election-command", ".json")
-
-    val jsonString = TestUtils.stringifyTopicPartitions(partitions)
-
-    Files.write(file.toPath, jsonString.getBytes(StandardCharsets.UTF_8))
-
-    file.toPath
-  }
-}
diff --git a/settings.gradle b/settings.gradle
index 79af7b84c30..cee7ff237a7 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -96,4 +96,5 @@ include 'clients',
     'tools:tools-api',
     'trogdor'
 
+project(":storage:api").name = "storage-api"
 rootProject.name = 'kafka'
diff --git a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
new file mode 100644
index 00000000000..8d34937c900
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new 
LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = 
Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = 
Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new 
TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = 
jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 
Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 
Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, 
Optional<Set<TopicPartition>> partitions) {
+        LOG.debug("Calling AdminClient.electLeaders({}, {})", electionType, 
partitions.orElse(null));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, 
partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.out.println(message);
+                throw new AdminCommandFailedException(message, e.getCause());
+            } else if (e.getCause() instanceof ClusterAuthorizationException) {
+                String message = "Not authorized to perform leader election";
+                System.out.println(message);
+                throw new AdminCommandFailedException(message, 
e.getCause().getCause());
+            } else {
+                throw new RuntimeException(e);
+            }
+        } catch (InterruptedException e) {
+            System.out.println("Error while making request");
+            throw new RuntimeException(e);
+        }
+
+        Set<TopicPartition> succeeded = new HashSet<>();
+        Set<TopicPartition> noop = new HashSet<>();
+        Map<TopicPartition, Throwable> failed = new HashMap<>();
+
+        electionResults.entrySet().stream().forEach(entry -> {
+            Optional<Throwable> error = entry.getValue();
+            if (error.isPresent()) {
+                if (error.get() instanceof ElectionNotNeededException) {
+                    noop.add(entry.getKey());
+                } else {
+                    failed.put(entry.getKey(), error.get());
+                }
+            } else {
+                succeeded.add(entry.getKey());
+            }
+        });
+
+        if (!succeeded.isEmpty()) {
+            String partitionsAsString = succeeded.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Successfully completed leader election (%s) for partitions %s",
+                electionType, partitionsAsString));
+        }
+
+        if (!noop.isEmpty()) {
+            String partitionsAsString = noop.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Valid replica already elected for partitions %s", partitionsAsString));
+        }
+
+        if (!failed.isEmpty()) {
+            AdminCommandFailedException rootException =
+                new AdminCommandFailedException(String.format("%s replica(s) could not be elected", failed.size()));
+            failed.entrySet().forEach(entry -> {
+                System.out.println(String.format("Error completing leader election (%s) for partition: %s: %s",
+                    electionType, entry.getKey(), entry.getValue()));
+                rootException.addSuppressed(entry.getValue());
+            });
+            throw rootException;
+        }
+    }
+
+    private static Set<TopicPartition> parseReplicaElectionData(String path) {
+        Optional<JsonValue> jsonFile;
+        try {
+            jsonFile = Json.parseFull(Utils.readFileAsString(path));
+            return jsonFile.map(js -> {
+                try {
+                    return topicPartitions(js);
+                } catch (JsonMappingException e) {
+                    throw new RuntimeException(e);
+                }
+            }).orElseThrow(() -> new AdminOperationException("Replica election 
data is empty"));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Set<TopicPartition> topicPartitions(JsonValue js) throws 
JsonMappingException {
+        return js.asJsonObject().get("partitions")
+            .map(partitionsList -> {
+                try {
+                    return toTopicPartition(partitionsList);
+                } catch (JsonMappingException e) {
+                    throw new RuntimeException(e);
+                }
+            })
+            .orElseThrow(() -> new AdminOperationException("Replica election 
data is missing \"partitions\" field"));
+    }
+
+    private static Set<TopicPartition> toTopicPartition(JsonValue 
partitionsList) throws JsonMappingException {
+        List<TopicPartition> partitions = new ArrayList<>();
+        Iterator<JsonValue> iterator = partitionsList.asJsonArray().iterator();
+
+        while (iterator.hasNext()) {
+            JsonObject partitionJs = iterator.next().asJsonObject();
+            String topic = partitionJs.apply("topic").to(STRING);
+            int partition = partitionJs.apply("partition").to(INT);
+            partitions.add(new TopicPartition(topic, partition));
+        }
+
+        Set<TopicPartition> duplicatePartitions  = partitions.stream()
+            .filter(i -> Collections.frequency(partitions, i) > 1)
+            .collect(Collectors.toSet());
+
+        if (duplicatePartitions.size() > 0) {
+            throw new AdminOperationException(String.format(
+                "Replica election data contains duplicate partitions: %s", 
String.join(",", duplicatePartitions.toString()))
+            );
+        }
+        return new HashSet<>(partitions);
+    }
+
+    static class LeaderElectionCommandOptions extends CommandDefaultOptions {
+        private final ArgumentAcceptingOptionSpec<String> bootstrapServer;
+        private final ArgumentAcceptingOptionSpec<String> adminClientConfig;
+        private final ArgumentAcceptingOptionSpec<String> pathToJsonFile;
+        private final ArgumentAcceptingOptionSpec<String> topic;
+        private final ArgumentAcceptingOptionSpec<Integer> partition;
+        private final OptionSpecBuilder allTopicPartitions;
+        private final ArgumentAcceptingOptionSpec<ElectionType> electionType;
+        public LeaderElectionCommandOptions(String[] args) {
+            super(args);
+            bootstrapServer = parser
+                .accepts(
+                    "bootstrap-server",
+                    "A hostname and port for the broker to connect to, in the 
form host:port. Multiple comma separated URLs can be given. REQUIRED.")
+                .withRequiredArg()
+                .describedAs("host:port")
+                .ofType(String.class);
+            adminClientConfig = parser
+                .accepts(
+                    "admin.config",
+                    "Configuration properties files to pass to the admin 
client")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+            pathToJsonFile = parser
+                .accepts(
+                    "path-to-json-file",
+                    "The JSON file with the list  of partition for which leader elections should be performed. This is an example format. \n{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\nNot allowed if --all-topic-partitions or --topic flags are specified.")
+                .withRequiredArg()
+                .describedAs("Path to JSON file")
+                .ofType(String.class);
+            topic = parser
+                .accepts(
+                    "topic",
+                    "Name of topic for which to perform an election. Not 
allowed if --path-to-json-file or --all-topic-partitions is specified.")
+                .withRequiredArg()
+                .describedAs("topic name")
+                .ofType(String.class);
+
+            partition = parser
+                .accepts(
+                    "partition",
+                    "Partition id for which to perform an election. REQUIRED 
if --topic is specified.")
+                .withRequiredArg()
+                .describedAs("partition id")
+                .ofType(Integer.class);
+
+            allTopicPartitions = parser
+                .accepts(
+                    "all-topic-partitions",
+                    "Perform election on all of the eligible topic partitions 
based on the type of election (see the --election-type flag). Not allowed if 
--topic or --path-to-json-file is specified.");
+            electionType = parser
+                .accepts(
+                    "election-type",
+                    "Type of election to attempt. Possible values are \"preferred\" for preferred leader election or \"unclean\" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.")
+                .withRequiredArg()
+                .describedAs("election type")
+                .withValuesConvertedBy(new ElectionTypeConverter());
+
+            options = parser.parse(args);
+        }
+
+        public boolean hasAdminClientConfig() {
+            return options.has(adminClientConfig);
+        }
+
+        public ElectionType getElectionType() {
+            return options.valueOf(electionType);
+        }
+
+        public String getPathToJsonFile() {
+            return options.valueOf(pathToJsonFile);
+        }
+
+        public String getBootstrapServer() {
+            return options.valueOf(bootstrapServer);
+        }
+
+        public String getAdminClientConfig() {
+            return options.valueOf(adminClientConfig);
+        }
+
+        public String getTopic() {
+            return options.valueOf(topic);
+        }
+
+        public Integer getPartition() {
+            return options.valueOf(partition);
+        }
+
+        public void validate() {
+            // required options: --bootstrap-server and --election-type
+            List<String> missingOptions = new ArrayList<>();
+
+            if (!options.has(bootstrapServer)) {
+                missingOptions.add(bootstrapServer.options().get(0));
+            }
+            if (!options.has(electionType)) {
+                missingOptions.add(electionType.options().get(0));
+            }
+            if (!missingOptions.isEmpty()) {
+                throw new AdminCommandFailedException("Missing required 
option(s): " + String.join(", ", missingOptions));
+            }
+
+            // One and only one is required: --topic, --all-topic-partitions 
or --path-to-json-file
+            List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = 
Arrays.asList(
+                topic,
+                allTopicPartitions,
+                pathToJsonFile
+            );
+
+            long mutuallyExclusiveOptionsCount = 
mutuallyExclusiveOptions.stream()
+                .filter(abstractOptionSpec -> options.has(abstractOptionSpec))
+                .count();
+            // 1 is the only correct configuration, don't throw an exception
+            if (mutuallyExclusiveOptionsCount != 1) {
+                throw new AdminCommandFailedException(
+                    "One and only one of the following options is required: " +
+                        mutuallyExclusiveOptions.stream().map(opt -> 
opt.options().get(0)).collect(Collectors.joining(", "))
+                );
+            }
+            // --partition if and only if --topic is used
+            if (options.has(topic) && !options.has(partition)) {
+                throw new AdminCommandFailedException(String.format("Missing 
required option(s): %s",
+                    partition.options().get(0)));
+            }
+
+            if (!options.has(topic) && options.has(partition)) {
+                throw new AdminCommandFailedException(String.format("Option %s 
is only allowed if %s is used",
+                    partition.options().get(0),
+                    topic.options().get(0)
+                ));
+            }
+        }
+
+        public void maybePrintHelpOrVersion() {
+            CommandLineUtils.maybePrintHelpOrVersion(
+                this,
+                "This tool attempts to elect a new leader for a set of topic 
partitions. The type of elections supported are preferred replicas and unclean 
replicas."
+            );
+        }
+
+    }
+
+    static class ElectionTypeConverter extends EnumConverter<ElectionType> {
+        public ElectionTypeConverter() {
+            super(ElectionType.class);
+        }
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java
new file mode 100644
index 00000000000..fef75bfd104
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * For some error cases, we can save a little build time by avoiding the overhead for
+ * cluster creation and cleanup because the command is expected to fail immediately.
+ */
+public class LeaderElectionCommandErrorTest {
+    @Test
+    public void testTopicWithoutPartition() {
+        String out = ToolsTestUtils.captureStandardErr(() -> 
LeaderElectionCommand.main(
+                "--bootstrap-server", "nohost:9092",
+                "--election-type", "unclean",
+                "--topic", "some-topic"
+            ));
+        assertTrue(out.startsWith("Missing required option(s)"));
+        assertTrue(out.contains(" partition"));
+    }
+
+    @Test
+    public void testPartitionWithoutTopic() {
+        String out = ToolsTestUtils.captureStandardErr(() -> 
LeaderElectionCommand.main(
+            "--bootstrap-server", "nohost:9092",
+            "--election-type", "unclean",
+            "--all-topic-partitions",
+            "--partition", "0"
+        ));
+        String[] rows = out.split("\n");
+        assertTrue(out.startsWith("Option partition is only allowed if topic is used"));
+    }
+
+    @Test
+    public void testMissingElectionType() {
+        String out = ToolsTestUtils.captureStandardErr(() -> 
LeaderElectionCommand.main(
+            "--bootstrap-server", "nohost:9092",
+            "--topic", "some-topic",
+            "--partition", "0"
+        ));
+        assertTrue(out.startsWith("Missing required option(s)"));
+        assertTrue(out.contains(" election-type"));
+    }
+
+    @Test
+    public void testMissingTopicPartitionSelection() {
+        String out = ToolsTestUtils.captureStandardErr(() -> 
LeaderElectionCommand.main(
+            "--bootstrap-server", "nohost:9092",
+            "--election-type", "preferred"
+        ));
+        assertTrue(out.startsWith("One and only one of the following options is required: "));
+        assertTrue(out.contains(" all-topic-partitions"));
+        assertTrue(out.contains(" topic"));
+        assertTrue(out.contains(" path-to-json-file"));
+    }
+
+    @Test
+    public void testInvalidBroker() {
+        Throwable e = assertThrows(AdminCommandFailedException.class, () -> 
LeaderElectionCommand.run(
+            Duration.ofSeconds(1),
+            "--bootstrap-server", "example.com:1234",
+            "--election-type", "unclean",
+            "--all-topic-partitions"
+        ));
+        assertTrue(e.getCause() instanceof TimeoutException);
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
new file mode 100644
index 00000000000..0c9fa753f75
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @BeforeEach
+    void setup(ClusterConfig clusterConfig) {
+        TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+        clusterConfig.serverProperties().put("auto.leader.rebalance.enable", 
"false");
+        clusterConfig.serverProperties().put("controlled.shutdown.enable", 
"true");
+        
clusterConfig.serverProperties().put("controlled.shutdown.max.retries", "1");
+        
clusterConfig.serverProperties().put("controlled.shutdown.retry.backoff.ms", 
"1000");
+        
clusterConfig.serverProperties().put("offsets.topic.replication.factor", "2");
+    }
+
+    @ClusterTest
+    public void testAllTopicPartition() throws InterruptedException, 
ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+
+        createTopic(client, topic, Collections.singletonMap(partition, 
assignment));
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+                
JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+                
JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--election-type", "unclean",
+            "--all-topic-partitions"
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testTopicPartition() throws InterruptedException, 
ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        createTopic(client, topic, Collections.singletonMap(partition, 
assignment));
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            
JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            
JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--election-type", "unclean",
+            "--topic", topic,
+            "--partition", Integer.toString(partition)
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testPathToJsonFile() throws Exception {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        Admin client = cluster.createAdminClient();
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            
JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            
JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        Path topicPartitionPath = 
tempTopicPartitionFile(Collections.singletonList(topicPartition));
+
+        LeaderElectionCommand.main(
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--election-type", "unclean",
+            "--path-to-json-file", topicPartitionPath.toString()
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testPreferredReplicaElection() throws InterruptedException, 
ExecutionException {
+        String topic = "preferred-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertLeader(client, topicPartition, broker3);
+        cluster.startBroker(broker2);
+        TestUtils.waitForBrokersInIsr(client, topicPartition,
+            
JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
+        );
+
+        LeaderElectionCommand.main(
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--election-type", "preferred",
+            "--all-topic-partitions"
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+    }
+
+    @ClusterTest
+    public void testTopicDoesNotExist() {
+        Throwable e =  assertThrows(AdminCommandFailedException.class, () -> 
LeaderElectionCommand.run(
+            Duration.ofSeconds(30),
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--election-type", "preferred",
+            "--topic", "unknown-topic-name",
+            "--partition", "0"
+        ));
+        assertTrue(e.getSuppressed()[0] instanceof 
UnknownTopicOrPartitionException);
+    }
+
+    @ClusterTest
+    public void testElectionResultOutput() throws Exception {
+        String topic = "non-preferred-topic";
+        int partition0 = 0;
+        int partition1 = 1;
+        List<Integer> assignment0 = Arrays.asList(broker2, broker3);
+        List<Integer> assignment1 = Arrays.asList(broker3, broker2);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition0, assignment0);
+        partitionAssignment.put(partition1, assignment1);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition0 = new TopicPartition(topic, partition0);
+        TopicPartition topicPartition1 = new TopicPartition(topic, partition1);
+
+        TestUtils.assertLeader(client, topicPartition0, broker2);
+        TestUtils.assertLeader(client, topicPartition1, broker3);
+
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertLeader(client, topicPartition0, broker3);
+        cluster.startBroker(broker2);
+        TestUtils.waitForBrokersInIsr(client, topicPartition0,
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
+        );
+        TestUtils.waitForBrokersInIsr(client, topicPartition1,
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
+        );
+
+        Path topicPartitionPath = tempTopicPartitionFile(Arrays.asList(topicPartition0, topicPartition1));
+        String output = ToolsTestUtils.captureStandardOut(() ->
+            LeaderElectionCommand.main(
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "preferred",
+                "--path-to-json-file", topicPartitionPath.toString()
+            ));
+
+        Iterator<String> electionResultOutputIter = Arrays.stream(output.split("\n")).iterator();
+
+        assertTrue(electionResultOutputIter.hasNext());
+        String firstLine = electionResultOutputIter.next();
+        assertTrue(firstLine.contains(String.format(
+            "Successfully completed leader election (PREFERRED) for partitions %s", topicPartition0)),
+            String.format("Unexpected output: %s", firstLine));
+
+        assertTrue(electionResultOutputIter.hasNext());
+        String secondLine = electionResultOutputIter.next();
+        assertTrue(secondLine.contains(String.format("Valid replica already elected for partitions %s", topicPartition1)),
+            String.format("Unexpected output: %s", secondLine));
+    }
+
+    private static void createTopic(Admin admin, String topic, Map<Integer, List<Integer>> replicaAssignment) throws ExecutionException, InterruptedException {
+        NewTopic newTopic = new NewTopic(topic, replicaAssignment);
+        List<NewTopic> newTopics = Collections.singletonList(newTopic);
+        CreateTopicsResult createTopicResult = admin.createTopics(newTopics);
+        createTopicResult.all().get();
+    }
+
+    private static Path tempTopicPartitionFile(List<TopicPartition> partitions) throws Exception {
+        java.io.File file = TestUtils.tempFile("leader-election-command", ".json");
+
+        scala.collection.immutable.Set<TopicPartition> topicPartitionSet =
+            JavaConverters.asScalaBuffer(partitions).toSet();
+        String jsonString = TestUtils.stringifyTopicPartitions(topicPartitionSet);
+
+        Files.write(file.toPath(), jsonString.getBytes(StandardCharsets.UTF_8));
+
+        return file.toPath();
+    }
+}

Reply via email to