This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 5129ab5 KAFKA-8345: KIP-455: Admin API changes (Part 2) (#7120) 5129ab5 is described below commit 5129ab53ee9a2e46a22a13b1d5300ee139c4999f Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com> AuthorDate: Wed Aug 14 17:25:17 2019 +0100 KAFKA-8345: KIP-455: Admin API changes (Part 2) (#7120) Add the AlterPartitionReassignments and ListPartitionReassignments APIs. Also remove an unused methodlength suppression for KafkaAdminClient. Reviewers: Colin P. McCabe <cmcc...@apache.org>, Viktor Somogyi <viktorsomo...@gmail.com> --- checkstyle/suppressions.xml | 2 +- .../java/org/apache/kafka/clients/admin/Admin.java | 115 ++++++++++ .../admin/AlterPartitionReassignmentsOptions.java | 31 +++ .../admin/AlterPartitionReassignmentsResult.java | 59 +++++ .../kafka/clients/admin/KafkaAdminClient.java | 250 +++++++++++++++++++++ .../admin/ListPartitionReassignmentsOptions.java | 29 +++ .../admin/ListPartitionReassignmentsResult.java | 43 ++++ .../clients/admin/NewPartitionReassignment.java | 44 ++++ .../kafka/clients/admin/PartitionReassignment.java | 60 +++++ .../AlterPartitionReassignmentsResponse.java | 2 +- .../ListPartitionReassignmentsRequest.java | 21 +- .../ListPartitionReassignmentsResponse.java | 2 +- .../message/ListPartitionReassignmentsRequest.json | 2 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 215 ++++++++++++++++++ .../kafka/clients/admin/MockAdminClient.java | 12 + .../kafka/common/requests/RequestResponseTest.java | 6 +- 16 files changed, 878 insertions(+), 15 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 0bb2193..475b9b0 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -52,7 +52,7 @@ 
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|SchemaGenerator|AbstractCoordinator).java"/> <suppress checks="JavaNCSS" - files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java"/> + files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java|KafkaAdminClient.java"/> <suppress checks="NPathComplexity" files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index c1be7a7..8fdd21b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -21,6 +21,7 @@ import java.time.Duration; import java.util.Collection; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -930,6 +931,120 @@ public interface Admin extends AutoCloseable { Set<TopicPartition> partitions, ElectLeadersOptions options); + + /** + * Change the reassignments for one or more partitions. + * Providing an empty Optional (e.g via {@link Optional#empty()}) will <b>revert</b> the reassignment for the associated partition. + * + * This is a convenience method for {@link #alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)} + * with default options. See the overload for more details. 
+ */ + default AlterPartitionReassignmentsResult alterPartitionReassignments( + Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) { + return alterPartitionReassignments(reassignments, new AlterPartitionReassignmentsOptions()); + } + + /** + * Change the reassignments for one or more partitions. + * Providing an empty Optional (e.g via {@link Optional#empty()}) will <b>revert</b> the reassignment for the associated partition. + * + * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from + * the returned {@code AlterPartitionReassignmentsResult}:</p> + * <ul> + * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException} + * If the authenticated user didn't have alter access to the cluster.</li> + * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException} + * If the topic or partition does not exist within the cluster.</li> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * if the request timed out before the controller could record the new assignments.</li> + * <li>{@link org.apache.kafka.common.errors.InvalidReplicaAssignmentException} + * If the specified assignment was not valid.</li> + * <li>{@link org.apache.kafka.common.errors.NoReassignmentInProgressException} + * If there was an attempt to cancel a reassignment for a partition which was not being reassigned.</li> + * </ul> + * + * @param reassignments The reassignments to add, modify, or remove. + * @param options The options to use. + * @return The result. + */ + AlterPartitionReassignmentsResult alterPartitionReassignments( + Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments, + AlterPartitionReassignmentsOptions options); + + + /** + * List all of the current partition reassignments + * + * This is a convenience method for {@link #listPartitionReassignments(ListPartitionReassignmentsOptions)} + * with default options. See the overload for more details. 
+ */ + default ListPartitionReassignmentsResult listPartitionReassignments() { + return listPartitionReassignments(new ListPartitionReassignmentsOptions()); + } + + /** + * List the current reassignments for the given partitions + * + * This is a convenience method for {@link #listPartitionReassignments(Set, ListPartitionReassignmentsOptions)} + * with default options. See the overload for more details. + */ + default ListPartitionReassignmentsResult listPartitionReassignments(Set<TopicPartition> partitions) { + return listPartitionReassignments(partitions, new ListPartitionReassignmentsOptions()); + } + + /** + * List the current reassignments for the given partitions + * + * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from + * the returned {@code ListPartitionReassignmentsResult}:</p> + * <ul> + * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException} + * If the authenticated user doesn't have alter access to the cluster.</li> + * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException} + * If a given topic or partition does not exist.</li> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the controller could list the current reassignments.</li> + * </ul> + * + * @param partitions The topic partitions to list reassignments for. + * @param options The options to use. + * @return The result. 
+ */ + default ListPartitionReassignmentsResult listPartitionReassignments( + Set<TopicPartition> partitions, + ListPartitionReassignmentsOptions options) { + return listPartitionReassignments(Optional.of(partitions), options); + } + + /** + * List all of the current partition reassignments + * + * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from + * the returned {@code ListPartitionReassignmentsResult}:</p> + * <ul> + * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException} + * If the authenticated user doesn't have alter access to the cluster.</li> + * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException} + * If a given topic or partition does not exist.</li> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the controller could list the current reassignments.</li> + * </ul> + * + * @param options The options to use. + * @return The result. + */ + default ListPartitionReassignmentsResult listPartitionReassignments(ListPartitionReassignmentsOptions options) { + return listPartitionReassignments(Optional.empty(), options); + } + + /** + * @param partitions the partitions we want to get reassignment for, or an empty optional if we want to get the reassignments for all partitions in the cluster + * @param options The options to use. + * @return The result. 
+ */ + ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions, + ListPartitionReassignmentsOptions options); + /** * Get the metrics kept by the adminClient */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java new file mode 100644 index 0000000..bee9c70 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; + +/** + * Options for {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)} + * + * The API of this class is evolving. See {@link AdminClient} for details. 
+ */ +@InterfaceStability.Evolving +public class AlterPartitionReassignmentsOptions extends AbstractOptions<AlterPartitionReassignmentsOptions> { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsResult.java new file mode 100644 index 0000000..2009ab5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsResult.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; + +/** + * The result of {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}. + * + * The API of this class is evolving. See {@link AdminClient} for details. 
+ */ +@InterfaceStability.Evolving +public class AlterPartitionReassignmentsResult { + private final Map<TopicPartition, KafkaFuture<Void>> futures; + + AlterPartitionReassignmentsResult(Map<TopicPartition, KafkaFuture<Void>> futures) { + this.futures = futures; + } + + /** + * Return a map from partitions to futures which can be used to check the status of the reassignment. + * + * Possible error codes: + * + * INVALID_REPLICA_ASSIGNMENT (39) - if the specified replica assignment was not valid -- for example, if it included negative numbers, repeated numbers, or specified a broker ID that the controller was not aware of. + * NO_REASSIGNMENT_IN_PROGRESS (85) - if the request wants to cancel reassignments but none exist + * UNKNOWN (-1) + * + */ + public Map<TopicPartition, KafkaFuture<Void>> values() { + return futures; + } + + /** + * Return a future which succeeds only if all the reassignments were successfully initiated. + */ + public KafkaFuture<Void> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 7bcad41..936dc65 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -61,6 +61,8 @@ import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; +import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic; import org.apache.kafka.common.message.CreateDelegationTokenRequestData; import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; import org.apache.kafka.common.message.CreateDelegationTokenResponseData; @@ -81,6 +83,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.Altera import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.RenewDelegationTokenRequestData; import org.apache.kafka.common.metrics.JmxReporter; @@ -95,6 +98,8 @@ import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.AlterConfigsRequest; import org.apache.kafka.common.requests.AlterConfigsResponse; +import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest; +import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse; import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest; import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse; import org.apache.kafka.common.requests.ApiError; @@ -140,6 +145,8 @@ import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; +import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest; +import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.OffsetFetchRequest; @@ -170,7 +177,9 @@ import java.util.Iterator; 
import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.TreeMap; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -179,6 +188,12 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition; +import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse; +import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse; +import static org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics; +import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; +import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment; import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic; import static org.apache.kafka.common.utils.Utils.closeQuietly; @@ -3082,6 +3097,241 @@ public class KafkaAdminClient extends AdminClient { return new ElectLeadersResult(electionFuture); } + @Override + public AlterPartitionReassignmentsResult alterPartitionReassignments( + Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments, + AlterPartitionReassignmentsOptions options) { + final Map<TopicPartition, KafkaFutureImpl<Void>> futures = new HashMap<>(); + final Map<String, Map<Integer, Optional<NewPartitionReassignment>>> topicsToReassignments = new TreeMap<>(); + for (Map.Entry<TopicPartition, Optional<NewPartitionReassignment>> entry : reassignments.entrySet()) { + String topic = entry.getKey().topic(); + int partition = entry.getKey().partition(); + TopicPartition topicPartition = new 
TopicPartition(topic, partition); + Optional<NewPartitionReassignment> reassignment = entry.getValue(); + KafkaFutureImpl<Void> future = new KafkaFutureImpl<>(); + futures.put(topicPartition, future); + + if (topicNameIsUnrepresentable(topic)) { + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topic + "' cannot be represented in a request.")); + } else if (topicPartition.partition() < 0) { + future.completeExceptionally(new InvalidTopicException("The given partition index " + + topicPartition.partition() + " is not valid.")); + } else { + Map<Integer, Optional<NewPartitionReassignment>> partitionReassignments = + topicsToReassignments.get(topicPartition.topic()); + if (partitionReassignments == null) { + partitionReassignments = new TreeMap<>(); + topicsToReassignments.put(topic, partitionReassignments); + } + + partitionReassignments.put(partition, reassignment); + } + } + + final long now = time.milliseconds(); + Call call = new Call("alterPartitionReassignments", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + + @Override + public AbstractRequest.Builder createRequest(int timeoutMs) { + AlterPartitionReassignmentsRequestData data = + new AlterPartitionReassignmentsRequestData(); + for (Map.Entry<String, Map<Integer, Optional<NewPartitionReassignment>>> entry : + topicsToReassignments.entrySet()) { + String topicName = entry.getKey(); + Map<Integer, Optional<NewPartitionReassignment>> partitionsToReassignments = entry.getValue(); + + List<ReassignablePartition> reassignablePartitions = new ArrayList<>(); + for (Map.Entry<Integer, Optional<NewPartitionReassignment>> partitionEntry : + partitionsToReassignments.entrySet()) { + int partitionIndex = partitionEntry.getKey(); + Optional<NewPartitionReassignment> reassignment = partitionEntry.getValue(); + + ReassignablePartition reassignablePartition = new ReassignablePartition() + .setPartitionIndex(partitionIndex) + 
.setReplicas(reassignment.map(NewPartitionReassignment::targetBrokers).orElse(null)); + reassignablePartitions.add(reassignablePartition); + } + + ReassignableTopic reassignableTopic = new ReassignableTopic() + .setName(topicName) + .setPartitions(reassignablePartitions); + data.topics().add(reassignableTopic); + } + data.setTimeoutMs(timeoutMs); + return new AlterPartitionReassignmentsRequest.Builder(data); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + AlterPartitionReassignmentsResponse response = (AlterPartitionReassignmentsResponse) abstractResponse; + Map<TopicPartition, ApiException> errors = new HashMap<>(); + int receivedResponsesCount = 0; + + Errors topLevelError = Errors.forCode(response.data().errorCode()); + switch (topLevelError) { + case NONE: + receivedResponsesCount += validateTopicResponses(response.data().responses(), errors); + break; + case NOT_CONTROLLER: + handleNotControllerError(topLevelError); + break; + default: + for (ReassignableTopicResponse topicResponse : response.data().responses()) { + String topicName = topicResponse.name(); + for (ReassignablePartitionResponse partition : topicResponse.partitions()) { + errors.put( + new TopicPartition(topicName, partition.partitionIndex()), + new ApiError(topLevelError, topLevelError.message()).exception() + ); + receivedResponsesCount += 1; + } + } + break; + } + + assertResponseCountMatch(errors, receivedResponsesCount); + for (Map.Entry<TopicPartition, ApiException> entry : errors.entrySet()) { + ApiException exception = entry.getValue(); + if (exception == null) + futures.get(entry.getKey()).complete(null); + else + futures.get(entry.getKey()).completeExceptionally(exception); + } + } + + private void assertResponseCountMatch(Map<TopicPartition, ApiException> errors, int receivedResponsesCount) { + int expectedResponsesCount = topicsToReassignments.values().stream().mapToInt(Map::size).sum(); + if (errors.values().stream().noneMatch(Objects::nonNull) 
&& receivedResponsesCount != expectedResponsesCount) { + String quantifier = receivedResponsesCount > expectedResponsesCount ? "many" : "less"; + throw new UnknownServerException("The server returned too " + quantifier + " results." + + "Expected " + expectedResponsesCount + " but received " + receivedResponsesCount); + } + } + + private int validateTopicResponses(List<ReassignableTopicResponse> topicResponses, + Map<TopicPartition, ApiException> errors) { + int receivedResponsesCount = 0; + + for (ReassignableTopicResponse topicResponse : topicResponses) { + String topicName = topicResponse.name(); + for (ReassignablePartitionResponse partResponse : topicResponse.partitions()) { + Errors partitionError = Errors.forCode(partResponse.errorCode()); + + TopicPartition tp = new TopicPartition(topicName, partResponse.partitionIndex()); + if (partitionError == Errors.NONE) { + errors.put(tp, null); + } else { + errors.put(tp, new ApiError(partitionError, partResponse.errorMessage()).exception()); + } + receivedResponsesCount += 1; + } + } + + return receivedResponsesCount; + } + + @Override + void handleFailure(Throwable throwable) { + for (KafkaFutureImpl<Void> future : futures.values()) { + future.completeExceptionally(throwable); + } + } + }; + if (!topicsToReassignments.isEmpty()) { + runnable.call(call, now); + } + return new AlterPartitionReassignmentsResult(new HashMap<>(futures)); + } + + @Override + public ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions, + ListPartitionReassignmentsOptions options) { + final KafkaFutureImpl<Map<TopicPartition, PartitionReassignment>> partitionReassignmentsFuture = new KafkaFutureImpl<>(); + if (partitions.isPresent()) { + for (TopicPartition tp : partitions.get()) { + String topic = tp.topic(); + int partition = tp.partition(); + if (topicNameIsUnrepresentable(topic)) { + partitionReassignmentsFuture.completeExceptionally(new InvalidTopicException("The given topic name '" + + 
topic + "' cannot be represented in a request.")); + } else if (partition < 0) { + partitionReassignmentsFuture.completeExceptionally(new InvalidTopicException("The given partition index " + + partition + " is not valid.")); + } + if (partitionReassignmentsFuture.isCompletedExceptionally()) + return new ListPartitionReassignmentsResult(partitionReassignmentsFuture); + } + } + final long now = time.milliseconds(); + runnable.call(new Call("listPartitionReassignments", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + ListPartitionReassignmentsRequestData listData = new ListPartitionReassignmentsRequestData(); + listData.setTimeoutMs(timeoutMs); + + if (partitions.isPresent()) { + Map<String, ListPartitionReassignmentsTopics> reassignmentTopicByTopicName = new HashMap<>(); + + for (TopicPartition tp : partitions.get()) { + if (!reassignmentTopicByTopicName.containsKey(tp.topic())) + reassignmentTopicByTopicName.put(tp.topic(), new ListPartitionReassignmentsTopics().setName(tp.topic())); + + reassignmentTopicByTopicName.get(tp.topic()).partitionIndexes().add(tp.partition()); + } + + listData.setTopics(new ArrayList<>(reassignmentTopicByTopicName.values())); + } + return new ListPartitionReassignmentsRequest.Builder(listData); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + ListPartitionReassignmentsResponse response = (ListPartitionReassignmentsResponse) abstractResponse; + Errors error = Errors.forCode(response.data().errorCode()); + switch (error) { + case NONE: + break; + case NOT_CONTROLLER: + handleNotControllerError(error); + break; + default: + partitionReassignmentsFuture.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); + break; + } + Map<TopicPartition, PartitionReassignment> reassignmentMap = new HashMap<>(); + + for (OngoingTopicReassignment topicReassignment : response.data().topics()) { + 
String topicName = topicReassignment.name(); + for (OngoingPartitionReassignment partitionReassignment : topicReassignment.partitions()) { + reassignmentMap.put( + new TopicPartition(topicName, partitionReassignment.partitionIndex()), + new PartitionReassignment(partitionReassignment.replicas(), partitionReassignment.addingReplicas(), partitionReassignment.removingReplicas()) + ); + } + } + + partitionReassignmentsFuture.complete(reassignmentMap); + } + + @Override + void handleFailure(Throwable throwable) { + partitionReassignmentsFuture.completeExceptionally(throwable); + } + }, now); + + return new ListPartitionReassignmentsResult(partitionReassignmentsFuture); + } + + private void handleNotControllerError(Errors error) throws ApiException { + metadataManager.clearController(); + metadataManager.requestUpdate(); + throw error.exception(); + } + /** * Returns a boolean indicating whether the resource needs to go to a specific node */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsOptions.java new file mode 100644 index 0000000..7dcc7a6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsOptions.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Options for {@link AdminClient#listPartitionReassignments(ListPartitionReassignmentsOptions)} + * + * The API of this class is evolving. See {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ListPartitionReassignmentsOptions extends AbstractOptions<ListPartitionReassignmentsOptions> { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsResult.java new file mode 100644 index 0000000..3d7b14c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; + +/** + * The result of {@link AdminClient#listPartitionReassignments(ListPartitionReassignmentsOptions)}. + * + * The API of this class is evolving. See {@link AdminClient} for details. + */ +public class ListPartitionReassignmentsResult { + private final KafkaFuture<Map<TopicPartition, PartitionReassignment>> future; + + public ListPartitionReassignmentsResult(KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments) { + this.future = reassignments; + } + + /** + * Return a future which yields a map containing each partition's reassignments + */ + public KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments() { + return future; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java new file mode 100644 index 0000000..8856470 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}. + */ +public class NewPartitionReassignment { + private final List<Integer> targetBrokers; + + public static Optional<NewPartitionReassignment> of(Integer... brokers) { + return Optional.of(new NewPartitionReassignment(Arrays.asList(brokers))); + } + + public NewPartitionReassignment(List<Integer> targetBrokers) { + this.targetBrokers = Collections.unmodifiableList(new ArrayList<>(targetBrokers)); + } + + public List<Integer> targetBrokers() { + return targetBrokers; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/PartitionReassignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/PartitionReassignment.java new file mode 100644 index 0000000..cc3306e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/PartitionReassignment.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * A partition reassignment, which has been listed via {@link AdminClient#listPartitionReassignments()}.
 *
 * Instances are immutable: the lists passed to the constructor are copied, and the accessors
 * return unmodifiable lists.
 */
public class PartitionReassignment {

    private final List<Integer> replicas;
    private final List<Integer> addingReplicas;
    private final List<Integer> removingReplicas;

    /**
     * @param replicas         the brokers which the partition currently resides on
     * @param addingReplicas   the brokers the partition is being added to
     * @param removingReplicas the brokers the partition is being removed from
     * @throws NullPointerException if any of the lists is null
     */
    public PartitionReassignment(List<Integer> replicas, List<Integer> addingReplicas, List<Integer> removingReplicas) {
        // Copy before wrapping: Collections.unmodifiableList alone is only a read-only view,
        // so a caller mutating its own list afterwards would silently change this object.
        this.replicas = immutableCopy(replicas);
        this.addingReplicas = immutableCopy(addingReplicas);
        this.removingReplicas = immutableCopy(removingReplicas);
    }

    /**
     * The brokers which this partition currently resides on.
     */
    public List<Integer> replicas() {
        return replicas;
    }

    /**
     * The brokers that we are adding this partition to as part of a reassignment.
     * A subset of replicas.
     */
    public List<Integer> addingReplicas() {
        return addingReplicas;
    }

    /**
     * The brokers that we are removing this partition from as part of a reassignment.
     * A subset of replicas.
     */
    public List<Integer> removingReplicas() {
        return removingReplicas;
    }

    /** Returns an unmodifiable snapshot of {@code source}. */
    private static List<Integer> immutableCopy(List<Integer> source) {
        // ArrayList is fully qualified so that this class needs no additional import.
        return Collections.unmodifiableList(new java.util.ArrayList<>(source));
    }
}
List<OngoingTopicReassignment> ongoingTopicReassignments = data.topics().stream().map(topic -> - new OngoingTopicReassignment() - .setName(topic.name()) - .setPartitions(topic.partitionIndexes().stream().map(partitionIndex -> - new OngoingPartitionReassignment().setPartitionIndex(partitionIndex)).collect(Collectors.toList()) - ) - ).collect(Collectors.toList()); - + List<OngoingTopicReassignment> ongoingTopicReassignments = new ArrayList<>(); + if (data.topics() != null) { + for (ListPartitionReassignmentsTopics topic : data.topics()) { + ongoingTopicReassignments.add( + new OngoingTopicReassignment() + .setName(topic.name()) + .setPartitions(topic.partitionIndexes().stream().map(partitionIndex -> + new OngoingPartitionReassignment().setPartitionIndex(partitionIndex)).collect(Collectors.toList())) + ); + } + } ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData() .setTopics(ongoingTopicReassignments) .setErrorCode(apiError.error().code()) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java index 9513e88..0d53e4d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java @@ -33,7 +33,7 @@ public class ListPartitionReassignmentsResponse extends AbstractResponse { this(struct, ApiKeys.LIST_PARTITION_REASSIGNMENTS.latestVersion()); } - ListPartitionReassignmentsResponse(ListPartitionReassignmentsResponseData responseData) { + public ListPartitionReassignmentsResponse(ListPartitionReassignmentsResponseData responseData) { this.data = responseData; } diff --git a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json index 
d0ebf8b..ac871b2 100644 --- a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json +++ b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json @@ -21,7 +21,7 @@ "fields": [ { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000", "about": "The time in ms to wait for the request to complete." }, - { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+", + { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The topics to list partition reassignments for, or null to list everything.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name" }, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 7ec7c24..ff9e272 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -52,6 +52,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicDeletionDisabledException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.DeleteGroupsResponseData; @@ -66,7 +67,9 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionR import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; import org.apache.kafka.common.requests.CreateAclsResponse; @@ -87,6 +90,7 @@ import org.apache.kafka.common.requests.ElectLeadersResponse; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.ListGroupsResponse; +import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.OffsetFetchResponse; @@ -119,11 +123,16 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse; +import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse; +import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; +import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static 
org.junit.Assert.assertNull; @@ -1561,6 +1570,212 @@ public class KafkaAdminClientTest { } } + /** Exercises alterPartitionReassignments against a mock client: a response that omits a requested partition, a NOT_CONTROLLER retry, a partition-level error, a top-level error, unrepresentable topic/partition names, and the success path. Each scenario prepares its mock response(s) immediately before issuing the call. */ @Test + public void testAlterPartitionReassignments() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + TopicPartition tp1 = new TopicPartition("A", 0); + TopicPartition tp2 = new TopicPartition("B", 0); + Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>(); + reassignments.put(tp1, Optional.empty()); + reassignments.put(tp2, NewPartitionReassignment.of(1, 2, 3)); + + // 1. server returns fewer responses than number of partitions we sent + AlterPartitionReassignmentsResponseData responseData1 = new AlterPartitionReassignmentsResponseData(); + ReassignablePartitionResponse normalPartitionResponse = new ReassignablePartitionResponse().setPartitionIndex(0); + responseData1.setResponses(Collections.singletonList( + new ReassignableTopicResponse() + .setName("A") + .setPartitions(Collections.singletonList(normalPartitionResponse)))); + env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(responseData1)); + AlterPartitionReassignmentsResult result1 = env.adminClient().alterPartitionReassignments(reassignments); + Future future1 = result1.all(); + Future future2 = result1.values().get(tp1); + TestUtils.assertFutureError(future1, UnknownServerException.class); + TestUtils.assertFutureError(future2, UnknownServerException.class); + + // 2.
NOT_CONTROLLER error handling + AlterPartitionReassignmentsResponseData controllerErrResponseData = + new AlterPartitionReassignmentsResponseData() + .setErrorCode(Errors.NOT_CONTROLLER.code()) + .setErrorMessage(Errors.NOT_CONTROLLER.message()) + .setResponses(Arrays.asList( + new ReassignableTopicResponse() + .setName("A") + .setPartitions(Collections.singletonList(normalPartitionResponse)), + new ReassignableTopicResponse() + .setName("B") + .setPartitions(Collections.singletonList(normalPartitionResponse))) + ); + MetadataResponse controllerNodeResponse = MetadataResponse.prepareResponse(env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), 1, Collections.emptyList()); + AlterPartitionReassignmentsResponseData normalResponse = + new AlterPartitionReassignmentsResponseData() + .setResponses(Arrays.asList( + new ReassignableTopicResponse() + .setName("A") + .setPartitions(Collections.singletonList(normalPartitionResponse)), + new ReassignableTopicResponse() + .setName("B") + .setPartitions(Collections.singletonList(normalPartitionResponse))) + ); + env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(controllerErrResponseData)); + env.kafkaClient().prepareResponse(controllerNodeResponse); + env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(normalResponse)); + AlterPartitionReassignmentsResult controllerErrResult = env.adminClient().alterPartitionReassignments(reassignments); + controllerErrResult.all().get(); + controllerErrResult.values().get(tp1).get(); + controllerErrResult.values().get(tp2).get(); + + // 3.
partition-level error + AlterPartitionReassignmentsResponseData partitionLevelErrData = + new AlterPartitionReassignmentsResponseData() + .setResponses(Arrays.asList( + new ReassignableTopicResponse() + .setName("A") + .setPartitions(Collections.singletonList(new ReassignablePartitionResponse() + .setPartitionIndex(0).setErrorMessage(Errors.INVALID_REPLICA_ASSIGNMENT.message()) + .setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()) + )), + new ReassignableTopicResponse() + .setName("B") + .setPartitions(Collections.singletonList(normalPartitionResponse))) + ); + env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(partitionLevelErrData)); + AlterPartitionReassignmentsResult partitionLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments); + TestUtils.assertFutureError(partitionLevelErrResult.values().get(tp1), Errors.INVALID_REPLICA_ASSIGNMENT.exception().getClass()); + partitionLevelErrResult.values().get(tp2).get(); + + // 4. top-level error + AlterPartitionReassignmentsResponseData topLevelErrResponseData = + new AlterPartitionReassignmentsResponseData() + .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()) + .setErrorMessage(Errors.CLUSTER_AUTHORIZATION_FAILED.message()) + .setResponses(Arrays.asList( + new ReassignableTopicResponse() + .setName("A") + .setPartitions(Collections.singletonList(normalPartitionResponse)), + new ReassignableTopicResponse() + .setName("B") + .setPartitions(Collections.singletonList(normalPartitionResponse))) + ); + env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(topLevelErrResponseData)); + AlterPartitionReassignmentsResult topLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments); + TestUtils.assertFutureError(topLevelErrResult.all(), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass()); + TestUtils.assertFutureError(topLevelErrResult.values().get(tp1), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass()); +
TestUtils.assertFutureError(topLevelErrResult.values().get(tp2), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass()); + + // 5. unrepresentable topic name or partition index error + TopicPartition invalidTopicTP = new TopicPartition("", 0); + TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1); + Map<TopicPartition, Optional<NewPartitionReassignment>> invalidTopicReassignments = new HashMap<>(); + invalidTopicReassignments.put(invalidPartitionTP, NewPartitionReassignment.of(1, 2, 3)); + invalidTopicReassignments.put(invalidTopicTP, NewPartitionReassignment.of(1, 2, 3)); + invalidTopicReassignments.put(tp1, NewPartitionReassignment.of(1, 2, 3)); + + AlterPartitionReassignmentsResponseData singlePartResponseData = + new AlterPartitionReassignmentsResponseData() + .setResponses(Collections.singletonList( + new ReassignableTopicResponse() + .setName("A") + .setPartitions(Collections.singletonList(normalPartitionResponse))) + ); + env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(singlePartResponseData)); + AlterPartitionReassignmentsResult unrepresentableTopicResult = env.adminClient().alterPartitionReassignments(invalidTopicReassignments); + TestUtils.assertFutureError(unrepresentableTopicResult.values().get(invalidTopicTP), InvalidTopicException.class); + TestUtils.assertFutureError(unrepresentableTopicResult.values().get(invalidPartitionTP), InvalidTopicException.class); + unrepresentableTopicResult.values().get(tp1).get(); + + // 6. success scenario + AlterPartitionReassignmentsResponseData noErrResponseData = + new AlterPartitionReassignmentsResponseData() + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message()) + .setResponses(Arrays.asList( + new ReassignableTopicResponse() + .setName("A") + .setPartitions(Collections.singletonList(normalPartitionResponse)), + new ReassignableTopicResponse() + .setName("B") + .setPartitions(Collections.singletonList(normalPartitionResponse))) + ); +
env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(noErrResponseData)); + AlterPartitionReassignmentsResult noErrResult = env.adminClient().alterPartitionReassignments(reassignments); + noErrResult.all().get(); + noErrResult.values().get(tp1).get(); + noErrResult.values().get(tp2).get(); + } + } + + /** Exercises listPartitionReassignments against a mock client: a NOT_CONTROLLER response followed by a metadata response and a successful retry, a top-level UNKNOWN_TOPIC_OR_PARTITION error, and a successful listing whose replica lists are compared with the prepared response. */ @Test + public void testListPartitionReassignments() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + TopicPartition tp1 = new TopicPartition("A", 0); + OngoingPartitionReassignment tp1PartitionReassignment = new OngoingPartitionReassignment() + .setPartitionIndex(0) + .setRemovingReplicas(Arrays.asList(1, 2, 3)) + .setAddingReplicas(Arrays.asList(4, 5, 6)) + .setReplicas(Arrays.asList(1, 2, 3, 4, 5, 6)); + OngoingTopicReassignment tp1Reassignment = new OngoingTopicReassignment().setName("A") + .setPartitions(Collections.singletonList(tp1PartitionReassignment)); + + TopicPartition tp2 = new TopicPartition("B", 0); + OngoingPartitionReassignment tp2PartitionReassignment = new OngoingPartitionReassignment() + .setPartitionIndex(0) + .setRemovingReplicas(Arrays.asList(1, 2, 3)) + .setAddingReplicas(Arrays.asList(4, 5, 6)) + .setReplicas(Arrays.asList(1, 2, 3, 4, 5, 6)); + OngoingTopicReassignment tp2Reassignment = new OngoingTopicReassignment().setName("B") + .setPartitions(Collections.singletonList(tp2PartitionReassignment)); + + // 1.
NOT_CONTROLLER error handling + ListPartitionReassignmentsResponseData notControllerData = new ListPartitionReassignmentsResponseData() + .setErrorCode(Errors.NOT_CONTROLLER.code()) + .setErrorMessage(Errors.NOT_CONTROLLER.message()); + MetadataResponse controllerNodeResponse = MetadataResponse.prepareResponse(env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), 1, Collections.emptyList()); + ListPartitionReassignmentsResponseData reassignmentsData = new ListPartitionReassignmentsResponseData() + .setTopics(Arrays.asList(tp1Reassignment, tp2Reassignment)); + env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(notControllerData)); + env.kafkaClient().prepareResponse(controllerNodeResponse); + env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(reassignmentsData)); + + ListPartitionReassignmentsResult noControllerResult = env.adminClient().listPartitionReassignments(); + noControllerResult.reassignments().get(); // no error + + // 2. UNKNOWN_TOPIC_OR_PARTITION error + ListPartitionReassignmentsResponseData unknownTpData = new ListPartitionReassignmentsResponseData() + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()); + env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(unknownTpData)); + + ListPartitionReassignmentsResult unknownTpResult = env.adminClient().listPartitionReassignments(new HashSet<>(Arrays.asList(tp1, tp2))); + TestUtils.assertFutureError(unknownTpResult.reassignments(), UnknownTopicOrPartitionException.class); + + // 3.
Success + ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData() + .setTopics(Arrays.asList(tp1Reassignment, tp2Reassignment)); + env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(responseData)); + ListPartitionReassignmentsResult responseResult = env.adminClient().listPartitionReassignments(); + + Map<TopicPartition, PartitionReassignment> reassignments = responseResult.reassignments().get(); + + PartitionReassignment tp1Result = reassignments.get(tp1); + assertEquals(tp1PartitionReassignment.addingReplicas(), tp1Result.addingReplicas()); + assertEquals(tp1PartitionReassignment.removingReplicas(), tp1Result.removingReplicas()); + assertEquals(tp1PartitionReassignment.replicas(), tp1Result.replicas()); + /* NOTE(review): repeats the replicas() assertion directly above; looks redundant */ assertEquals(tp1PartitionReassignment.replicas(), tp1Result.replicas()); + PartitionReassignment tp2Result = reassignments.get(tp2); + assertEquals(tp2PartitionReassignment.addingReplicas(), tp2Result.addingReplicas()); + assertEquals(tp2PartitionReassignment.removingReplicas(), tp2Result.removingReplicas()); + assertEquals(tp2PartitionReassignment.replicas(), tp2Result.replicas()); + /* NOTE(review): repeats the replicas() assertion directly above; looks redundant */ assertEquals(tp2PartitionReassignment.replicas(), tp2Result.replicas()); + } + } + private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member, MemberAssignment assignment) { return new MemberDescription(member.memberId(), diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 7ca9ce4..baaf661 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -40,6 +40,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; public class MockAdminClient extends AdminClient { @@
-421,6 +422,17 @@ public class MockAdminClient extends AdminClient { } @Override + /* Stub: this mock does not support altering reassignments and always throws UnsupportedOperationException. */ public AlterPartitionReassignmentsResult alterPartitionReassignments(Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments, + AlterPartitionReassignmentsOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + /* Stub: this mock does not support listing reassignments and always throws UnsupportedOperationException. */ @Override + public ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions, ListPartitionReassignmentsOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override public void close(Duration timeout) {} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index a979d4c..8645d79 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1702,8 +1702,8 @@ public class RequestResponseTest { private ListPartitionReassignmentsResponse createListPartitionReassignmentsResponse() { ListPartitionReassignmentsResponseData data = new ListPartitionReassignmentsResponseData(); - data.topics().add( - new ListPartitionReassignmentsResponseData.OngoingTopicReassignment() + data.setTopics(Collections.singletonList( + new ListPartitionReassignmentsResponseData.OngoingTopicReassignment() .setName("topic") .setPartitions(Collections.singletonList( new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment() @@ -1713,7 +1713,7 @@ public class RequestResponseTest { .setRemovingReplicas(Collections.singletonList(1)) ) ) - ); + )); return new ListPartitionReassignmentsResponse(data); } }