This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 269b652  KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use AdminClient (#3848)
269b652 is described below

commit 269b65279c746bc54c611141a5a6509f9b310f11
Author: Tom Bentley <tombent...@users.noreply.github.com>
AuthorDate: Fri Jan 25 22:06:18 2019 +0000

    KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use AdminClient (#3848)
    
    See also KIP-183.
    
    This implements the following algorithm:
    
    AdminClient sends ElectPreferredLeadersRequest.
    KafkaApis receives ElectPreferredLeadersRequest and delegates to
    ReplicaManager.electPreferredLeaders()
    ReplicaManager delegates to KafkaController.electPreferredLeaders()
    KafkaController adds a PreferredReplicaLeaderElection to the EventManager.
    ReplicaManager.electPreferredLeaders()'s callback uses the
    delayedElectPreferredReplicasPurgatory to wait for the results of the
    election to appear in the metadata cache. If there are no results,
    because of errors or because the preferred leaders are already leading
    the partitions, then a response is returned immediately.
    In the EventManager work thread the preferred leader is elected as follows:
    
    The EventManager runs PreferredReplicaLeaderElection.process()
    process() calls KafkaController.onPreferredReplicaElectionWithResults()
    KafkaController.onPreferredReplicaElectionWithResults()
    calls PartitionStateMachine.handleStateChangesWithResults() to
    perform the election (asynchronously, the PSM will send LeaderAndIsrRequest
    to the new and old leaders and UpdateMetadataRequest to all brokers),
    and then invokes the callback.
    
    Reviewers: Colin P. McCabe <cmcc...@apache.org>, Jun Rao <jun...@gmail.com>
---
 checkstyle/import-control.xml                      |   2 +
 .../org/apache/kafka/clients/NetworkClient.java    |   6 +-
 .../apache/kafka/clients/admin/AdminClient.java    |  52 ++++
 .../admin/ElectPreferredLeadersOptions.java        |  31 ++
 .../clients/admin/ElectPreferredLeadersResult.java | 136 +++++++++
 .../kafka/clients/admin/KafkaAdminClient.java      |  33 ++
 .../PreferredLeaderNotAvailableException.java      |  28 ++
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  14 +-
 .../org/apache/kafka/common/protocol/Errors.java   |   5 +-
 .../kafka/common/requests/AbstractRequest.java     |   2 +
 .../kafka/common/requests/AbstractResponse.java    |   4 +-
 .../requests/ElectPreferredLeadersRequest.java     | 129 ++++++++
 .../requests/ElectPreferredLeadersResponse.java    |  78 +++++
 .../message/ElectPreferredLeadersRequest.json      |  33 ++
 .../message/ElectPreferredLeadersResponse.json     |  39 +++
 .../src/main/resources/common/message/README.md    |   2 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  54 ++++
 .../kafka/clients/admin/MockAdminClient.java       |   4 +
 .../kafka/common/requests/RequestContextTest.java  |   3 +-
 .../kafka/common/requests/RequestResponseTest.java |  40 ++-
 .../PreferredReplicaLeaderElectionCommand.scala    | 217 +++++++++++--
 .../kafka/controller/ControllerEventManager.scala  |   5 +
 .../scala/kafka/controller/KafkaController.scala   | 159 ++++++++--
 .../kafka/controller/PartitionStateMachine.scala   |  40 ++-
 .../kafka/server/DelayedElectPreferredLeader.scala |  89 ++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   |  49 +++
 .../main/scala/kafka/server/MetadataCache.scala    |   6 +
 .../main/scala/kafka/server/ReplicaManager.scala   |  37 +++
 .../kafka/api/AdminClientIntegrationTest.scala     | 203 ++++++++++++-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  20 +-
 ...PreferredReplicaLeaderElectionCommandTest.scala | 337 +++++++++++++++++++++
 .../AbstractCoordinatorConcurrencyTest.scala       |   2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   8 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  11 +
 35 files changed, 1798 insertions(+), 82 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a0bf740..0d316c5 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -116,6 +116,7 @@
 
     <subpackage name="protocol">
       <allow pkg="org.apache.kafka.common.errors" />
+      <allow pkg="org.apache.kafka.common.message" />
       <allow pkg="org.apache.kafka.common.protocol.types" />
       <allow pkg="org.apache.kafka.common.record" />
       <allow pkg="org.apache.kafka.common.requests" />
@@ -140,6 +141,7 @@
     <subpackage name="requests">
       <allow pkg="org.apache.kafka.common.acl" />
       <allow pkg="org.apache.kafka.common.protocol" />
+      <allow pkg="org.apache.kafka.common.message" />
       <allow pkg="org.apache.kafka.common.network" />
       <allow pkg="org.apache.kafka.common.requests" />
       <allow pkg="org.apache.kafka.common.resource" />
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 144987e..3973701 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -676,7 +676,8 @@ public class NetworkClient implements KafkaClient {
 
     public static AbstractResponse parseResponse(ByteBuffer responseBuffer, 
RequestHeader requestHeader) {
         Struct responseStruct = 
parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 
0);
-        return AbstractResponse.parseResponse(requestHeader.apiKey(), 
responseStruct);
+        return AbstractResponse.parseResponse(requestHeader.apiKey(), 
responseStruct,
+                requestHeader.apiVersion());
     }
 
     private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer 
responseBuffer, RequestHeader requestHeader,
@@ -811,7 +812,8 @@ public class NetworkClient implements KafkaClient {
                     req.header.apiKey(), req.header.correlationId(), 
responseStruct);
             }
             // If the received response includes a throttle delay, throttle 
the connection.
-            AbstractResponse body = 
AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
+            AbstractResponse body = AbstractResponse.
+                    parseResponse(req.header.apiKey(), responseStruct, 
req.header.apiVersion());
             maybeThrottle(body, req.header.apiVersion(), req.destination, now);
             if (req.isInternalRequest && body instanceof MetadataResponse)
                 metadataUpdater.handleCompletedMetadataResponse(req.header, 
now, (MetadataResponse) body);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index bdd7cc3..b823cdc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -793,6 +793,58 @@ public abstract class AdminClient implements AutoCloseable 
{
     }
 
     /**
+     * Elect the preferred broker of the given {@code partitions} as leader, or
+     * elect the preferred broker for all partitions as leader if the argument 
to {@code partitions} is null.
+     *
+     * This is a convenience method for {@link 
#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}
+     * with default options.
+     * See the overload for more details.
+     *
+     * @param partitions      The partitions for which the preferred leader 
should be elected.
+     * @return                The ElectPreferredLeadersResult.
+     */
+    public ElectPreferredLeadersResult 
electPreferredLeaders(Collection<TopicPartition> partitions) {
+        return electPreferredLeaders(partitions, new 
ElectPreferredLeadersOptions());
+    }
+
+    /**
+     * Elect the preferred broker of the given {@code partitions} as leader, or
+     * elect the preferred broker for all partitions as leader if the argument 
to {@code partitions} is null.
+     *
+     * This operation is not transactional so it may succeed for some 
partitions while fail for others.
+     *
+     * It may take several seconds after this method returns
+     * success for all the brokers in the cluster to become aware that the 
partitions have new leaders.
+     * During this time, {@link AdminClient#describeTopics(Collection)}
+     * may not return information about the partitions' new leaders.
+     *
+     * This operation is supported by brokers with version 2.2.0 or higher.
+     *
+     * <p>The following exceptions can be anticipated when calling {@code 
get()} on the futures obtained from
+     * the returned {@code ElectPreferredLeadersResult}:</p>
+     * <ul>
+     *   <li>{@link 
org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   if the authenticated user didn't have alter access to the 
cluster.</li>
+     *   <li>{@link 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+     *   if the topic or partition did not exist within the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidTopicException}
+     *   if the topic was already queued for deletion.</li>
+     *   <li>{@link org.apache.kafka.common.errors.NotControllerException}
+     *   if the request was sent to a broker that was not the controller for 
the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   if the request timed out before the election was complete.</li>
+     *   <li>{@link org.apache.kafka.common.errors.LeaderNotAvailableException}
+     *   if the preferred leader was not alive or not in the ISR.</li>
+     * </ul>
+     *
+     * @param partitions      The partitions for which the preferred leader 
should be elected.
+     * @param options         The options to use when electing the preferred 
leaders.
+     * @return                The ElectPreferredLeadersResult.
+     */
+    public abstract ElectPreferredLeadersResult 
electPreferredLeaders(Collection<TopicPartition> partitions,
+                                                                      
ElectPreferredLeadersOptions options);
+
+    /**
      * Get the metrics kept by the adminClient
      */
     public abstract Map<MetricName, ? extends Metric> metrics();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
new file mode 100644
index 0000000..80b0097
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * Options for {@link AdminClient#electPreferredLeaders(Collection, 
ElectPreferredLeadersOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ElectPreferredLeadersOptions extends 
AbstractOptions<ElectPreferredLeadersOptions> {
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java
new file mode 100644
index 0000000..c76336a
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The result of {@link AdminClient#electPreferredLeaders(Collection, 
ElectPreferredLeadersOptions)}
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ElectPreferredLeadersResult {
+
+    private final KafkaFutureImpl<Map<TopicPartition, ApiError>> 
electionFuture;
+    private final Set<TopicPartition> partitions;
+
+    ElectPreferredLeadersResult(KafkaFutureImpl<Map<TopicPartition, ApiError>> 
electionFuture, Set<TopicPartition> partitions) {
+        this.electionFuture = electionFuture;
+        this.partitions = partitions;
+    }
+
+    /**
+     * Get the result of the election for the given {@code partition}.
+     * If there was not an election triggered for the given {@code partition}, 
the
+     * returned future will complete with an error.
+     */
+    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
+        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+        electionFuture.whenComplete(new 
KafkaFuture.BiConsumer<Map<TopicPartition, ApiError>, Throwable>() {
+            @Override
+            public void accept(Map<TopicPartition, ApiError> topicPartitions, 
Throwable throwable) {
+                if (throwable != null) {
+                    result.completeExceptionally(throwable);
+                } else if (!topicPartitions.containsKey(partition)) {
+                    result.completeExceptionally(new 
UnknownTopicOrPartitionException(
+                            "Preferred leader election for partition \"" + 
partition +
+                                    "\" was not attempted"));
+                } else {
+                    if (partitions == null && topicPartitions.isEmpty()) {
+                        
result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
+                    }
+                    ApiException exception = 
topicPartitions.get(partition).exception();
+                    if (exception == null) {
+                        result.complete(null);
+                    } else {
+                        result.completeExceptionally(exception);
+                    }
+                }
+            }
+        });
+        return result;
+    }
+
+    /**
+     * <p>Get a future for the topic partitions for which a leader election
+     * was attempted. A partition will be present in this result if
+     * an election was attempted even if the election was not successful.</p>
+     *
+     * <p>This method is provided to discover the partitions attempted when
+     * {@link AdminClient#electPreferredLeaders(Collection)} is called
+     * with a null {@code partitions} argument.</p>
+     */
+    public KafkaFuture<Set<TopicPartition>> partitions() {
+        if (partitions != null) {
+            return KafkaFutureImpl.completedFuture(this.partitions);
+        } else {
+            final KafkaFutureImpl<Set<TopicPartition>> result = new 
KafkaFutureImpl<>();
+            electionFuture.whenComplete(new 
KafkaFuture.BiConsumer<Map<TopicPartition, ApiError>, Throwable>() {
+                @Override
+                public void accept(Map<TopicPartition, ApiError> 
topicPartitions, Throwable throwable) {
+                    if (throwable != null) {
+                        result.completeExceptionally(throwable);
+                    } else if (topicPartitions.isEmpty()) {
+                        
result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
+                    } else {
+                        for (ApiError apiError : topicPartitions.values()) {
+                            if (apiError.isFailure()) {
+                                
result.completeExceptionally(apiError.exception());
+                            }
+                        }
+                        result.complete(topicPartitions.keySet());
+                    }
+                }
+            });
+            return result;
+        }
+    }
+
+    /**
+     * Return a future which succeeds if all the topic elections succeed.
+     */
+    public KafkaFuture<Void> all() {
+        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+        electionFuture.thenApply(new KafkaFuture.Function<Map<TopicPartition, 
ApiError>, Void>() {
+            @Override
+            public Void apply(Map<TopicPartition, ApiError> topicPartitions) {
+                for (ApiError apiError : topicPartitions.values()) {
+                    if (apiError.isFailure()) {
+                        result.completeExceptionally(apiError.exception());
+                        return null;
+                    }
+                }
+                result.complete(null);
+                return null;
+            }
+        });
+        return result;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 2ba3cf2..58baab7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -105,6 +105,8 @@ import 
org.apache.kafka.common.requests.DescribeGroupsRequest;
 import org.apache.kafka.common.requests.DescribeGroupsResponse;
 import org.apache.kafka.common.requests.DescribeLogDirsRequest;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.ElectPreferredLeadersRequest;
+import org.apache.kafka.common.requests.ElectPreferredLeadersResponse;
 import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
 import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
@@ -2777,4 +2779,35 @@ public class KafkaAdminClient extends AdminClient {
     public Map<MetricName, ? extends Metric> metrics() {
         return Collections.unmodifiableMap(this.metrics.metrics());
     }
+
+    @Override
+    public ElectPreferredLeadersResult electPreferredLeaders(final 
Collection<TopicPartition> partitions,
+                                                             
ElectPreferredLeadersOptions options) {
+        final Set<TopicPartition> partitionSet = partitions != null ? new 
HashSet<>(partitions) : null;
+        final KafkaFutureImpl<Map<TopicPartition, ApiError>> electionFuture = 
new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("electPreferredLeaders", calcDeadlineMs(now, 
options.timeoutMs()),
+                new ControllerNodeProvider()) {
+
+            @Override
+            public AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new ElectPreferredLeadersRequest.Builder(
+                        ElectPreferredLeadersRequest.toRequestData(partitions, 
timeoutMs));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                ElectPreferredLeadersResponse response = 
(ElectPreferredLeadersResponse) abstractResponse;
+                electionFuture.complete(
+                        
ElectPreferredLeadersRequest.fromResponseData(response.data()));
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                electionFuture.completeExceptionally(throwable);
+            }
+        }, now);
+        return new ElectPreferredLeadersResult(electionFuture, partitionSet);
+    }
+
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/PreferredLeaderNotAvailableException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/PreferredLeaderNotAvailableException.java
new file mode 100644
index 0000000..73dfd64
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/PreferredLeaderNotAvailableException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class PreferredLeaderNotAvailableException extends 
InvalidMetadataException {
+
+    public PreferredLeaderNotAvailableException(String message) {
+        super(message);
+    }
+
+    public PreferredLeaderNotAvailableException(String message, Throwable 
cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 3d77100..80b118b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.protocol;
 
+import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
+import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -35,10 +37,10 @@ import 
org.apache.kafka.common.requests.ControlledShutdownRequest;
 import org.apache.kafka.common.requests.ControlledShutdownResponse;
 import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsResponse;
-import org.apache.kafka.common.requests.CreatePartitionsRequest;
-import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
 import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
+import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
@@ -53,12 +55,12 @@ import org.apache.kafka.common.requests.DescribeAclsRequest;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
+import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
 import org.apache.kafka.common.requests.DescribeGroupsRequest;
 import org.apache.kafka.common.requests.DescribeGroupsResponse;
 import org.apache.kafka.common.requests.DescribeLogDirsRequest;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
-import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
-import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
 import org.apache.kafka.common.requests.EndTxnRequest;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
@@ -186,7 +188,9 @@ public enum ApiKeys {
     RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", 
RenewDelegationTokenRequest.schemaVersions(), 
RenewDelegationTokenResponse.schemaVersions()),
     EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", 
ExpireDelegationTokenRequest.schemaVersions(), 
ExpireDelegationTokenResponse.schemaVersions()),
     DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", 
DescribeDelegationTokenRequest.schemaVersions(), 
DescribeDelegationTokenResponse.schemaVersions()),
-    DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), 
DeleteGroupsResponse.schemaVersions());
+    DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), 
DeleteGroupsResponse.schemaVersions()),
+    ELECT_PREFERRED_LEADERS(43, "ElectPreferredLeaders", 
ElectPreferredLeadersRequestData.SCHEMAS,
+            ElectPreferredLeadersResponseData.SCHEMAS);
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 14ca06d..5bcff43 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -72,6 +72,7 @@ import 
org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.OperationNotAttemptedException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.PreferredLeaderNotAvailableException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.ReassignmentInProgressException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -297,7 +298,9 @@ public enum Errors {
             "election so the offsets cannot be guaranteed to be monotonically 
increasing",
             OffsetNotAvailableException::new),
     MEMBER_ID_REQUIRED(79, "The group member needs to have a valid member id 
before actually entering a consumer group",
-            MemberIdRequiredException::new);
+            MemberIdRequiredException::new),
+    PREFERRED_LEADER_NOT_AVAILABLE(80, "The preferred leader was not 
available",
+            PreferredLeaderNotAvailableException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index d16e60f..239024f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -227,6 +227,8 @@ public abstract class AbstractRequest extends 
AbstractRequestResponse {
                 return new DescribeDelegationTokenRequest(struct, apiVersion);
             case DELETE_GROUPS:
                 return new DeleteGroupsRequest(struct, apiVersion);
+            case ELECT_PREFERRED_LEADERS:
+                return new ElectPreferredLeadersRequest(struct, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index c0ebef1..036814c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -68,7 +68,7 @@ public abstract class AbstractResponse extends 
AbstractRequestResponse {
 
     protected abstract Struct toStruct(short version);
 
-    public static AbstractResponse parseResponse(ApiKeys apiKey, Struct 
struct) {
+    public static AbstractResponse parseResponse(ApiKeys apiKey, Struct 
struct, short version) {
         switch (apiKey) {
             case PRODUCE:
                 return new ProduceResponse(struct);
@@ -156,6 +156,8 @@ public abstract class AbstractResponse extends 
AbstractRequestResponse {
                 return new DescribeDelegationTokenResponse(struct);
             case DELETE_GROUPS:
                 return new DeleteGroupsResponse(struct);
+            case ELECT_PREFERRED_LEADERS:
+                return new ElectPreferredLeadersResponse(struct, version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java
new file mode 100644
index 0000000..ab96e3b
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
+import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The ElectPreferredLeaders request (API key 43): asks the broker to elect the preferred replica
+ * as leader of each listed partition.
+ * <p>
+ * The topic-partition list is nullable in the schema; {@link #toRequestData} produces a
+ * {@code null} list when it is given a {@code null} partition collection.
+ */
+public class ElectPreferredLeadersRequest extends AbstractRequest {
+    /**
+     * Builds an {@link ElectPreferredLeadersRequest} from pre-populated request data.
+     */
+    public static class Builder extends AbstractRequest.Builder<ElectPreferredLeadersRequest> {
+        private final ElectPreferredLeadersRequestData data;
+
+        public Builder(ElectPreferredLeadersRequestData data) {
+            super(ApiKeys.ELECT_PREFERRED_LEADERS);
+            this.data = data;
+        }
+
+        @Override
+        public ElectPreferredLeadersRequest build(short version) {
+            return new ElectPreferredLeadersRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    /**
+     * Converts a collection of partitions into request data, grouping the partitions by topic.
+     *
+     * @param partitions the partitions to elect preferred leaders for, or {@code null} to send a
+     *                   {@code null} topic-partition list
+     * @param timeoutMs  the time in ms to wait for the election to complete
+     * @return the request data
+     */
+    public static ElectPreferredLeadersRequestData toRequestData(Collection<TopicPartition> partitions, int timeoutMs) {
+        ElectPreferredLeadersRequestData d = new ElectPreferredLeadersRequestData()
+                .setTimeoutMs(timeoutMs);
+        if (partitions != null) {
+            for (Map.Entry<String, List<Integer>> tp : CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) {
+                d.topicPartitions().add(new TopicPartitions().setTopic(tp.getKey()).setPartitionId(tp.getValue()));
+            }
+        } else {
+            d.setTopicPartitions(null);
+        }
+        return d;
+    }
+
+    /**
+     * Flattens response data into a map from each partition to its result.
+     *
+     * @param data the response data
+     * @return the error (possibly {@link Errors#NONE}) reported for each partition in the response
+     */
+    public static Map<TopicPartition, ApiError> fromResponseData(ElectPreferredLeadersResponseData data) {
+        Map<TopicPartition, ApiError> map = new HashMap<>();
+        for (ReplicaElectionResult topicResults : data.replicaElectionResults()) {
+            for (ElectPreferredLeadersResponseData.PartitionResult partitionResult : topicResults.partitionResult()) {
+                map.put(new TopicPartition(topicResults.topic(), partitionResult.partitionId()),
+                        new ApiError(Errors.forCode(partitionResult.errorCode()),
+                                partitionResult.errorMessage()));
+            }
+        }
+        return map;
+    }
+
+    private final ElectPreferredLeadersRequestData data;
+    private final short version;
+
+    private ElectPreferredLeadersRequest(ElectPreferredLeadersRequestData data, short version) {
+        super(ApiKeys.ELECT_PREFERRED_LEADERS, version);
+        this.data = data;
+        this.version = version;
+    }
+
+    public ElectPreferredLeadersRequest(Struct struct, short version) {
+        super(ApiKeys.ELECT_PREFERRED_LEADERS, version);
+        this.data = new ElectPreferredLeadersRequestData(struct, version);
+        this.version = version;
+    }
+
+    public ElectPreferredLeadersRequestData data() {
+        return data;
+    }
+
+    /**
+     * Builds a response that reports {@code e} against every partition in this request.
+     * <p>
+     * When the topic-partition list is {@code null} there are no partitions to attach the error to,
+     * so the response has an empty result list.
+     */
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        ElectPreferredLeadersResponseData response = new ElectPreferredLeadersResponseData();
+        response.setThrottleTimeMs(throttleTimeMs);
+        ApiError apiError = ApiError.fromThrowable(e);
+        // topicPartitions is nullable in the schema; iterating it unguarded would throw an NPE
+        // instead of producing the error response.
+        if (data.topicPartitions() != null) {
+            for (TopicPartitions topic : data.topicPartitions()) {
+                ReplicaElectionResult electionResult = new ReplicaElectionResult().setTopic(topic.topic());
+                for (Integer partitionId : topic.partitionId()) {
+                    electionResult.partitionResult().add(new ElectPreferredLeadersResponseData.PartitionResult()
+                            .setPartitionId(partitionId)
+                            .setErrorCode(apiError.error().code())
+                            .setErrorMessage(apiError.message()));
+                }
+                response.replicaElectionResults().add(electionResult);
+            }
+        }
+        return new ElectPreferredLeadersResponse(response);
+    }
+
+    public static ElectPreferredLeadersRequest parse(ByteBuffer buffer, short version) {
+        return new ElectPreferredLeadersRequest(ApiKeys.ELECT_PREFERRED_LEADERS.parseRequest(version, buffer), version);
+    }
+
+    /**
+     * Visible for testing.
+     */
+    @Override
+    public Struct toStruct() {
+        return data.toStruct(version);
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java
new file mode 100644
index 0000000..d19a51d
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The ElectPreferredLeaders response (API key 43): the election result of each partition,
+ * grouped by topic.
+ */
+public class ElectPreferredLeadersResponse extends AbstractResponse {
+
+    private final ElectPreferredLeadersResponseData data;
+
+    public ElectPreferredLeadersResponse(ElectPreferredLeadersResponseData data) {
+        this.data = data;
+    }
+
+    public ElectPreferredLeadersResponse(Struct struct, short version) {
+        this.data = new ElectPreferredLeadersResponseData(struct, version);
+    }
+
+    public ElectPreferredLeadersResponseData data() {
+        return data;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    /**
+     * Counts the partition results in this response by error.
+     */
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> counts = new HashMap<>();
+        for (ReplicaElectionResult result : data.replicaElectionResults()) {
+            for (PartitionResult partitionResult : result.partitionResult()) {
+                counts.merge(Errors.forCode(partitionResult.errorCode()), 1, Integer::sum);
+            }
+        }
+        return counts;
+    }
+
+    public static ElectPreferredLeadersResponse parse(ByteBuffer buffer, short version) {
+        return new ElectPreferredLeadersResponse(
+                ApiKeys.ELECT_PREFERRED_LEADERS.responseSchema(version).read(buffer), version);
+    }
+
+    /**
+     * Only version 0 of this API exists and it postdates KIP-219, so the client is always expected
+     * to apply the throttle time itself (a {@code version >= 3} test could never be true here).
+     */
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return true;
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json 
b/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json
new file mode 100644
index 0000000..f566cdf
--- /dev/null
+++ 
b/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 43,
+  "type": "request",
+  "name": "ElectPreferredLeadersRequest",
+  "validVersions": "0",
+  "fields": [
+    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": 
"0+", "nullableVersions": "0+",
+      "about": "The topic partitions to elect the preferred leader of.",
+      "fields": [
+        { "name": "Topic", "type": "string", "versions": "0+",
+          "about": "The name of a topic." },
+        { "name": "PartitionId", "type": "[]int32", "versions": "0+",
+          "about": "The partitions of this topic whose preferred leader should be elected." }
+      ]},
+    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": 
"60000",
+      "about": "The time in ms to wait for the election to complete." }
+  ]
+}
\ No newline at end of file
diff --git 
a/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json 
b/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json
new file mode 100644
index 0000000..f34599c
--- /dev/null
+++ 
b/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 43,
+  "type": "response",
+  "name": "ElectPreferredLeadersResponse",
+  "validVersions": "0",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "ReplicaElectionResults", "type": "[]ReplicaElectionResult", "versions": "0+",
+      "about": "The election results, grouped by topic.", "fields": [
+      { "name": "Topic", "type": "string", "versions": "0+",
+        "about": "The topic name" },
+      { "name": "PartitionResult", "type": "[]PartitionResult", "versions": 
"0+",
+        "about": "The results for each partition", "fields": [
+        { "name": "PartitionId", "type": "int32", "versions": "0+",
+          "about": "The partition id" },
+        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+          "about": "The result error, or zero if there was no error."},
+        { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+",
+          "about": "The result message, or null if there was no error."}
+      ]}
+    ]}
+  ]
+}
\ No newline at end of file
diff --git a/clients/src/main/resources/common/message/README.md 
b/clients/src/main/resources/common/message/README.md
index 5648f37..482b1dd 100644
--- a/clients/src/main/resources/common/message/README.md
+++ b/clients/src/main/resources/common/message/README.md
@@ -187,7 +187,7 @@ One very common pattern in Kafka is to load array elements 
from a message into
 a Map or Set for easier access.  The message protocol makes this easier with
 the "mapKey" concept.  
 
-If some of the elemements of an array are annotated with "mapKey": true, the
+If some of the elements of an array are annotated with "mapKey": true, the
 entire array will be treated as a linked hash set rather than a list.  Elements
 in this set will be accessible in O(1) time with an automatically generated
 "find" function.  The order of elements in the set will still be preserved,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 82c5b1d..12b076d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
@@ -50,6 +51,9 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import 
org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -67,6 +71,7 @@ import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.ElectPreferredLeadersResponse;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
@@ -634,6 +639,61 @@ public class KafkaAdminClientTest {
         }
     }
 
+    /**
+     * Verifies that electPreferredLeaders surfaces per-partition errors, succeeds when every
+     * partition succeeds, and times out when no response has been prepared.
+     */
+    @Test
+    public void testElectPreferredLeaders() throws Exception {
+        TopicPartition topic1 = new TopicPartition("topic", 0);
+        TopicPartition topic2 = new TopicPartition("topic", 2);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Test a call where one partition has an error.
+            ApiError value = ApiError.fromThrowable(new ClusterAuthorizationException(null));
+            ElectPreferredLeadersResponseData responseData = new ElectPreferredLeadersResponseData();
+            ReplicaElectionResult r = new ReplicaElectionResult().setTopic(topic1.topic());
+            r.partitionResult().add(new PartitionResult()
+                    .setPartitionId(topic1.partition())
+                    .setErrorCode(ApiError.NONE.error().code())
+                    .setErrorMessage(ApiError.NONE.message()));
+            r.partitionResult().add(new PartitionResult()
+                    .setPartitionId(topic2.partition())
+                    .setErrorCode(value.error().code())
+                    .setErrorMessage(value.message()));
+            responseData.replicaElectionResults().add(r);
+            env.kafkaClient().prepareResponse(new ElectPreferredLeadersResponse(responseData));
+            ElectPreferredLeadersResult results = env.adminClient().electPreferredLeaders(asList(topic1, topic2));
+            results.partitionResult(topic1).get();
+            TestUtils.assertFutureError(results.partitionResult(topic2), ClusterAuthorizationException.class);
+            TestUtils.assertFutureError(results.all(), ClusterAuthorizationException.class);
+
+            // Test a call where there are no errors.
+            r.partitionResult().clear();
+            r.partitionResult().add(new PartitionResult()
+                    .setPartitionId(topic1.partition())
+                    .setErrorCode(ApiError.NONE.error().code())
+                    .setErrorMessage(ApiError.NONE.message()));
+            r.partitionResult().add(new PartitionResult()
+                    .setPartitionId(topic2.partition())
+                    .setErrorCode(ApiError.NONE.error().code())
+                    .setErrorMessage(ApiError.NONE.message()));
+            env.kafkaClient().prepareResponse(new ElectPreferredLeadersResponse(responseData));
+
+            results = env.adminClient().electPreferredLeaders(asList(topic1, topic2));
+            results.partitionResult(topic1).get();
+            results.partitionResult(topic2).get();
+            // With every partition succeeding, the combined future must succeed too.
+            results.all().get();
+
+            // Now try a timeout
+            results = env.adminClient().electPreferredLeaders(asList(topic1, topic2), new ElectPreferredLeadersOptions().timeoutMs(100));
+            TestUtils.assertFutureError(results.partitionResult(topic1), TimeoutException.class);
+            TestUtils.assertFutureError(results.partitionResult(topic2), TimeoutException.class);
+        }
+    }
+
     /**
      * Test handling timeouts.
      */
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index aa2e683..d721245 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -327,6 +327,11 @@ public class MockAdminClient extends AdminClient {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    @Override
+    public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions, ElectPreferredLeadersOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
     @Override
     public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
index ed50b93..857869f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
@@ -68,7 +68,8 @@ public class RequestContextTest {
         assertEquals(correlationId, responseHeader.correlationId());
 
         Struct struct = ApiKeys.API_VERSIONS.parseResponse((short) 0, 
responseBuffer);
-        ApiVersionsResponse response = (ApiVersionsResponse) 
AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct);
+        ApiVersionsResponse response = (ApiVersionsResponse)
+                AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct, 
(short) 0);
         assertEquals(Errors.UNSUPPORTED_VERSION, response.error());
         assertTrue(response.apiVersions().isEmpty());
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index aa98031..2892bb6 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -32,6 +32,11 @@ import 
org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import 
org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -304,6 +309,10 @@ public class RequestResponseTest {
         checkRequest(createRenewTokenRequest());
         checkErrorResponse(createRenewTokenRequest(), new 
UnknownServerException());
         checkResponse(createRenewTokenResponse(), 0);
+        checkRequest(createElectPreferredLeadersRequest());
+        checkRequest(createElectPreferredLeadersRequestNullPartitions());
+        checkErrorResponse(createElectPreferredLeadersRequest(), new 
UnknownServerException());
+        checkResponse(createElectPreferredLeadersResponse(), 0);
     }
 
     @Test
@@ -460,7 +469,7 @@ public class RequestResponseTest {
         Struct deserializedStruct = ApiKeys.PRODUCE.parseResponse(version, 
buffer);
 
         ProduceResponse v5FromBytes = (ProduceResponse) 
AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                deserializedStruct);
+                deserializedStruct, version);
 
         assertEquals(1, v5FromBytes.responses().size());
         assertTrue(v5FromBytes.responses().containsKey(tp0));
@@ -1322,4 +1331,35 @@ public class RequestResponseTest {
 
         return new DescribeDelegationTokenResponse(20, Errors.NONE, tokenList);
     }
+
+    /** An ElectPreferredLeaders request whose topic-partition list is null. */
+    private ElectPreferredLeadersRequest createElectPreferredLeadersRequestNullPartitions() {
+        return new ElectPreferredLeadersRequest.Builder(
+                new ElectPreferredLeadersRequestData()
+                        .setTimeoutMs(100)
+                        .setTopicPartitions(null))
+                .build((short) 0);
+    }
+
+    /** An ElectPreferredLeaders request for partitions 1 and 2 of topic "data". */
+    private ElectPreferredLeadersRequest createElectPreferredLeadersRequest() {
+        ElectPreferredLeadersRequestData data = new ElectPreferredLeadersRequestData()
+                .setTimeoutMs(100);
+        data.topicPartitions().add(new TopicPartitions().setTopic("data").setPartitionId(asList(1, 2)));
+        return new ElectPreferredLeadersRequest.Builder(data).build((short) 0);
+    }
+
+    /** An ElectPreferredLeaders response with one successful and one failed partition. */
+    private ElectPreferredLeadersResponse createElectPreferredLeadersResponse() {
+        ElectPreferredLeadersResponseData data = new ElectPreferredLeadersResponseData().setThrottleTimeMs(200);
+        ReplicaElectionResult resultsByTopic = new ReplicaElectionResult().setTopic("myTopic");
+        resultsByTopic.partitionResult().add(new PartitionResult().setPartitionId(0)
+                .setErrorCode(Errors.NONE.code())
+                .setErrorMessage(Errors.NONE.message()));
+        resultsByTopic.partitionResult().add(new PartitionResult().setPartitionId(1)
+                .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()));
+        data.replicaElectionResults().add(resultsByTopic);
+        return new ElectPreferredLeadersResponse(data);
+    }
 }
diff --git 
a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala 
b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 7bfecde..8740ed4 100755
--- 
a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ 
b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -16,12 +16,20 @@
  */
 package kafka.admin
 
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import joptsimple.OptionSpecBuilder
 import kafka.common.AdminCommandFailedException
 import kafka.utils._
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.errors.TimeoutException
+
+import collection.JavaConverters._
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.{KafkaFuture, TopicPartition}
 import org.apache.zookeeper.KeeperException.NodeExistsException
 
 import collection._
@@ -29,33 +37,58 @@ import collection._
 object PreferredReplicaLeaderElectionCommand extends Logging {
 
+  /** Command-line entry point; runs the election with a 30 second timeout. */
   def main(args: Array[String]): Unit = {
+    val timeout = 30000
+    run(args, timeout)
+  }
+
+  /**
+   * Parses `args` and triggers a preferred replica leader election, either through ZooKeeper
+   * (the deprecated --zookeeper option) or through the AdminClient (--bootstrap-server).
+   *
+   * @param args    the command-line arguments
+   * @param timeout timeout in ms: passed to the ZooKeeper command, or used as the admin client's request timeout
+   */
+  def run(args: Array[String], timeout: Int = 30000): Unit = {
     val commandOpts = new PreferredReplicaLeaderElectionCommandOptions(args)
     CommandLineUtils.printHelpAndExitIfNeeded(commandOpts, "This tool helps to causes leadership for each partition to be transferred back to the 'preferred replica'," +
       " it can be used to balance leadership among the servers.")
 
-    CommandLineUtils.checkRequiredArgs(commandOpts.parser, commandOpts.options, commandOpts.zkConnectOpt)
+    CommandLineUtils.checkRequiredArgs(commandOpts.parser, commandOpts.options)
 
-    val zkConnect = commandOpts.options.valueOf(commandOpts.zkConnectOpt)
-    var zkClient: KafkaZkClient = null
-    try {
-      val time = Time.SYSTEM
-      zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time)
+    // Exactly one connection option is allowed; comparing with `==` also rejects the case where neither is given.
+    if (commandOpts.options.has(commandOpts.bootstrapServerOpt) == commandOpts.options.has(commandOpts.zkConnectOpt)) {
+      CommandLineUtils.printUsageAndDie(commandOpts.parser, s"Exactly one of '${commandOpts.bootstrapServerOpt}' or '${commandOpts.zkConnectOpt}' must be provided")
+    }
+
+    // None when no JSON file was supplied.
+    val partitionsForPreferredReplicaElection =
+      if (commandOpts.options.has(commandOpts.jsonFileOpt))
+        Some(parsePreferredReplicaElectionData(Utils.readFileAsString(commandOpts.options.valueOf(commandOpts.jsonFileOpt))))
+      else
+        None
 
-      val partitionsForPreferredReplicaElection =
-        if (!commandOpts.options.has(commandOpts.jsonFileOpt))
-          zkClient.getAllPartitions()
+    val preferredReplicaElectionCommand = if (commandOpts.options.has(commandOpts.zkConnectOpt)) {
+      println("Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.")
+      println("Use --bootstrap-server instead to specify a broker to connect to.")
+      new ZkCommand(commandOpts.options.valueOf(commandOpts.zkConnectOpt),
+        JaasUtils.isZkSecurityEnabled,
+        timeout)
+    } else {
+      val adminProps = if (commandOpts.options.has(commandOpts.adminClientConfigOpt))
+        Utils.loadProps(commandOpts.options.valueOf(commandOpts.adminClientConfigOpt))
-        else
-          parsePreferredReplicaElectionData(Utils.readFileAsString(commandOpts.options.valueOf(commandOpts.jsonFileOpt)))
-      val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
+      else
+        new Properties()
+      // Settings from --admin.config are kept, but the bootstrap servers and request timeout always come from this command.
+      adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOpts.options.valueOf(commandOpts.bootstrapServerOpt))
+      adminProps.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toString)
+      new AdminClientCommand(adminProps)
+    }
 
-      preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
-    } catch {
-      case e: Throwable =>
-        println("Failed to start preferred replica election")
-        println(Utils.stackTrace(e))
+    try {
+      preferredReplicaElectionCommand.electPreferredLeaders(partitionsForPreferredReplicaElection)
     } finally {
-      if (zkClient != null)
-        zkClient.close()
+      preferredReplicaElectionCommand.close()
     }
   }
 
@@ -101,14 +123,165 @@ object PreferredReplicaLeaderElectionCommand extends 
Logging {
       .withRequiredArg
       .describedAs("list of partitions for which preferred replica leader 
election needs to be triggered")
       .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection 
string for the zookeeper connection in the " +
-      "form host:port. Multiple URLS can be given to allow fail-over.")
+
+    private val zookeeperOptBuilder: OptionSpecBuilder = 
parser.accepts("zookeeper",
+      "DEPRECATED. The connection string for the zookeeper connection in the " 
+
+      "form host:port. Multiple URLS can be given to allow fail-over. " +
+      "Replaced by --bootstrap-server, REQUIRED unless --bootstrap-server is 
given.")
+    private val bootstrapOptBuilder: OptionSpecBuilder = 
parser.accepts("bootstrap-server",
+      "A hostname and port for the broker to connect to, " +
+      "in the form host:port. Multiple comma-separated URLs can be given. 
REQUIRED unless --zookeeper is given.")
+    parser.mutuallyExclusive(zookeeperOptBuilder, bootstrapOptBuilder)
+    val bootstrapServerOpt = bootstrapOptBuilder
+      .withRequiredArg
+      .describedAs("host:port")
+      .ofType(classOf[String])
+    val zkConnectOpt = zookeeperOptBuilder
       .withRequiredArg
       .describedAs("urls")
       .ofType(classOf[String])
 
+    val adminClientConfigOpt = parser.accepts("admin.config",
+      "Admin client config properties file to pass to the admin client when 
--bootstrap-server is given.")
+      .availableIf(bootstrapServerOpt)
+      .withRequiredArg
+      .describedAs("config file")
+      .ofType(classOf[String])
+
+    parser.accepts("")
     options = parser.parse(args: _*)
   }
+
+  /** Abstraction over different ways to perform a leader election */
+  trait Command {
+    /** Elect the preferred leader for the given {@code partitionsForElection}.
+      * If the given {@code partitionsForElection} are None then elect the 
preferred leader for all partitions.
+      */
+    def electPreferredLeaders(partitionsForElection: 
Option[Set[TopicPartition]]) : Unit
+    def close() : Unit
+  }
+
+  class ZkCommand(zkConnect: String, isSecure: Boolean, timeout: Int)
+    extends Command {
+    var zkClient: KafkaZkClient = null
+
+    val time = Time.SYSTEM
+    zkClient = KafkaZkClient(zkConnect, isSecure, timeout, timeout, 
Int.MaxValue, time)
+
+    override def electPreferredLeaders(partitionsFromUser: 
Option[Set[TopicPartition]]) {
+      try {
+        val topics =
+          partitionsFromUser match {
+            case Some(partitions) =>
+              partitions.map(_.topic).toSet
+            case None =>
+              zkClient.getAllPartitions().map(_.topic)
+          }
+
+        val partitionsFromZk = 
zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) =>
+          partitions.map(new TopicPartition(topic, _))
+        }.toSet
+
+        val (validPartitions, invalidPartitions) =
+          partitionsFromUser match {
+            case Some(partitions) =>
+              partitions.partition(partitionsFromZk.contains)
+            case None =>
+              (zkClient.getAllPartitions(), Set.empty)
+          }
+          
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient,
 validPartitions)
+
+        println("Successfully started preferred replica election for 
partitions %s".format(validPartitions))
+        invalidPartitions.foreach(p => println("Skipping preferred replica 
leader election for partition %s since it doesn't exist.".format(p)))
+      } catch {
+        case e: Throwable => throw new AdminCommandFailedException("Admin 
command failed", e)
+      }
+    }
+
+    override def close(): Unit = {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  /** Election via AdminClient.electPreferredLeaders() */
+  class AdminClientCommand(adminClientProps: Properties)
+    extends Command with Logging {
+
+    val adminClient = 
org.apache.kafka.clients.admin.AdminClient.create(adminClientProps)
+
+    /**
+      * Wait until the given future has completed, then return whether it 
completed exceptionally.
+      * Because KafkaFuture.isCompletedExceptionally doesn't wait for a result
+      */
+    private def completedExceptionally[T](future: KafkaFuture[T]): Boolean = {
+      try {
+        future.get()
+        false
+      } catch {
+        case (_: Throwable) =>
+          true
+      }
+    }
+
+    override def electPreferredLeaders(partitionsFromUser: 
Option[Set[TopicPartition]]): Unit = {
+      val partitions = partitionsFromUser match {
+        case Some(partitionsFromUser) => partitionsFromUser.asJava
+        case None => null
+      }
+      debug(s"Calling AdminClient.electPreferredLeaders($partitions)")
+      val result = adminClient.electPreferredLeaders(partitions)
+      // wait for all results
+
+      val attemptedPartitions = partitionsFromUser match {
+        case Some(partitionsFromUser) => partitions.asScala
+        case None => try {
+          result.partitions().get.asScala
+        } catch {
+          case e: ExecutionException =>
+            val cause = e.getCause
+            if (cause.isInstanceOf[TimeoutException]) {
+              // We timed out, or don't even know the attempted partitions
+              println("Timeout waiting for election results")
+            }
+            throw new AdminCommandFailedException(null, cause)
+          case e: Throwable =>
+            // We don't even know the attempted partitions
+            println("Error while making request")
+            e.printStackTrace()
+            return
+        }
+      }
+
+      val (exceptional, ok) = attemptedPartitions.map(tp => tp -> 
result.partitionResult(tp)).
+        partition { case (_, partitionResult) => 
completedExceptionally(partitionResult) }
+
+      if (!ok.isEmpty) {
+        println(s"Successfully completed preferred replica election for 
partitions ${ok.map{ case (tp, future) => tp }.mkString(", ")}")
+      }
+      if (!exceptional.isEmpty) {
+        val adminException = new AdminCommandFailedException(
+          s"${exceptional.size} preferred replica(s) could not be elected")
+        for ((partition, void) <- exceptional) {
+          val exception = try {
+            void.get()
+            new AdminCommandFailedException("Exceptional future with no 
exception")
+          } catch {
+            case e: ExecutionException => e.getCause
+          }
+          println(s"Error completing preferred replica election for partition 
$partition: $exception")
+          adminException.addSuppressed(exception)
+        }
+        throw adminException
+      }
+
+    }
+
+    override def close(): Unit = {
+      debug("Closing AdminClient")
+      adminClient.close()
+    }
+  }
 }
 
 class PreferredReplicaLeaderElectionCommand(zkClient: KafkaZkClient, 
partitionsFromUser: scala.collection.Set[TopicPartition]) {
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala 
b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index c93e9e7..54e3a9e 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.utils.Time
 
 import scala.collection._
+import scala.collection.JavaConverters._
 
 object ControllerEventManager {
   val ControllerEventThreadName = "controller-event-thread"
@@ -69,6 +70,10 @@ class ControllerEventManager(controllerId: Int, 
rateAndTimeMetrics: Map[Controll
   }
 
   def clearAndPut(event: ControllerEvent): Unit = inLock(putLock) {
+    queue.asScala.foreach(evt =>
+      if (evt.isInstanceOf[PreemptableControllerEvent])
+        evt.asInstanceOf[PreemptableControllerEvent].preempt()
+    )
     queue.clear()
     put(event)
   }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index c8cf446..ea23beb 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,6 +16,7 @@
  */
 package kafka.controller
 
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import com.yammer.metrics.core.Gauge
@@ -32,13 +33,13 @@ import org.apache.kafka.common.{KafkaException, 
TopicPartition}
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, 
ControllerMovedException, StaleBrokerEpochException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractControlRequest, 
AbstractResponse, LeaderAndIsrResponse, StopReplicaResponse}
+import org.apache.kafka.common.requests.{AbstractControlRequest, 
AbstractResponse, ApiError, LeaderAndIsrResponse, StopReplicaResponse}
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
 
 import scala.collection._
-import scala.util.Try
+import scala.util.{Failure, Try}
 
 object KafkaController extends Logging {
   val InitialControllerEpoch = 0
@@ -268,7 +269,7 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
     
maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
     topicDeletionManager.tryTopicDeletion()
     val pendingPreferredReplicaElections = 
fetchPendingPreferredReplicaElections()
-    onPreferredReplicaElection(pendingPreferredReplicaElections)
+    onPreferredReplicaElection(pendingPreferredReplicaElections, ZkTriggered)
     info("Starting the controller scheduler")
     kafkaScheduler.startup()
     if (config.autoLeaderRebalanceEnable) {
@@ -586,7 +587,7 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
     val partitionsToBeRemovedFromReassignment = 
scala.collection.mutable.Set.empty[TopicPartition]
     topicPartitions.foreach { tp =>
       if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
-        error(s"Skipping reassignment of $tp since the topic is currently 
being deleted")
+        info(s"Skipping reassignment of $tp since the topic is currently being 
deleted")
         partitionsToBeRemovedFromReassignment.add(tp)
       } else {
         val reassignedPartitionContext = 
controllerContext.partitionsBeingReassigned.get(tp).getOrElse {
@@ -628,17 +629,38 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
     
removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)
   }
 
-  private def onPreferredReplicaElection(partitions: Set[TopicPartition], 
isTriggeredByAutoRebalance: Boolean = false) {
+  sealed trait ElectionType
+  object AutoTriggered extends ElectionType
+  object ZkTriggered extends ElectionType
+  object AdminClientTriggered extends ElectionType
+
+  /**
+    * Attempt to elect the preferred replica as leader for each of the given 
partitions.
+    * @param partitions The partitions to have their preferred leader elected
+    * @param electionType The election type
+    * @return A map of failed elections where keys are partitions which had an 
error and the corresponding value is
+    *         the exception that was thrown.
+    */
+  private def onPreferredReplicaElection(partitions: Set[TopicPartition],
+                                         electionType: ElectionType): 
Map[TopicPartition, Throwable] = {
     info(s"Starting preferred replica leader election for partitions 
${partitions.mkString(",")}")
     try {
-      partitionStateMachine.handleStateChanges(partitions.toSeq, 
OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
-    } catch {
-      case e: ControllerMovedException =>
-        error(s"Error completing preferred replica leader election for 
partitions ${partitions.mkString(",")} because controller has moved to another 
broker.", e)
-        throw e
-      case e: Throwable => error(s"Error completing preferred replica leader 
election for partitions ${partitions.mkString(",")}", e)
+      val results = partitionStateMachine.handleStateChanges(partitions.toSeq, 
OnlinePartition,
+        Option(PreferredReplicaPartitionLeaderElectionStrategy))
+      if (electionType != AdminClientTriggered) {
+        results.foreach { case (tp, throwable) =>
+          if (throwable.isInstanceOf[ControllerMovedException]) {
+            error(s"Error completing preferred replica leader election for 
partition $tp because controller has moved to another broker.", throwable)
+            throw throwable
+          } else {
+            error(s"Error completing preferred replica leader election for 
partition $tp", throwable)
+          }
+        }
+      }
+      return results;
     } finally {
-      removePartitionsFromPreferredReplicaElection(partitions, 
isTriggeredByAutoRebalance)
+      if (electionType != AdminClientTriggered)
+        removePartitionsFromPreferredReplicaElection(partitions, electionType 
== AutoTriggered)
     }
   }
 
@@ -884,7 +906,7 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
     if (!isTriggeredByAutoRebalance) {
       zkClient.deletePreferredReplicaElection(controllerContext.epochZkVersion)
       // Ensure we detect future preferred replica leader elections
-      eventManager.put(PreferredReplicaLeaderElection)
+      eventManager.put(PreferredReplicaLeaderElection(None))
     }
   }
 
@@ -983,7 +1005,7 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
           controllerContext.partitionsBeingReassigned.isEmpty &&
           !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
           controllerContext.allTopics.contains(tp.topic))
-        onPreferredReplicaElection(candidatePartitions.toSet, 
isTriggeredByAutoRebalance = true)
+        onPreferredReplicaElection(candidatePartitions.toSet, AutoTriggered)
       }
     }
   }
@@ -1022,11 +1044,15 @@ class KafkaController(val config: KafkaConfig, 
zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  case class ControlledShutdown(id: Int, brokerEpoch: Long, 
controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends 
ControllerEvent {
+  case class ControlledShutdown(id: Int, brokerEpoch: Long, 
controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends 
PreemptableControllerEvent {
 
     def state = ControllerState.ControlledShutdown
 
-    override def process(): Unit = {
+    override def handlePreempt(): Unit = {
+      controlledShutdownCallback(Failure(new 
ControllerMovedException("Controller moved to another broker")))
+    }
+
+    override def handleProcess(): Unit = {
       val controlledShutdownResult = Try { doControlledShutdown(id) }
       controlledShutdownCallback(controlledShutdownResult)
     }
@@ -1517,22 +1543,75 @@ class KafkaController(val config: KafkaConfig, 
zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  case object PreferredReplicaLeaderElection extends ControllerEvent {
+  type ElectPreferredLeadersCallback = (Map[TopicPartition, Int], 
Map[TopicPartition, ApiError])=>Unit
+
+  def electPreferredLeaders(partitions: Set[TopicPartition], callback: 
ElectPreferredLeadersCallback = { (_,_) => }): Unit =
+    eventManager.put(PreferredReplicaLeaderElection(Some(partitions), 
AdminClientTriggered, callback))
+
+  case class PreferredReplicaLeaderElection(partitionsFromAdminClientOpt: 
Option[Set[TopicPartition]],
+                                            electionType: ElectionType = 
ZkTriggered,
+                                            callback: 
ElectPreferredLeadersCallback = (_,_) =>{}) extends PreemptableControllerEvent {
     override def state: ControllerState = ControllerState.ManualLeaderBalance
 
-    override def process(): Unit = {
-      if (!isActive) return
+    override def handlePreempt(): Unit = {
+      callback(Map.empty, partitionsFromAdminClientOpt match {
+        case Some(partitions) => partitions.map(partition => partition -> new 
ApiError(Errors.NOT_CONTROLLER, null)).toMap
+        case None => Map.empty
+      })
+    }
+
+    override def handleProcess(): Unit = {
+      if (!isActive) {
+        callback(Map.empty, partitionsFromAdminClientOpt match {
+          case Some(partitions) => partitions.map(partition => partition -> 
new ApiError(Errors.NOT_CONTROLLER, null)).toMap
+          case None => Map.empty
+        })
+      } else {
+        // We need to register the watcher if the path doesn't exist in order 
to detect future preferred replica
+        // leader elections and we get the `path exists` check for free
+        if (electionType == AdminClientTriggered || 
zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler))
 {
+          val partitions = partitionsFromAdminClientOpt match {
+            case Some(partitions) => partitions
+            case None => zkClient.getPreferredReplicaElection
+          }
+
+          val (validPartitions, invalidPartitions) = partitions.partition(tp 
=> controllerContext.allPartitions.contains(tp))
+          invalidPartitions.foreach { p =>
+            info(s"Skipping preferred replica leader election for partition 
${p} since it doesn't exist.")
+          }
 
-      // We need to register the watcher if the path doesn't exist in order to 
detect future preferred replica
-      // leader elections and we get the `path exists` check for free
-      if 
(zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler))
 {
-        val partitions = zkClient.getPreferredReplicaElection
-        val partitionsForTopicsToBeDeleted = partitions.filter(p => 
topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
-        if (partitionsForTopicsToBeDeleted.nonEmpty) {
-          error(s"Skipping preferred replica election for partitions 
$partitionsForTopicsToBeDeleted since the " +
-            "respective topics are being deleted")
+          val (partitionsBeingDeleted, livePartitions) = 
validPartitions.partition(partition =>
+            topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
+          if (partitionsBeingDeleted.nonEmpty) {
+            warn(s"Skipping preferred replica election for partitions 
$partitionsBeingDeleted " +
+              s"since the respective topics are being deleted")
+          }
+          // partition those where preferred is already leader
+          val (electablePartitions, alreadyPreferred) = 
livePartitions.partition { partition =>
+            val assignedReplicas = 
controllerContext.partitionReplicaAssignment(partition)
+            val preferredReplica = assignedReplicas.head
+            val currentLeader = 
controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
+            currentLeader != preferredReplica
+          }
+
+          val electionErrors = onPreferredReplicaElection(electablePartitions, 
electionType)
+          val successfulPartitions = electablePartitions -- 
electionErrors.keySet
+          val results = electionErrors.map { case (partition, ex) =>
+            val apiError = if (ex.isInstanceOf[StateChangeFailedException])
+              new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE, 
ex.getMessage)
+            else
+              ApiError.fromThrowable(ex)
+            partition -> apiError
+          } ++
+            alreadyPreferred.map(_ -> ApiError.NONE) ++
+            partitionsBeingDeleted.map(_ -> new 
ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted")) ++
+            invalidPartitions.map ( tp => tp -> new 
ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, s"The partition does not exist.")
+            )
+          debug(s"PreferredReplicaLeaderElection waiting: 
$successfulPartitions, results: $results")
+          callback(successfulPartitions.map(
+              tp => 
tp->controllerContext.partitionReplicaAssignment(tp).head).toMap,
+            results)
         }
-        onPreferredReplicaElection(partitions -- 
partitionsForTopicsToBeDeleted)
       }
     }
   }
@@ -1653,7 +1732,7 @@ object IsrChangeNotificationHandler {
 class PreferredReplicaElectionHandler(controller: KafkaController, 
eventManager: ControllerEventManager) extends ZNodeChangeHandler {
   override val path: String = PreferredReplicaElectionZNode.path
 
-  override def handleCreation(): Unit = 
eventManager.put(controller.PreferredReplicaLeaderElection)
+  override def handleCreation(): Unit = 
eventManager.put(controller.PreferredReplicaLeaderElection(None))
 }
 
 class ControllerChangeHandler(controller: KafkaController, eventManager: 
ControllerEventManager) extends ZNodeChangeHandler {
@@ -1712,3 +1791,25 @@ sealed trait ControllerEvent {
   def state: ControllerState
   def process(): Unit
 }
+
+/**
+  * A `ControllerEvent`, such as one with a client callback, which needs 
specific handling in the event of ZK session expiration.
+  */
+sealed trait PreemptableControllerEvent extends ControllerEvent {
+
+  val spent = new AtomicBoolean(false)
+
+  final def preempt(): Unit = {
+    if (!spent.getAndSet(true))
+      handlePreempt()
+  }
+
+  final def process(): Unit = {
+    if (!spent.getAndSet(true))
+      handleProcess()
+  }
+
+  def handlePreempt(): Unit
+
+  def handleProcess(): Unit
+}
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index e4f0532..ad73979 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -125,22 +125,36 @@ class PartitionStateMachine(config: KafkaConfig,
     // It is important to trigger leader election for those partitions.
   }
 
+  /**
+    * Try to change the state of the given partitions to the given 
targetState, using the given
+    * partitionLeaderElectionStrategyOpt if a leader election is required.
+    * @param partitions The partitions
+    * @param targetState The state
+    * @param partitionLeaderElectionStrategyOpt The leader election strategy 
if a leader election is required.
+    * @return partitions and corresponding throwable for those partitions 
which could not transition to the given state
+    */
   def handleStateChanges(partitions: Seq[TopicPartition], targetState: 
PartitionState,
-                         partitionLeaderElectionStrategyOpt: 
Option[PartitionLeaderElectionStrategy] = None): Unit = {
+                         partitionLeaderElectionStrategyOpt: 
Option[PartitionLeaderElectionStrategy] = None): Map[TopicPartition, Throwable] 
= {
     if (partitions.nonEmpty) {
       try {
         controllerBrokerRequestBatch.newBatch()
-        doHandleStateChanges(partitions, targetState, 
partitionLeaderElectionStrategyOpt)
+        val errors = doHandleStateChanges(partitions, targetState, 
partitionLeaderElectionStrategyOpt)
         
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
+        errors
       } catch {
         case e: ControllerMovedException =>
           error(s"Controller moved to another broker when moving some 
partitions to $targetState state", e)
           throw e
-        case e: Throwable => error(s"Error while moving some partitions to 
$targetState state", e)
+        case e: Throwable =>
+          error(s"Error while moving some partitions to $targetState state", e)
+          partitions.map { _ -> e }.toMap
       }
+    } else {
+      Map.empty[TopicPartition, Throwable]
     }
   }
 
+
   def partitionsInState(state: PartitionState): Set[TopicPartition] = {
     partitionState.filter { case (_, s) => s == state }.keySet.toSet
   }
@@ -183,7 +197,7 @@ class PartitionStateMachine(config: KafkaConfig,
    * @param targetState The end state that the partition should be moved to
    */
   private def doHandleStateChanges(partitions: Seq[TopicPartition], 
targetState: PartitionState,
-                           partitionLeaderElectionStrategyOpt: 
Option[PartitionLeaderElectionStrategy]): Unit = {
+                           partitionLeaderElectionStrategyOpt: 
Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable] = {
     val stateChangeLog = 
stateChangeLogger.withControllerEpoch(controllerContext.epoch)
     partitions.foreach(partition => partitionState.getOrElseUpdate(partition, 
NonExistentPartition))
     val (validPartitions, invalidPartitions) = partitions.partition(partition 
=> isValidTransition(partition, targetState))
@@ -195,6 +209,7 @@ class PartitionStateMachine(config: KafkaConfig,
             s"assigned replicas 
${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
           changeStateTo(partition, partitionState(partition), NewPartition)
         }
+        Map.empty
       case OnlinePartition =>
         val uninitializedPartitions = validPartitions.filter(partition => 
partitionState(partition) == NewPartition)
         val partitionsToElectLeader = validPartitions.filter(partition => 
partitionState(partition) == OfflinePartition || partitionState(partition) == 
OnlinePartition)
@@ -207,23 +222,28 @@ class PartitionStateMachine(config: KafkaConfig,
           }
         }
         if (partitionsToElectLeader.nonEmpty) {
-          val successfulElections = 
electLeaderForPartitions(partitionsToElectLeader, 
partitionLeaderElectionStrategyOpt.get)
+          val (successfulElections, failedElections) = 
electLeaderForPartitions(partitionsToElectLeader, 
partitionLeaderElectionStrategyOpt.get)
           successfulElections.foreach { partition =>
             stateChangeLog.trace(s"Changed partition $partition from 
${partitionState(partition)} to $targetState with state " +
               
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
             changeStateTo(partition, partitionState(partition), 
OnlinePartition)
           }
+          failedElections
+        } else {
+          Map.empty
         }
       case OfflinePartition =>
         validPartitions.foreach { partition =>
           stateChangeLog.trace(s"Changed partition $partition state from 
${partitionState(partition)} to $targetState")
           changeStateTo(partition, partitionState(partition), OfflinePartition)
         }
+        Map.empty
       case NonExistentPartition =>
         validPartitions.foreach { partition =>
           stateChangeLog.trace(s"Changed partition $partition state from 
${partitionState(partition)} to $targetState")
           changeStateTo(partition, partitionState(partition), 
NonExistentPartition)
         }
+        Map.empty
     }
   }
 
@@ -283,11 +303,14 @@ class PartitionStateMachine(config: KafkaConfig,
    * Repeatedly attempt to elect leaders for multiple partitions until there 
are no more remaining partitions to retry.
    * @param partitions The partitions that we're trying to elect leaders for.
    * @param partitionLeaderElectionStrategy The election strategy to use.
-   * @return The partitions that successfully had a leader elected.
+   * @return A pair with first element of which is the partitions that 
successfully had a leader elected
+    *        and the second element a map of failed partition to the 
corresponding thrown exception.
    */
-  private def electLeaderForPartitions(partitions: Seq[TopicPartition], 
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): 
Seq[TopicPartition] = {
+  private def electLeaderForPartitions(partitions: Seq[TopicPartition],
+                                       partitionLeaderElectionStrategy: 
PartitionLeaderElectionStrategy): (Seq[TopicPartition], Map[TopicPartition, 
Throwable]) = {
     val successfulElections = mutable.Buffer.empty[TopicPartition]
     var remaining = partitions
+    var failures = Map.empty[TopicPartition, Throwable]
     while (remaining.nonEmpty) {
       val (success, updatesToRetry, failedElections) = 
doElectLeaderForPartitions(partitions, partitionLeaderElectionStrategy)
       remaining = updatesToRetry
@@ -295,8 +318,9 @@ class PartitionStateMachine(config: KafkaConfig,
       failedElections.foreach { case (partition, e) =>
         logFailedStateChange(partition, partitionState(partition), 
OnlinePartition, e)
       }
+      failures ++= failedElections
     }
-    successfulElections
+    (successfulElections, failures)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala 
b/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala
new file mode 100644
index 0000000..38b07ad
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala
@@ -0,0 +1,89 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.ApiError
+
+import scala.collection.{Map, mutable}
+
+/** A delayed elect preferred leader operation that can be created by the 
replica manager and watched
+  * in the elect preferred leader purgatory
+  */
+class DelayedElectPreferredLeader(delayMs: Long,
+                                  expectedLeaders: Map[TopicPartition, Int],
+                                  results: Map[TopicPartition, ApiError],
+                                  replicaManager: ReplicaManager,
+                                  responseCallback: Map[TopicPartition, 
ApiError] => Unit)
+    extends DelayedOperation(delayMs) {
+
+  var waitingPartitions = expectedLeaders
+  val fullResults = results.to[mutable.Set]
+
+
+  /**
+    * Call-back to execute when a delayed operation gets expired and hence 
forced to complete.
+    */
+  override def onExpiration(): Unit = {}
+
+  /**
+    * Process for completing an operation; This function needs to be defined
+    * in subclasses and will be called exactly once in forceComplete()
+    */
+  override def onComplete(): Unit = {
+    // This could be called to force complete, so I need the full list of 
partitions, so I can time them all out.
+    updateWaiting()
+    val timedout = waitingPartitions.map{
+      case (tp, leader) => tp -> new ApiError(Errors.REQUEST_TIMED_OUT, null)
+    }.toMap
+    responseCallback(timedout ++ fullResults)
+  }
+
+  private def timeoutWaiting = {
+    waitingPartitions.map(partition => partition -> new 
ApiError(Errors.REQUEST_TIMED_OUT, null)).toMap
+  }
+
+  /**
+    * Try to complete the delayed operation by first checking if the operation
+    * can be completed by now. If yes execute the completion logic by calling
+    * forceComplete() and return true iff forceComplete returns true; 
otherwise return false
+    *
+    * This function needs to be defined in subclasses
+    */
+  override def tryComplete(): Boolean = {
+    updateWaiting()
+    debug(s"tryComplete() waitingPartitions: $waitingPartitions")
+    waitingPartitions.isEmpty && forceComplete()
+  }
+
+  private def updateWaiting() = {
+    waitingPartitions.foreach{case (tp, leader) =>
+      val ps = replicaManager.metadataCache.getPartitionInfo(tp.topic, 
tp.partition)
+      ps match {
+        case Some(ps) =>
+          if (leader == ps.basePartitionState.leader) {
+            waitingPartitions -= tp
+            fullResults += tp -> ApiError.NONE
+          }
+        case None =>
+      }
+    }
+  }
+
+}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 67020a8..aab9a38 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, 
TRANSACTION_STATE_TOPIC_NAME, isInternal}
+import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -146,6 +147,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.EXPIRE_DELEGATION_TOKEN => 
handleExpireTokenRequest(request)
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN => 
handleDescribeTokensRequest(request)
         case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
+        case ApiKeys.ELECT_PREFERRED_LEADERS => 
handleElectPreferredReplicaLeader(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -253,6 +255,11 @@ class KafkaApis(val requestChannel: RequestChannel,
           quotas.request.updateQuotaMetricConfigs()
         }
       }
+      if (replicaManager.hasDelayedElectionOperations) {
+        updateMetadataRequest.partitionStates.asScala.foreach { case (tp, ps) 
=>
+          replicaManager.tryCompleteElection(new 
TopicPartitionOperationKey(tp.topic(), tp.partition()))
+        }
+      }
       sendResponseExemptThrottle(request, new 
UpdateMetadataResponse(Errors.NONE))
     }
   }
@@ -2227,6 +2234,48 @@ class KafkaApis(val requestChannel: RequestChannel,
       true
   }
 
+  /**
+   * Handles an ElectPreferredLeaders request, which requires Alter permission on the cluster resource.
+   *
+   * A null partition list in the request means "every partition in the metadata cache". An unauthorized
+   * client gets CLUSTER_AUTHORIZATION_FAILED for each partition it named, or an empty result if it named
+   * none, so the cluster's partition set is not revealed to it. The metadata cache is only scanned for
+   * authorized requests.
+   */
+  def handleElectPreferredReplicaLeader(request: RequestChannel.Request): Unit = {
+    val electionRequest = request.body[ElectPreferredLeadersRequest]
+    val electAllPartitions = electionRequest.data().topicPartitions() == null
+
+    // Partitions explicitly named in the request; only meaningful when electAllPartitions is false.
+    def requestedPartitions =
+      electionRequest.data().topicPartitions().asScala.flatMap { tp =>
+        tp.partitionId().asScala.map(partitionId => new TopicPartition(tp.topic, partitionId))
+      }.toSet
+
+    def sendResponseCallback(result: Map[TopicPartition, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs => {
+        val results = result.groupBy { case (tp, _) => tp.topic }.map { case (topic, ps) =>
+          new ElectPreferredLeadersResponseData.ReplicaElectionResult()
+            .setTopic(topic)
+            .setPartitionResult(ps.map { case (tp, error) =>
+              new ElectPreferredLeadersResponseData.PartitionResult()
+                .setErrorCode(error.error.code)
+                .setErrorMessage(error.message())
+                .setPartitionId(tp.partition)
+            }.toList.asJava)
+        }
+        val data = new ElectPreferredLeadersResponseData()
+          .setThrottleTimeMs(requestThrottleMs)
+          .setReplicaElectionResults(results.toList.asJava)
+        new ElectPreferredLeadersResponse(data)
+      })
+    }
+
+    if (!authorize(request.session, Alter, Resource.ClusterResource)) {
+      val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)
+      val partitionErrors =
+        if (electAllPartitions) Map.empty[TopicPartition, ApiError] // don't leak the set of partitions
+        else requestedPartitions.map(partition => partition -> error).toMap
+      sendResponseCallback(partitionErrors)
+    } else {
+      val partitions = if (electAllPartitions) metadataCache.getAllPartitions() else requestedPartitions
+      replicaManager.electPreferredLeaders(controller, partitions, sendResponseCallback, electionRequest.data().timeoutMs())
+    }
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!isAuthorizedClusterAction(request))
       throw new ClusterAuthorizationException(s"Request $request is not 
authorized.")
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 3fefc7b..ec5a2b9 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -137,6 +137,12 @@ class MetadataCache(brokerId: Int) extends Logging {
     getAllTopics(metadataSnapshot)
   }
 
+  /** Every partition of every topic present in the current metadata snapshot. */
+  def getAllPartitions(): Set[TopicPartition] = {
+    val partitions = for {
+      (topicName, partitionsAndStates) <- metadataSnapshot.partitionStates
+      partitionId <- partitionsAndStates.keys
+    } yield new TopicPartition(topicName, partitionId.toInt)
+    partitions.toSet
+  }
+
   private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = {
     snapshot.partitionStates.keySet
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5dfb2e6..5e41e35 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -146,6 +146,7 @@ class ReplicaManager(val config: KafkaConfig,
                      val delayedProducePurgatory: 
DelayedOperationPurgatory[DelayedProduce],
                      val delayedFetchPurgatory: 
DelayedOperationPurgatory[DelayedFetch],
                      val delayedDeleteRecordsPurgatory: 
DelayedOperationPurgatory[DelayedDeleteRecords],
+                     val delayedElectPreferredLeaderPurgatory: 
DelayedOperationPurgatory[DelayedElectPreferredLeader],
                      threadNamePrefix: Option[String]) extends Logging with 
KafkaMetricsGroup {
 
   def this(config: KafkaConfig,
@@ -171,6 +172,8 @@ class ReplicaManager(val config: KafkaConfig,
       DelayedOperationPurgatory[DelayedDeleteRecords](
         purgatoryName = "DeleteRecords", brokerId = config.brokerId,
         purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedElectPreferredLeader](
+        purgatoryName = "ElectPreferredLeader", brokerId = config.brokerId),
       threadNamePrefix)
   }
 
@@ -318,6 +321,13 @@ class ReplicaManager(val config: KafkaConfig,
     debug("Request key %s unblocked %d 
DeleteRecordsRequest.".format(key.keyLabel, completed))
   }
 
+  /** True when the ElectPreferredLeader purgatory's `delayed` count is non-zero. */
+  def hasDelayedElectionOperations: Boolean = delayedElectPreferredLeaderPurgatory.delayed != 0
+
+  /** Runs checkAndComplete on the ElectPreferredLeader purgatory for `key` and logs how many operations it completed. */
+  def tryCompleteElection(key: DelayedOperationKey): Unit = {
+    val completedCount = delayedElectPreferredLeaderPurgatory.checkAndComplete(key)
+    debug(s"Request key ${key.keyLabel} unblocked $completedCount ElectPreferredLeader.")
+  }
+
   def startup() {
     // start ISR expiration thread
     // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 
1.5 before it is removed from ISR
@@ -1476,6 +1486,7 @@ class ReplicaManager(val config: KafkaConfig,
     delayedFetchPurgatory.shutdown()
     delayedProducePurgatory.shutdown()
     delayedDeleteRecordsPurgatory.shutdown()
+    delayedElectPreferredLeaderPurgatory.shutdown()
     if (checkpointHW)
       checkpointHighWatermarks()
     info("Shut down completely")
@@ -1508,4 +1519,30 @@ class ReplicaManager(val config: KafkaConfig,
       tp -> epochEndOffset
     }
   }
+
+  /**
+   * Asks the controller to elect the preferred leaders of `partitions`, then replies through
+   * `responseCallback`. When the controller reports partitions whose leader is expected to change,
+   * a DelayedElectPreferredLeader watching each of them is handed to the purgatory, with whatever
+   * remains of `requestTimeout` (measured from this call) as its delay. Otherwise the controller's
+   * results are passed to `responseCallback` immediately.
+   */
+  def electPreferredLeaders(controller: KafkaController,
+                            partitions: Set[TopicPartition],
+                            responseCallback: Map[TopicPartition, ApiError] => Unit,
+                            requestTimeout: Long): Unit = {
+    val deadline = time.milliseconds() + requestTimeout
+
+    def onElectionResult(expectedLeaders: Map[TopicPartition, Int],
+                         results: Map[TopicPartition, ApiError]): Unit = {
+      if (expectedLeaders.isEmpty) {
+        // Nothing is actually being elected, so there is nothing to wait for.
+        responseCallback(results)
+      } else {
+        val keysToWatch = expectedLeaders.map {
+          case (tp, _) => new TopicPartitionOperationKey(tp.topic, tp.partition)
+        }.toSeq
+        val remainingMs = deadline - time.milliseconds()
+        val operation = new DelayedElectPreferredLeader(remainingMs, expectedLeaders, results, this, responseCallback)
+        delayedElectPreferredLeaderPurgatory.tryCompleteElseWatch(operation, keysToWatch)
+      }
+    }
+
+    controller.electPreferredLeaders(partitions, onElectionResult)
+  }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 92d1758..5a3278c 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -47,13 +47,14 @@ import org.junit.Assert._
 
 import scala.util.Random
 import scala.collection.JavaConverters._
-import java.lang.{Long => JLong}
 
 import kafka.zk.KafkaZkClient
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
 
+import java.lang.{Long => JLong}
+
 /**
  * An integration test of the KafkaAdminClient.
  *
@@ -99,6 +100,8 @@ class AdminClientIntegrationTest extends 
IntegrationTestHarness with Logging {
       config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"${listenerName.value}:${securityProtocol.name}")
       config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
       config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
+      config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
+      config.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
       // We set this in order to test that we don't expose sensitive data via 
describe configs. This will already be
       // set for subclasses with security enabled and we don't want to 
overwrite it.
       if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
@@ -1198,6 +1201,204 @@ class AdminClientIntegrationTest extends 
IntegrationTestHarness with Logging {
     }
   }
 
+  /**
+   * Exercises AdminClient.electPreferredLeaders against brokers 0, 1 and 2:
+   *  - no-op elections;
+   *  - real elections after the preferred replica is moved;
+   *  - unknown and duplicated partitions;
+   *  - elections whose preferred leader (broker 1) has been shut down.
+   */
+  @Test
+  def testElectPreferredLeaders(): Unit = {
+    client = AdminClient.create(createConfig)
+
+    val prefer0 = Seq(0, 1, 2)
+    val prefer1 = Seq(1, 2, 0)
+    val prefer2 = Seq(2, 0, 1)
+
+    val partition1 = new TopicPartition("elect-preferred-leaders-topic-1", 0)
+    TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> prefer0), servers)
+
+    val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0)
+    TestUtils.createTopic(zkClient, partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0), servers)
+
+    def currentLeader(topicPartition: TopicPartition) =
+      client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic).
+        get.partitions.get(topicPartition.partition).leader.id
+
+    def preferredLeader(topicPartition: TopicPartition) =
+      client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic).
+        get.partitions.get(topicPartition.partition).replicas.get(0).id
+
+    def waitForLeaderToBecome(topicPartition: TopicPartition, leader: Int) =
+      TestUtils.waitUntilTrue(() => currentLeader(topicPartition) == leader, s"Expected leader to become $leader", 10000)
+
+    /** Changes the <i>preferred</i> leader without changing the <i>current</i> leader. */
+    def changePreferredLeader(newAssignment: Seq[Int]) = {
+      val preferred = newAssignment.head
+      val prior1 = currentLeader(partition1)
+      val prior2 = currentLeader(partition2)
+
+      var m = Map.empty[TopicPartition, Seq[Int]]
+
+      if (prior1 != preferred)
+        m += partition1 -> newAssignment
+      if (prior2 != preferred)
+        m += partition2 -> newAssignment
+
+      zkClient.createPartitionReassignment(m)
+      TestUtils.waitUntilTrue(
+        () => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred,
+        s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}", 10000)
+      // Check the leader hasn't moved
+      assertEquals(prior1, currentLeader(partition1))
+      assertEquals(prior2, currentLeader(partition2))
+    }
+
+    // Check current leaders are 0
+    assertEquals(0, currentLeader(partition1))
+    assertEquals(0, currentLeader(partition2))
+
+    // Noop election
+    var electResult = client.electPreferredLeaders(asList(partition1))
+    electResult.partitionResult(partition1).get()
+    assertEquals(0, currentLeader(partition1))
+
+    // Noop election with null partitions
+    electResult = client.electPreferredLeaders(null)
+    electResult.partitionResult(partition1).get()
+    assertEquals(0, currentLeader(partition1))
+    electResult.partitionResult(partition2).get()
+    assertEquals(0, currentLeader(partition2))
+
+    // Now change the preferred leader to 1
+    changePreferredLeader(prefer1)
+
+    // meaningful election
+    electResult = client.electPreferredLeaders(asList(partition1))
+    assertEquals(Set(partition1).asJava, electResult.partitions.get)
+    electResult.partitionResult(partition1).get()
+    waitForLeaderToBecome(partition1, 1)
+
+    // topic 2 unchanged
+    try {
+      electResult.partitionResult(partition2).get()
+      fail("topic 2 wasn't requested")
+    } catch {
+      case e: ExecutionException =>
+        val cause = e.getCause
+        assertTrue(cause.getClass.getName, cause.isInstanceOf[UnknownTopicOrPartitionException])
+        assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\" was not attempted",
+          cause.getMessage)
+        assertEquals(0, currentLeader(partition2))
+    }
+
+    // meaningful election with null partitions
+    electResult = client.electPreferredLeaders(null)
+    assertEquals(Set(partition1, partition2), electResult.partitions.get.asScala.filterNot(_.topic == "__consumer_offsets"))
+    electResult.partitionResult(partition1).get()
+    waitForLeaderToBecome(partition1, 1)
+    electResult.partitionResult(partition2).get()
+    waitForLeaderToBecome(partition2, 1)
+
+    // unknown topic
+    val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
+    electResult = client.electPreferredLeaders(asList(unknownPartition))
+    assertEquals(Set(unknownPartition).asJava, electResult.partitions.get)
+    try {
+      electResult.partitionResult(unknownPartition).get()
+      // fail() throws an AssertionError, which the `case e: Exception` below does not catch.
+      fail("Expected an UnknownTopicOrPartitionException for a partition that does not exist")
+    } catch {
+      case e: Exception =>
+        val cause = e.getCause
+        assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException])
+        assertEquals("The partition does not exist.",
+          cause.getMessage)
+        assertEquals(1, currentLeader(partition1))
+        assertEquals(1, currentLeader(partition2))
+    }
+
+    // Now change the preferred leader to 2
+    changePreferredLeader(prefer2)
+
+    // mixed results
+    electResult = client.electPreferredLeaders(asList(unknownPartition, partition1))
+    assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get)
+    waitForLeaderToBecome(partition1, 2)
+    assertEquals(1, currentLeader(partition2))
+    try {
+      electResult.partitionResult(unknownPartition).get()
+      fail("Expected an UnknownTopicOrPartitionException for a partition that does not exist")
+    } catch {
+      case e: Exception =>
+        val cause = e.getCause
+        assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException])
+        assertEquals("The partition does not exist.",
+          cause.getMessage)
+    }
+
+    // dupe partitions
+    electResult = client.electPreferredLeaders(asList(partition2, partition2))
+    assertEquals(Set(partition2).asJava, electResult.partitions.get)
+    electResult.partitionResult(partition2).get()
+    waitForLeaderToBecome(partition2, 2)
+
+    // Now change the preferred leader to 1
+    changePreferredLeader(prefer1)
+    // but shut it down...
+    servers(1).shutdown()
+    waitUntilTrue(
+      () => {
+        val description = client.describeTopics(Set(partition1.topic(), partition2.topic()).asJava).all().get()
+        // The lambda must simply evaluate to the Boolean: a `return` in here would be a non-local
+        // return from testElectPreferredLeaders itself, ending the whole test at the first poll.
+        val isrNodes = description.asScala.values.flatMap(_.partitions().asScala).flatMap(_.isr().asScala)
+        isrNodes.forall(node => node.id != 1)
+      },
+      "Expect broker 1 to no longer be in any ISR"
+    )
+
+    // ... now what happens if we try to elect the preferred leader and it's down?
+    val shortTimeout = new ElectPreferredLeadersOptions().timeoutMs(10000)
+    electResult = client.electPreferredLeaders(asList(partition1), shortTimeout)
+    assertEquals(Set(partition1).asJava, electResult.partitions.get)
+    try {
+      electResult.partitionResult(partition1).get()
+      fail()
+    } catch {
+      case e: Exception =>
+        val cause = e.getCause
+        assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
+        assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
+          "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
+    }
+    assertEquals(2, currentLeader(partition1))
+
+    // preferred leader unavailable with null argument
+    electResult = client.electPreferredLeaders(null, shortTimeout)
+    try {
+      electResult.partitions.get()
+      fail()
+    } catch {
+      case e: Exception =>
+        val cause = e.getCause
+        assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
+    }
+    try {
+      electResult.partitionResult(partition1).get()
+      fail()
+    } catch {
+      case e: Exception =>
+        val cause = e.getCause
+        assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
+        assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
+          "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
+    }
+    try {
+      electResult.partitionResult(partition2).get()
+      fail()
+    } catch {
+      case e: Exception =>
+        val cause = e.getCause
+        assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
+        assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
+          "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
+    }
+
+    assertEquals(2, currentLeader(partition1))
+    assertEquals(2, currentLeader(partition2))
+  }
 }
 
 object AdminClientIntegrationTest {
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 4c0459e..ad7fdbb 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -146,7 +146,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse],
       ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse],
       ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse],
-      ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse]
+      ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse],
+      ApiKeys.ELECT_PREFERRED_LEADERS -> classOf[ElectPreferredLeadersResponse]
   )
 
   val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@@ -187,7 +188,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.ALTER_REPLICA_LOG_DIRS -> ((resp: AlterReplicaLogDirsResponse) => 
resp.responses.get(tp)),
     ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
       if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error 
else Errors.CLUSTER_AUTHORIZATION_FAILED),
-    ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => 
resp.errors.asScala.find(_._1 == topic).get._2.error)
+    ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => 
resp.errors.asScala.find(_._1 == topic).get._2.error),
+    ApiKeys.ELECT_PREFERRED_LEADERS -> ((resp: ElectPreferredLeadersResponse) 
=>
+      
ElectPreferredLeadersRequest.fromResponseData(resp.data()).get(tp).error())
   )
 
   val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -225,7 +228,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DELETE_ACLS -> clusterAlterAcl,
     ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl,
     ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl,
-    ApiKeys.CREATE_PARTITIONS -> topicAlterAcl
+    ApiKeys.CREATE_PARTITIONS -> topicAlterAcl,
+    ApiKeys.ELECT_PREFERRED_LEADERS -> clusterAlterAcl
   )
 
   @Before
@@ -382,6 +386,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def addOffsetsToTxnRequest = new 
AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
 
+  /** ElectPreferredLeaders request naming only the test partition `tp` (10000 is presumably the timeout in ms — confirm). */
+  private def electPreferredLeadersRequest = {
+    val requestData = ElectPreferredLeadersRequest.toRequestData(Collections.singleton(tp), 10000)
+    new ElectPreferredLeadersRequest.Builder(requestData).build()
+  }
+
   @Test
   def testAuthorizationWithTopicExisting() {
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
@@ -414,9 +421,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
       ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
       ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
-
       // Check StopReplica last since some APIs depend on replica availability
-      ApiKeys.STOP_REPLICA -> stopReplicaRequest
+      ApiKeys.STOP_REPLICA -> stopReplicaRequest,
+      ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
@@ -462,7 +469,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
       ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
       ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
-      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest
+      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
+      ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
diff --git 
a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
 
b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
new file mode 100644
index 0000000..824e8fb
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+import java.util.Properties
+
+import kafka.admin.PreferredReplicaLeaderElectionCommand
+import kafka.common.{AdminCommandFailedException, TopicAndPartition}
+import kafka.network.RequestChannel
+import kafka.security.auth._
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{Logging, TestUtils, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
PreferredLeaderNotAvailableException, TimeoutException, 
UnknownTopicOrPartitionException}
+import org.apache.kafka.common.network.ListenerName
+import org.junit.Assert._
+import org.junit.{After, Test}
+
+class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness 
with Logging /*with RackAwareTest*/ {
+
+  var servers: Seq[KafkaServer] = Seq()
+
+  @After
+  override def tearDown(): Unit = {
+    // Stop the brokers first, then let the ZooKeeper harness clean up after itself.
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
+  /**
+   * Builds configs for three brokers with automatic leader rebalancing disabled (and, if given, the
+   * named authorizer class), then starts the cluster and creates `topicPartition`.
+   */
+  private def createTestTopicAndCluster(topicPartition: Map[TopicPartition, List[Int]],
+                                        authorizer: Option[String] = None): Unit = {
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
+    for (props <- brokerConfigs) {
+      props.setProperty("auto.leader.rebalance.enable", "false")
+      authorizer.foreach(className => props.setProperty("authorizer.class.name", className))
+    }
+    createTestTopicAndCluster(topicPartition, brokerConfigs)
+  }
+
+  /**
+   * Starts one broker per config, writes each partition's replica assignment to ZooKeeper, and
+   * waits until every started broker has a log for every one of those partitions.
+   */
+  private def createTestTopicAndCluster(partitionsAndAssignments: Map[TopicPartition, List[Int]],
+                                        brokerConfigs: Seq[Properties]): Unit = {
+    // create brokers
+    servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    // create the topics
+    partitionsAndAssignments.foreach { case (tp, assignment) =>
+      zkClient.createTopicAssignment(tp.topic(), Map(tp -> assignment))
+    }
+    // wait until replica log is created on every broker
+    TestUtils.waitUntilTrue(() => servers.forall(server =>
+      partitionsAndAssignments.keys.forall(tp => server.getLogManager().getLog(tp).isDefined)),
+      s"Replicas for partitions ${partitionsAndAssignments.keys.mkString(", ")} not created")
+  }
+
+  /**
+   * Restarts broker `targetServer`, then waits until every broker's metadata cache lists it in
+   * the ISR of `partition` again.
+   */
+  private def bounceServer(targetServer: Int, partition: TopicPartition): Unit = {
+    val target = servers(targetServer)
+    debug(s"Shutting down server $targetServer so a non-preferred replica becomes leader")
+    target.shutdown()
+    debug(s"Starting server $targetServer now that a non-preferred replica is leader")
+    target.startup()
+    def backInIsrEverywhere(): Boolean = servers.forall { s =>
+      s.metadataCache.getPartitionInfo(partition.topic(), partition.partition())
+        .exists(_.basePartitionState.isr.contains(targetServer))
+    }
+    TestUtils.waitUntilTrue(() => backInIsrEverywhere(), s"Replicas for partition $partition not created")
+  }
+
+  /** The server whose KafkaController reports itself active, if any. */
+  private def getController(): Option[KafkaServer] =
+    servers.find(_.kafkaController.isActive)
+
+  /** Leader of `topicPartition` as seen by broker 0's metadata cache; throws if broker 0 has no entry for it. */
+  private def getLeader(topicPartition: TopicPartition) = {
+    val partitionInfo = servers(0).metadataCache.getPartitionInfo(topicPartition.topic(), topicPartition.partition())
+    partitionInfo.get.basePartitionState.leader
+  }
+
+  /** `localhost:<port>` of the given broker's PLAINTEXT listener (broker 0 by default). */
+  private def bootstrapServer(broker: Int = 0): String = {
+    val plaintext = ListenerName.normalised("PLAINTEXT")
+    val port = servers(broker).socketServer.boundPort(plaintext)
+    debug(s"Server bound to port $port")
+    s"localhost:$port"
+  }
+
+  // Default fixture: partition test-0 assigned to brokers 1, 2, 0; the head of the assignment (broker 1) is the preferred leader.
+  val testPartition: TopicPartition = new TopicPartition("test", 0)
+  val testPartitionAssignment: List[Int] = List(1, 2, 0)
+  val testPartitionPreferredLeader: Int = testPartitionAssignment.head
+  val testPartitionAndAssignment: Map[TopicPartition, List[Int]] = Map(testPartition -> testPartitionAssignment)
+
+  /** Test the case where multiple comma-separated values are given for --bootstrap-server */
+  @Test
+  def testMultipleBrokersGiven() {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    // Restart the preferred leader so that, per bounceServer's log messages, a non-preferred replica leads
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Check the leader for the partition is not the preferred one
+    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    // Bootstrap list names broker 1 then broker 0
+    PreferredReplicaLeaderElectionCommand.run(Array(
+      "--bootstrap-server", s"${bootstrapServer(1)},${bootstrapServer(0)}"))
+    // Check the leader for the partition IS the preferred one
+    assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+  }
+
+  /** Test the case when an unreachable broker is given for --bootstrap-server */
+  @Test
+  def testInvalidBrokerGiven() {
+    // No cluster is started here: the command must fail with an AdminCommandFailedException
+    // whose cause is a TimeoutException (the 1000 is presumably milliseconds — confirm against run()).
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", "example.com:1234"),
+        timeout = 1000)
+      fail()
+    } catch {
+      case e: AdminCommandFailedException =>
+        assertTrue(e.getCause.isInstanceOf[TimeoutException])
+    }
+  }
+
+  /** Test the case where no partitions are given (=> elect all partitions) */
+  @Test
+  def testNoPartitionsGiven() {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Check the leader for the partition is not the preferred one
+    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    // No --path-to-json-file argument, so every partition is a candidate for election
+    PreferredReplicaLeaderElectionCommand.run(Array(
+      "--bootstrap-server", bootstrapServer()))
+    // Check the leader for the partition IS the preferred one
+    assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+  }
+
+  /**
+   * Writes the preferred-replica-election JSON for `partitions` (UTF-8) into a fresh temp file,
+   * marked for deletion on JVM exit, and returns that file.
+   */
+  private def toJsonFile(partitions: scala.collection.Set[TopicPartition]): File = {
+    val tempFile = File.createTempFile("preferredreplicaelection", ".js")
+    tempFile.deleteOnExit()
+    val legacyPartitions = partitions.map(new TopicAndPartition(_))
+    val json = ZkUtils.preferredReplicaLeaderElectionZkData(legacyPartitions)
+    debug("Using json: " + json)
+    val bytes = json.getBytes(StandardCharsets.UTF_8)
+    Files.write(Paths.get(tempFile.getAbsolutePath), bytes)
+    tempFile
+  }
+
+  /** Test the case where the JSON file lists a single partition */
+  @Test
+  def testSingletonPartitionGiven() {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Check the leader for the partition is not the preferred one
+    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+    } finally {
+      // Remove the temp file now rather than waiting for JVM exit
+      jsonFile.delete()
+    }
+    // Check the leader for the partition IS the preferred one
+    assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+  }
+
+  /**
+   * Test the case where a topic does not exist: the command must fail with an
+   * AdminCommandFailedException whose first suppressed exception is an UnknownTopicOrPartitionException.
+   */
+  @Test
+  def testTopicDoesNotExist() {
+    val nonExistentPartition = new TopicPartition("does.not.exist", 0)
+    val nonExistentPartitionAssignment = List(1, 2, 0)
+    val nonExistentPartitionAndAssignment = Map(nonExistentPartition -> nonExistentPartitionAssignment)
+
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    val jsonFile = toJsonFile(nonExistentPartitionAndAssignment.keySet)
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+      // Without this the test passes silently when the command does not fail at all.
+      fail("Expected the election of a non-existent partition to fail")
+    } catch {
+      case e: AdminCommandFailedException =>
+        val suppressed = e.getSuppressed
+        assertTrue("Expected at least one suppressed exception", suppressed.nonEmpty)
+        assertTrue(s"Unexpected suppressed exception: ${suppressed(0)}",
+          suppressed(0).isInstanceOf[UnknownTopicOrPartitionException])
+    } finally {
+      jsonFile.delete()
+    }
+  }
+
+  /** Test the case where the JSON file lists partitions of two topics that share one replica assignment */
+  @Test
+  def testMultiplePartitionsSameAssignment() {
+    val partitionA = new TopicPartition("testA", 0)
+    val partitionB = new TopicPartition("testB", 0)
+    val sharedAssignment = List(1, 2, 0)
+    val sharedPreferredLeader = sharedAssignment.head
+    val partitionsAndAssignment = Map(partitionA -> sharedAssignment, partitionB -> sharedAssignment)
+    val bothPartitions = Seq(partitionA, partitionB)
+
+    createTestTopicAndCluster(partitionsAndAssignment)
+    bounceServer(sharedPreferredLeader, partitionA)
+    // Neither partition should currently be led by its preferred replica
+    bothPartitions.foreach(tp => assertNotEquals(sharedPreferredLeader, getLeader(tp)))
+    val jsonFile = toJsonFile(partitionsAndAssignment.keySet)
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+    } finally {
+      jsonFile.delete()
+    }
+    // After the election both partitions must be led by the preferred replica
+    bothPartitions.foreach(tp => assertEquals(sharedPreferredLeader, getLeader(tp)))
+  }
+
+  /**
+   * What happens when the preferred replica is already the leader? Running the
+   * election anyway must leave the preferred replica as leader.
+   */
+  @Test
+  def testNoopElection() {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    // No server is bounced, so double-check the partition starts out led by its preferred replica
+    val leaderBefore = getLeader(testPartition)
+    assertEquals(testPartitionPreferredLeader, leaderBefore)
+    val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
+    try {
+      // Run the election even though the preferred replica is *already* the leader
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+      // The preferred replica must still be the leader afterwards
+      assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    } finally {
+      jsonFile.delete()
+    }
+  }
+
+  /**
+   * What happens if the preferred replica is offline? The command must fail with a
+   * PreferredLeaderNotAvailableException as the suppressed cause, and the current
+   * leader must be left in place.
+   */
+  @Test
+  def testWithOfflinePreferredReplica() {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Check the leader for the partition is not the preferred one
+    val leader = getLeader(testPartition)
+    assertNotEquals(testPartitionPreferredLeader, leader)
+    // Now kill the preferred one
+    servers(testPartitionPreferredLeader).shutdown()
+    // Now try to elect the preferred one
+    val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+      fail("Expected AdminCommandFailedException because the preferred replica is offline")
+    } catch {
+      case e: AdminCommandFailedException =>
+        assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
+        val suppressed = e.getSuppressed()(0)
+        assertTrue(suppressed.isInstanceOf[PreferredLeaderNotAvailableException])
+        // Derive the partition name from testPartition instead of hard-coding it,
+        // so the expectation follows the fixture if the test partition changes
+        val expectedMessage =
+          s"Failed to elect leader for partition $testPartition under strategy PreferredReplicaPartitionLeaderElectionStrategy"
+        assertTrue(suppressed.getMessage, suppressed.getMessage.contains(expectedMessage))
+        // Check we still have the same leader
+        assertEquals(leader, getLeader(testPartition))
+    } finally {
+      jsonFile.delete()
+    }
+  }
+
+  /**
+   * What happens if the controller gets killed just before an election? The
+   * command is expected to report a timeout for the partition and leave the
+   * current leader in place.
+   */
+  @Test
+  def testTimeout() {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Precondition: leadership has moved away from the preferred replica
+    val currentLeader = getLeader(testPartition)
+    assertNotEquals(testPartitionPreferredLeader, currentLeader)
+    // Shut the controller down right before triggering the election
+    val controllerId = getController().get.config.brokerId
+    servers(controllerId).shutdown()
+    val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
+    try {
+      // NOTE(review): bootstrapServer(controllerId) presumably builds a bootstrap list without the dead broker — confirm
+      PreferredReplicaLeaderElectionCommand.run(
+        Array(
+          "--bootstrap-server", bootstrapServer(controllerId),
+          "--path-to-json-file", jsonFile.getAbsolutePath),
+        timeout = 2000)
+      fail()
+    } catch {
+      case e: AdminCommandFailedException =>
+        assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
+        val cause = e.getSuppressed()(0)
+        assertTrue(cause.getMessage.contains("Timed out waiting for a node assignment"))
+        // The failed election must not have changed the leader
+        assertEquals(currentLeader, getLeader(testPartition))
+    } finally {
+      jsonFile.delete()
+    }
+  }
+
+  /**
+   * Test the case where the client is not authorized: with
+   * PreferredReplicaLeaderElectionCommandTestAuthorizer installed, the command must
+   * fail with a ClusterAuthorizationException as the suppressed cause, and the
+   * current leader must be left in place.
+   */
+  @Test
+  def testAuthzFailure() {
+    createTestTopicAndCluster(testPartitionAndAssignment,
+      Some(classOf[PreferredReplicaLeaderElectionCommandTestAuthorizer].getName))
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Check the leader for the partition is not the preferred one
+    val leader = getLeader(testPartition)
+    assertNotEquals(testPartitionPreferredLeader, leader)
+    val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+      fail("Expected AdminCommandFailedException for an unauthorized client")
+    } catch {
+      case e: AdminCommandFailedException =>
+        assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
+        assertTrue(e.getSuppressed()(0).isInstanceOf[ClusterAuthorizationException])
+        // Check we still have the same leader
+        assertEquals(leader, getLeader(testPartition))
+    } finally {
+      jsonFile.delete()
+    }
+  }
+
+}
+
+/**
+ * Test authorizer that rejects exactly one combination: the Alter operation on a
+ * Cluster resource. Every other operation/resource pair is permitted.
+ */
+class PreferredReplicaLeaderElectionCommandTestAuthorizer extends SimpleAclAuthorizer {
+  override def authorize(session: RequestChannel.Session, operation: Operation, resource: Resource): Boolean = {
+    val isClusterAlter = operation == Alter && resource.resourceType == Cluster
+    !isClusterAlter
+  }
+}
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index d4dcd9f..d5becea 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -158,7 +158,7 @@ object AbstractCoordinatorConcurrencyTest {
   }
 
   class TestReplicaManager extends ReplicaManager(
-    null, null, null, null, null, null, null, null, null, null, null, null, 
null, null, None) {
+    null, null, null, null, null, null, null, null, null, null, null, null, 
null, null, null, None) {
 
     var producePurgatory: DelayedOperationPurgatory[DelayedProduce] = _
     var watchKeys: mutable.Set[TopicPartitionOperationKey] = _
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 9b4210e..a3ecb07 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -524,7 +524,7 @@ class KafkaApisTest {
     channel.buffer.getInt() // read the size
     ResponseHeader.parse(channel.buffer)
     val struct = api.responseSchema(request.version).read(channel.buffer)
-    AbstractResponse.parseResponse(api, struct)
+    AbstractResponse.parseResponse(api, struct, request.version)
   }
 
   private def expectNoThrottling(): Capture[RequestChannel.Response] = {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1308820..08aa624 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -657,6 +657,8 @@ class ReplicaManagerTest {
       purgatoryName = "Fetch", timer, reaperEnabled = false)
     val mockDeleteRecordsPurgatory = new 
DelayedOperationPurgatory[DelayedDeleteRecords](
       purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
+    val mockElectPreferredLeaderPurgatory = new 
DelayedOperationPurgatory[DelayedElectPreferredLeader](
+      purgatoryName = "ElectPreferredLeader", timer, reaperEnabled = false)
 
     // Mock network client to show leader offset of 5
     val quota = QuotaFactory.instantiate(config, metrics, time, "")
@@ -665,7 +667,7 @@ class ReplicaManagerTest {
     val replicaManager = new ReplicaManager(config, metrics, time, 
kafkaZkClient, mockScheduler, mockLogMgr,
       new AtomicBoolean(false), quota, mockBrokerTopicStats,
       metadataCache, mockLogDirFailureChannel, mockProducePurgatory, 
mockFetchPurgatory,
-      mockDeleteRecordsPurgatory, Option(this.getClass.getName)) {
+      mockDeleteRecordsPurgatory, mockElectPreferredLeaderPurgatory, 
Option(this.getClass.getName)) {
 
       override protected def createReplicaFetcherManager(metrics: Metrics,
                                                      time: Time,
@@ -815,11 +817,13 @@ class ReplicaManagerTest {
       purgatoryName = "Fetch", timer, reaperEnabled = false)
     val mockDeleteRecordsPurgatory = new 
DelayedOperationPurgatory[DelayedDeleteRecords](
       purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
+    val mockDelayedElectPreferredLeaderPurgatory = new 
DelayedOperationPurgatory[DelayedElectPreferredLeader](
+      purgatoryName = "DelayedElectPreferredLeader", timer, reaperEnabled = 
false)
 
     new ReplicaManager(config, metrics, time, kafkaZkClient, new 
MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, 
time, ""), new BrokerTopicStats,
       metadataCache, new LogDirFailureChannel(config.logDirs.size), 
mockProducePurgatory, mockFetchPurgatory,
-      mockDeleteRecordsPurgatory, Option(this.getClass.getName))
+      mockDeleteRecordsPurgatory, mockDelayedElectPreferredLeaderPurgatory, 
Option(this.getClass.getName))
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index c9e4e78..3176f72 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,6 +24,7 @@ import kafka.security.auth._
 import kafka.utils.TestUtils
 import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, 
AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.message.ElectPreferredLeadersRequestData
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
@@ -359,6 +360,15 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.DELETE_GROUPS =>
           new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))
 
+        case ApiKeys.ELECT_PREFERRED_LEADERS =>
+          val partition = new 
ElectPreferredLeadersRequestData.TopicPartitions()
+            .setPartitionId(Collections.singletonList(0))
+            .setTopic("my_topic")
+          new ElectPreferredLeadersRequest.Builder(
+            new ElectPreferredLeadersRequestData()
+                .setTimeoutMs(0)
+                .setTopicPartitions(Collections.singletonList(partition)))
+
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
@@ -450,6 +460,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.RENEW_DELEGATION_TOKEN => new 
RenewDelegationTokenResponse(response).throttleTimeMs
       case ApiKeys.DELETE_GROUPS => new 
DeleteGroupsResponse(response).throttleTimeMs
       case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new 
OffsetsForLeaderEpochResponse(response).throttleTimeMs
+      case ApiKeys.ELECT_PREFERRED_LEADERS => new 
ElectPreferredLeadersResponse(response, 0).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time 
for $requestId")
     }
   }

Reply via email to