This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c4472d701a KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881) (#13474)
3c4472d701a is described below

commit 3c4472d701a7e9d9b8714a0b9d87ae190d1679fb
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Fri Mar 31 15:01:07 2023 +0100

    KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881) (#13474)
    
    When `client.rack` is configured for consumers, we perform rack-aware
    consumer partition assignment to improve locality. During or after
    reassignments, replica racks may change, so to keep the consumer
    assignment optimal, the group leader now triggers a rebalance whenever
    the set of racks of any partition changes.
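
    For context, a minimal sketch of the consumer-side configuration this
    change keys off; the bootstrap address, group id, and rack name are
    illustrative, not part of this commit:

        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.serialization.StringDeserializer;

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rack-aware-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // client.rack identifies the rack where this consumer runs, matching broker.rack values.
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);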
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 .../consumer/internals/ConsumerCoordinator.java    |  58 +++++--
 .../internals/ConsumerCoordinatorTest.java         | 171 ++++++++++++++++++---
 .../server/FetchFromFollowerIntegrationTest.scala  |  26 +++-
 3 files changed, 224 insertions(+), 31 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index fec31fe80f8..1f6c5ef0d75 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import java.util.Arrays;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparato
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -174,7 +176,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         this.rebalanceConfig = rebalanceConfig;
         this.log = logContext.logger(ConsumerCoordinator.class);
         this.metadata = metadata;
-        this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
+        this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
+        this.metadataSnapshot = new MetadataSnapshot(this.rackId, subscriptions, metadata.fetch(), metadata.updateVersion());
         this.subscriptions = subscriptions;
         this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
         this.autoCommitEnabled = autoCommitEnabled;
@@ -188,7 +191,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId,
             JoinGroupRequest.UNKNOWN_GENERATION_ID, JoinGroupRequest.UNKNOWN_MEMBER_ID, rebalanceConfig.groupInstanceId);
         this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
-        this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
 
         if (autoCommitEnabled)
             this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs);
@@ -489,7 +491,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
             // Update the current snapshot, which will be used to check for subscription
             // changes that would require a rebalance (e.g. new partitions).
-            metadataSnapshot = new MetadataSnapshot(subscriptions, cluster, version);
+            metadataSnapshot = new MetadataSnapshot(rackId, subscriptions, cluster, version);
         }
     }
 
@@ -1613,14 +1615,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     private static class MetadataSnapshot {
         private final int version;
-        private final Map<String, Integer> partitionsPerTopic;
+        private final Map<String, List<PartitionRackInfo>> partitionsPerTopic;
 
-        private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) {
-            Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        private MetadataSnapshot(Optional<String> clientRack, SubscriptionState subscription, Cluster cluster, int version) {
+            Map<String, List<PartitionRackInfo>> partitionsPerTopic = new HashMap<>();
             for (String topic : subscription.metadataTopics()) {
-                Integer numPartitions = cluster.partitionCountForTopic(topic);
-                if (numPartitions != null)
-                    partitionsPerTopic.put(topic, numPartitions);
+                List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+                if (partitions != null) {
+                    List<PartitionRackInfo> partitionRacks = partitions.stream()
+                            .map(p -> new PartitionRackInfo(clientRack, p))
+                            .collect(Collectors.toList());
+                    partitionsPerTopic.put(topic, partitionRacks);
+                }
             }
             this.partitionsPerTopic = partitionsPerTopic;
             this.version = version;
@@ -1636,6 +1642,40 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
+    private static class PartitionRackInfo {
+        private final Set<String> racks;
+
+        PartitionRackInfo(Optional<String> clientRack, PartitionInfo partition) {
+            if (clientRack.isPresent() && partition.replicas() != null) {
+                racks = Arrays.stream(partition.replicas()).map(Node::rack).collect(Collectors.toSet());
+            } else {
+                racks = Collections.emptySet();
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof PartitionRackInfo)) {
+                return false;
+            }
+            PartitionRackInfo rackInfo = (PartitionRackInfo) o;
+            return Objects.equals(racks, rackInfo.racks);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(racks);
+        }
+
+        @Override
+        public String toString() {
+            return racks.isEmpty() ? "NO_RACKS" : "racks=" + racks;
+        }
+    }
+
     private static class OffsetCommitCompletion {
         private final OffsetCommitCallback callback;
         private final Map<TopicPartition, OffsetAndMetadata> offsets;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 0f1256e1689..f3c8026dff3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
@@ -68,6 +69,7 @@ import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -108,6 +110,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptySet;
@@ -1448,12 +1451,106 @@ public abstract class ConsumerCoordinatorTest {
      */
     @Test
     public void testRebalanceWithMetadataChange() {
+        MetadataResponse metadataResponse1 = RequestTestUtils.metadataUpdateWith(1,
+                Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1)));
+        MetadataResponse metadataResponse2 = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic1, 1));
+        verifyRebalanceWithMetadataChange(Optional.empty(), partitionAssignor, metadataResponse1, metadataResponse2, true);
+    }
+
+    @Test
+    public void testRackAwareConsumerRebalanceWithDifferentRacks() {
+        verifyRackAwareConsumerRebalance(
+                Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                Arrays.asList(Arrays.asList(0, 2), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                true, true);
+    }
+
+    @Test
+    public void testNonRackAwareConsumerRebalanceWithDifferentRacks() {
+        verifyRackAwareConsumerRebalance(
+                Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                Arrays.asList(Arrays.asList(0, 2), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                false, false);
+    }
+
+    @Test
+    public void testRackAwareConsumerRebalanceWithAdditionalRacks() {
+        verifyRackAwareConsumerRebalance(
+                Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                true, true);
+    }
+
+    @Test
+    public void testRackAwareConsumerRebalanceWithLessRacks() {
+        verifyRackAwareConsumerRebalance(
+                Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Collections.singletonList(2)),
+                true, true);
+    }
+
+    @Test
+    public void testRackAwareConsumerRebalanceWithNewPartitions() {
+        verifyRackAwareConsumerRebalance(
+                Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0), Arrays.asList(0, 1)),
+                true, true);
+    }
+
+    @Test
+    public void testRackAwareConsumerRebalanceWithNoMetadataChange() {
+        verifyRackAwareConsumerRebalance(
+                Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                true, false);
+    }
+
+    @Test
+    public void testRackAwareConsumerRebalanceWithNoRackChange() {
+        verifyRackAwareConsumerRebalance(
+                Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                Arrays.asList(Arrays.asList(3, 4), Arrays.asList(4, 5), Arrays.asList(5, 3)),
+                true, false);
+    }
+
+    @Test
+    public void testRackAwareConsumerRebalanceWithNewReplicasOnSameRacks() {
+        verifyRackAwareConsumerRebalance(
+                Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)),
+                Arrays.asList(Arrays.asList(0, 1, 3), Arrays.asList(1, 2, 5), Arrays.asList(2, 0, 3)),
+                true, false);
+    }
+
+    private void verifyRackAwareConsumerRebalance(List<List<Integer>> partitionReplicas1,
+                                                  List<List<Integer>> partitionReplicas2,
+                                                  boolean rackAwareConsumer,
+                                                  boolean expectRebalance) {
+        List<String> racks = Arrays.asList("rack-a", "rack-b", "rack-c");
+        MockPartitionAssignor assignor = partitionAssignor;
+        String consumerRackId = null;
+        if (rackAwareConsumer) {
+            consumerRackId = racks.get(0);
+            assignor = new RackAwareAssignor(protocol);
+            createRackAwareCoordinator(consumerRackId, assignor);
+        }
+
+        MetadataResponse metadataResponse1 = rackAwareMetadata(6, racks, Collections.singletonMap(topic1, partitionReplicas1));
+        MetadataResponse metadataResponse2 = rackAwareMetadata(6, racks, Collections.singletonMap(topic1, partitionReplicas2));
+        verifyRebalanceWithMetadataChange(Optional.ofNullable(consumerRackId), assignor, metadataResponse1, metadataResponse2, expectRebalance);
+    }
+
+    private void verifyRebalanceWithMetadataChange(Optional<String> rackId,
+                                                   MockPartitionAssignor partitionAssignor,
+                                                   MetadataResponse metadataResponse1,
+                                                   MetadataResponse metadataResponse2,
+                                                   boolean expectRebalance) {
         final String consumerId = "leader";
         final List<String> topics = Arrays.asList(topic1, topic2);
-        final List<TopicPartition> partitions = Arrays.asList(t1p, t2p);
+        final List<TopicPartition> partitions = metadataResponse1.topicMetadata().stream()
+                .flatMap(t -> t.partitionMetadata().stream().map(p -> new TopicPartition(t.topic(), p.partition())))
+                .collect(Collectors.toList());
         subscriptions.subscribe(toSet(topics), rebalanceListener);
-        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1,
-                Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
+        client.updateMetadata(metadataResponse1);
         coordinator.maybeUpdateSubscriptionMetadata();
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@@ -1462,7 +1559,7 @@ public abstract class ConsumerCoordinatorTest {
         Map<String, List<String>> initialSubscription = singletonMap(consumerId, topics);
         partitionAssignor.prepare(singletonMap(consumerId, partitions));
 
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, false, Errors.NONE, rackId));
         client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
@@ -1475,13 +1572,18 @@ public abstract class ConsumerCoordinatorTest {
         assertEquals(1, rebalanceListener.assignedCount);
 
         // Change metadata to trigger rebalance.
-        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
+        client.updateMetadata(metadataResponse2);
         coordinator.poll(time.timer(0));
 
+        if (!expectRebalance) {
+            assertEquals(0, client.requests().size());
+            return;
+        }
+        assertEquals(1, client.requests().size());
+
         // Revert metadata to original value. Fail pending JoinGroup. Another
         // JoinGroup should be sent, which will be completed successfully.
-        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1,
-                Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
+        client.updateMetadata(metadataResponse1);
         client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         assertFalse(client.hasInFlightRequests());
@@ -1495,7 +1597,7 @@ public abstract class ConsumerCoordinatorTest {
                 JoinGroupRequest joinRequest = (JoinGroupRequest) request;
                 return consumerId.equals(joinRequest.data().memberId());
             }
-        }, joinGroupLeaderResponse(2, consumerId, initialSubscription, Errors.NONE));
+        }, joinGroupLeaderResponse(2, consumerId, initialSubscription, false, Errors.NONE, rackId));
         client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
@@ -3494,16 +3596,10 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testSubscriptionRackId() {
-        metrics.close();
-        coordinator.close(time.timer(0));
 
         String rackId = "rack-a";
-        metrics = new Metrics(time);
-        RackAwareAssignor assignor = new RackAwareAssignor();
-
-        coordinator = new ConsumerCoordinator(rebalanceConfig, new LogContext(), consumerClient,
-                Collections.singletonList(assignor), metadata, subscriptions,
-                metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId);
+        RackAwareAssignor assignor = new RackAwareAssignor(protocol);
+        createRackAwareCoordinator(rackId, assignor);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
         client.updateMetadata(metadataResponse);
@@ -3927,6 +4023,45 @@ public abstract class ConsumerCoordinatorTest {
         };
     }
 
+    private void createRackAwareCoordinator(String rackId, MockPartitionAssignor assignor) {
+        metrics.close();
+        coordinator.close(time.timer(0));
+
+        metrics = new Metrics(time);
+
+        coordinator = new ConsumerCoordinator(rebalanceConfig, new LogContext(), consumerClient,
+                Collections.singletonList(assignor), metadata, subscriptions,
+                metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId);
+    }
+
+    private static MetadataResponse rackAwareMetadata(int numNodes,
+                                                      List<String> racks,
+                                                      Map<String, List<List<Integer>>> partitionReplicas) {
+        final List<Node> nodes = new ArrayList<>(numNodes);
+        for (int i = 0; i < numNodes; i++)
+            nodes.add(new Node(i, "localhost", 1969 + i, racks.get(i % racks.size())));
+
+        List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+        for (Map.Entry<String, List<List<Integer>>> topicPartitionCountEntry : partitionReplicas.entrySet()) {
+            String topic = topicPartitionCountEntry.getKey();
+            int numPartitions = topicPartitionCountEntry.getValue().size();
+
+            List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>(numPartitions);
+            for (int i = 0; i < numPartitions; i++) {
+                TopicPartition tp = new TopicPartition(topic, i);
+                List<Integer> replicaIds = topicPartitionCountEntry.getValue().get(i);
+                partitionMetadata.add(new PartitionMetadata(
+                        Errors.NONE, tp, Optional.of(replicaIds.get(0)), Optional.empty(),
+                        replicaIds, replicaIds, Collections.emptyList()));
+            }
+
+            topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic, Uuid.ZERO_UUID,
+                    Topic.isInternal(topic), partitionMetadata, MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED));
+        }
+
+        return RequestTestUtils.metadataResponse(nodes, "kafka-cluster", 0, topicMetadata, ApiKeys.METADATA.latestVersion());
+    }
+
     private static class MockCommitCallback implements OffsetCommitCallback {
         public int invoked = 0;
         public Exception exception = null;
@@ -3941,8 +4076,8 @@ public abstract class ConsumerCoordinatorTest {
     private static class RackAwareAssignor extends MockPartitionAssignor {
         private final Set<String> rackIds = new HashSet<>();
 
-        RackAwareAssignor() {
-            super(Arrays.asList(RebalanceProtocol.EAGER));
+        RackAwareAssignor(RebalanceProtocol rebalanceProtocol) {
+            super(Collections.singletonList(rebalanceProtocol));
         }
 
         @Override
diff --git a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
index 1f940efc422..3a2f9430aa8 100644
--- a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
@@ -18,6 +18,7 @@ package integration.kafka.server
 
 import kafka.server.{BaseFetchRequestTest, KafkaConfig}
 import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.NewPartitionReassignment
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.{Test, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent.{Executors, TimeUnit}
 import scala.jdk.CollectionConverters._
@@ -187,13 +189,17 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
       consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, server.config.rack.orNull)
       consumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, s"instance-${server.config.brokerId}")
+      consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000")
       createConsumer()
     }
 
     val producer = createProducer()
     val executor = Executors.newFixedThreadPool(consumers.size)
 
-    def verifyAssignments(assignments: List[Set[TopicPartition]]): Unit = {
+    def verifyAssignments(partitionOrder: List[Int], topics: String*): Unit = {
+      val assignments = partitionOrder.map { p =>
+        topics.map(topic => new TopicPartition(topic, p)).toSet
+      }
       val assignmentFutures = consumers.zipWithIndex.map { case (consumer, i) =>
         executor.submit(() => {
           val expectedAssignment = assignments(i)
@@ -212,18 +218,30 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       }
     }
 
+
     try {
       // Rack-based assignment results in partitions assigned in reverse order since partition racks are in the reverse order.
       consumers.foreach(_.subscribe(Collections.singleton(topicWithSingleRackPartitions)))
-      verifyAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithSingleRackPartitions, p))))
+      verifyAssignments(partitionList.reverse, topicWithSingleRackPartitions)
 
       // Non-rack-aware assignment results in ordered partitions.
       consumers.foreach(_.subscribe(Collections.singleton(topicWithAllPartitionsOnAllRacks)))
-      verifyAssignments(partitionList.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p))))
+      verifyAssignments(partitionList, topicWithAllPartitionsOnAllRacks)
 
       // Rack-aware assignment with co-partitioning results in reverse assignment for both topics.
       consumers.foreach(_.subscribe(Set(topicWithSingleRackPartitions, topicWithAllPartitionsOnAllRacks).asJava))
-      verifyAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p), new TopicPartition(topicWithSingleRackPartitions, p))))
+      verifyAssignments(partitionList.reverse, topicWithAllPartitionsOnAllRacks, topicWithSingleRackPartitions)
+
+      // Perform reassignment for topicWithSingleRackPartitions to reverse the replica racks and
+      // verify that change in replica racks results in re-assignment based on new racks.
+      val admin = createAdminClient()
+      val reassignments = new util.HashMap[TopicPartition, util.Optional[NewPartitionReassignment]]()
+      partitionList.foreach { p =>
+        val newAssignment = new NewPartitionReassignment(Collections.singletonList(p))
+        reassignments.put(new TopicPartition(topicWithSingleRackPartitions, p), util.Optional.of(newAssignment))
+      }
+      admin.alterPartitionReassignments(reassignments).all().get(15, TimeUnit.SECONDS)
+      verifyAssignments(partitionList, topicWithAllPartitionsOnAllRacks, topicWithSingleRackPartitions)
 
     } finally {
       executor.shutdownNow()
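
For readers skimming the diff, here is a rough standalone sketch of the rack-change check this commit introduces. The class and method names below are illustrative only, not the committed internals; the actual implementation is the MetadataSnapshot/PartitionRackInfo code above, which additionally skips rack tracking when client.rack is not configured:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo;

    final class RackChangeCheck {
        // For each partition of the topic, collect the set of racks hosting its replicas.
        static List<Set<String>> rackSets(Cluster cluster, String topic) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            return partitions.stream()
                    .map(p -> Arrays.stream(p.replicas())
                            .map(Node::rack)
                            .collect(Collectors.toSet()))
                    .collect(Collectors.toList());
        }

        // A rebalance is warranted if any partition's rack set (or the partition
        // count itself) differs between two metadata snapshots.
        static boolean racksChanged(Cluster before, Cluster after, String topic) {
            return !rackSets(before, topic).equals(rackSets(after, topic));
        }
    }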
