This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3c4472d701a KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881) (#13474) 3c4472d701a is described below commit 3c4472d701a7e9d9b8714a0b9d87ae190d1679fb Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Fri Mar 31 15:01:07 2023 +0100 KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881) (#13474) When `client.rack` is configured for consumers, we perform rack-aware consumer partition assignment to improve locality. After/during reassignments, replica racks may change, so to ensure optimal consumer assignment, trigger a rebalance from the leader when the set of racks of any partition changes. Reviewers: David Jacot <dja...@confluent.io> --- .../consumer/internals/ConsumerCoordinator.java | 58 +++++-- .../internals/ConsumerCoordinatorTest.java | 171 ++++++++++++++++++--- .../server/FetchFromFollowerIntegrationTest.scala | 26 +++- 3 files changed, 224 insertions(+), 31 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index fec31fe80f8..1f6c5ef0d75 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import java.util.Arrays; import java.util.SortedSet; import java.util.TreeSet; import org.apache.kafka.clients.GroupRebalanceConfig; @@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparato import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import 
org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.GroupAuthorizationException; @@ -174,7 +176,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { this.rebalanceConfig = rebalanceConfig; this.log = logContext.logger(ConsumerCoordinator.class); this.metadata = metadata; - this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion()); + this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId); + this.metadataSnapshot = new MetadataSnapshot(this.rackId, subscriptions, metadata.fetch(), metadata.updateVersion()); this.subscriptions = subscriptions; this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback(); this.autoCommitEnabled = autoCommitEnabled; @@ -188,7 +191,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, JoinGroupRequest.UNKNOWN_GENERATION_ID, JoinGroupRequest.UNKNOWN_MEMBER_ID, rebalanceConfig.groupInstanceId); this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; - this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId); if (autoCommitEnabled) this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs); @@ -489,7 +491,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // Update the current snapshot, which will be used to check for subscription // changes that would require a rebalance (e.g. new partitions). 
- metadataSnapshot = new MetadataSnapshot(subscriptions, cluster, version); + metadataSnapshot = new MetadataSnapshot(rackId, subscriptions, cluster, version); } } @@ -1613,14 +1615,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private static class MetadataSnapshot { private final int version; - private final Map<String, Integer> partitionsPerTopic; + private final Map<String, List<PartitionRackInfo>> partitionsPerTopic; - private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) { - Map<String, Integer> partitionsPerTopic = new HashMap<>(); + private MetadataSnapshot(Optional<String> clientRack, SubscriptionState subscription, Cluster cluster, int version) { + Map<String, List<PartitionRackInfo>> partitionsPerTopic = new HashMap<>(); for (String topic : subscription.metadataTopics()) { - Integer numPartitions = cluster.partitionCountForTopic(topic); - if (numPartitions != null) - partitionsPerTopic.put(topic, numPartitions); + List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); + if (partitions != null) { + List<PartitionRackInfo> partitionRacks = partitions.stream() + .map(p -> new PartitionRackInfo(clientRack, p)) + .collect(Collectors.toList()); + partitionsPerTopic.put(topic, partitionRacks); + } } this.partitionsPerTopic = partitionsPerTopic; this.version = version; @@ -1636,6 +1642,40 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } + private static class PartitionRackInfo { + private final Set<String> racks; + + PartitionRackInfo(Optional<String> clientRack, PartitionInfo partition) { + if (clientRack.isPresent() && partition.replicas() != null) { + racks = Arrays.stream(partition.replicas()).map(Node::rack).collect(Collectors.toSet()); + } else { + racks = Collections.emptySet(); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PartitionRackInfo)) { + return false; + } + PartitionRackInfo 
rackInfo = (PartitionRackInfo) o; + return Objects.equals(racks, rackInfo.racks); + } + + @Override + public int hashCode() { + return Objects.hash(racks); + } + + @Override + public String toString() { + return racks.isEmpty() ? "NO_RACKS" : "racks=" + racks; + } + } + private static class OffsetCommitCompletion { private final OffsetCommitCallback callback; private final Map<TopicPartition, OffsetAndMetadata> offsets; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 0f1256e1689..f3c8026dff3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; @@ -68,6 +69,7 @@ import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.LeaveGroupRequest; import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetFetchResponse; @@ -108,6 +110,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import 
java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.emptySet; @@ -1448,12 +1451,106 @@ public abstract class ConsumerCoordinatorTest { */ @Test public void testRebalanceWithMetadataChange() { + MetadataResponse metadataResponse1 = RequestTestUtils.metadataUpdateWith(1, + Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))); + MetadataResponse metadataResponse2 = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)); + verifyRebalanceWithMetadataChange(Optional.empty(), partitionAssignor, metadataResponse1, metadataResponse2, true); + } + + @Test + public void testRackAwareConsumerRebalanceWithDifferentRacks() { + verifyRackAwareConsumerRebalance( + Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)), + Arrays.asList(Arrays.asList(0, 2), Arrays.asList(1, 2), Arrays.asList(2, 0)), + true, true); + } + + @Test + public void testNonRackAwareConsumerRebalanceWithDifferentRacks() { + verifyRackAwareConsumerRebalance( + Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)), + Arrays.asList(Arrays.asList(0, 2), Arrays.asList(1, 2), Arrays.asList(2, 0)), + false, false); + } + + @Test + public void testRackAwareConsumerRebalanceWithAdditionalRacks() { + verifyRackAwareConsumerRebalance( + Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)), + Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2), Arrays.asList(2, 0)), + true, true); + } + + @Test + public void testRackAwareConsumerRebalanceWithLessRacks() { + verifyRackAwareConsumerRebalance( + Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)), + Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Collections.singletonList(2)), + true, true); + } + + @Test + public void testRackAwareConsumerRebalanceWithNewPartitions() { + verifyRackAwareConsumerRebalance( + Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)), + 
Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0), Arrays.asList(0, 1)), + true, true); + } + + @Test + public void testRackAwareConsumerRebalanceWithNoMetadataChange() { + verifyRackAwareConsumerRebalance( + Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)), + Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)), + true, false); + } + + @Test + public void testRackAwareConsumerRebalanceWithNoRackChange() { + verifyRackAwareConsumerRebalance( + Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)), + Arrays.asList(Arrays.asList(3, 4), Arrays.asList(4, 5), Arrays.asList(5, 3)), + true, false); + } + + @Test + public void testRackAwareConsumerRebalanceWithNewReplicasOnSameRacks() { + verifyRackAwareConsumerRebalance( + Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)), + Arrays.asList(Arrays.asList(0, 1, 3), Arrays.asList(1, 2, 5), Arrays.asList(2, 0, 3)), + true, false); + } + + private void verifyRackAwareConsumerRebalance(List<List<Integer>> partitionReplicas1, + List<List<Integer>> partitionReplicas2, + boolean rackAwareConsumer, + boolean expectRebalance) { + List<String> racks = Arrays.asList("rack-a", "rack-b", "rack-c"); + MockPartitionAssignor assignor = partitionAssignor; + String consumerRackId = null; + if (rackAwareConsumer) { + consumerRackId = racks.get(0); + assignor = new RackAwareAssignor(protocol); + createRackAwareCoordinator(consumerRackId, assignor); + } + + MetadataResponse metadataResponse1 = rackAwareMetadata(6, racks, Collections.singletonMap(topic1, partitionReplicas1)); + MetadataResponse metadataResponse2 = rackAwareMetadata(6, racks, Collections.singletonMap(topic1, partitionReplicas2)); + verifyRebalanceWithMetadataChange(Optional.ofNullable(consumerRackId), assignor, metadataResponse1, metadataResponse2, expectRebalance); + } + + private void verifyRebalanceWithMetadataChange(Optional<String> rackId, + 
MockPartitionAssignor partitionAssignor, + MetadataResponse metadataResponse1, + MetadataResponse metadataResponse2, + boolean expectRebalance) { final String consumerId = "leader"; final List<String> topics = Arrays.asList(topic1, topic2); - final List<TopicPartition> partitions = Arrays.asList(t1p, t2p); + final List<TopicPartition> partitions = metadataResponse1.topicMetadata().stream() + .flatMap(t -> t.partitionMetadata().stream().map(p -> new TopicPartition(t.topic(), p.partition()))) + .collect(Collectors.toList()); subscriptions.subscribe(toSet(topics), rebalanceListener); - client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, - Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1)))); + client.updateMetadata(metadataResponse1); coordinator.maybeUpdateSubscriptionMetadata(); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); @@ -1462,7 +1559,7 @@ public abstract class ConsumerCoordinatorTest { Map<String, List<String>> initialSubscription = singletonMap(consumerId, topics); partitionAssignor.prepare(singletonMap(consumerId, partitions)); - client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE)); + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, false, Errors.NONE, rackId)); client.prepareResponse(syncGroupResponse(partitions, Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); @@ -1475,13 +1572,18 @@ public abstract class ConsumerCoordinatorTest { assertEquals(1, rebalanceListener.assignedCount); // Change metadata to trigger rebalance. - client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, singletonMap(topic1, 1))); + client.updateMetadata(metadataResponse2); coordinator.poll(time.timer(0)); + if (!expectRebalance) { + assertEquals(0, client.requests().size()); + return; + } + assertEquals(1, client.requests().size()); + // Revert metadata to original value. Fail pending JoinGroup. 
Another // JoinGroup should be sent, which will be completed successfully. - client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, - Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1)))); + client.updateMetadata(metadataResponse1); client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR)); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); assertFalse(client.hasInFlightRequests()); @@ -1495,7 +1597,7 @@ public abstract class ConsumerCoordinatorTest { JoinGroupRequest joinRequest = (JoinGroupRequest) request; return consumerId.equals(joinRequest.data().memberId()); } - }, joinGroupLeaderResponse(2, consumerId, initialSubscription, Errors.NONE)); + }, joinGroupLeaderResponse(2, consumerId, initialSubscription, false, Errors.NONE, rackId)); client.prepareResponse(syncGroupResponse(partitions, Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); @@ -3494,16 +3596,10 @@ public abstract class ConsumerCoordinatorTest { @Test public void testSubscriptionRackId() { - metrics.close(); - coordinator.close(time.timer(0)); String rackId = "rack-a"; - metrics = new Metrics(time); - RackAwareAssignor assignor = new RackAwareAssignor(); - - coordinator = new ConsumerCoordinator(rebalanceConfig, new LogContext(), consumerClient, - Collections.singletonList(assignor), metadata, subscriptions, - metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId); + RackAwareAssignor assignor = new RackAwareAssignor(protocol); + createRackAwareCoordinator(rackId, assignor); subscriptions.subscribe(singleton(topic1), rebalanceListener); client.updateMetadata(metadataResponse); @@ -3927,6 +4023,45 @@ public abstract class ConsumerCoordinatorTest { }; } + private void createRackAwareCoordinator(String rackId, MockPartitionAssignor assignor) { + metrics.close(); + coordinator.close(time.timer(0)); + + metrics = new Metrics(time); + + coordinator = new ConsumerCoordinator(rebalanceConfig, new 
LogContext(), consumerClient, + Collections.singletonList(assignor), metadata, subscriptions, + metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId); + } + + private static MetadataResponse rackAwareMetadata(int numNodes, + List<String> racks, + Map<String, List<List<Integer>>> partitionReplicas) { + final List<Node> nodes = new ArrayList<>(numNodes); + for (int i = 0; i < numNodes; i++) + nodes.add(new Node(i, "localhost", 1969 + i, racks.get(i % racks.size()))); + + List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>(); + for (Map.Entry<String, List<List<Integer>>> topicPartitionCountEntry : partitionReplicas.entrySet()) { + String topic = topicPartitionCountEntry.getKey(); + int numPartitions = topicPartitionCountEntry.getValue().size(); + + List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + TopicPartition tp = new TopicPartition(topic, i); + List<Integer> replicaIds = topicPartitionCountEntry.getValue().get(i); + partitionMetadata.add(new PartitionMetadata( + Errors.NONE, tp, Optional.of(replicaIds.get(0)), Optional.empty(), + replicaIds, replicaIds, Collections.emptyList())); + } + + topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic, Uuid.ZERO_UUID, + Topic.isInternal(topic), partitionMetadata, MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)); + } + + return RequestTestUtils.metadataResponse(nodes, "kafka-cluster", 0, topicMetadata, ApiKeys.METADATA.latestVersion()); + } + private static class MockCommitCallback implements OffsetCommitCallback { public int invoked = 0; public Exception exception = null; @@ -3941,8 +4076,8 @@ public abstract class ConsumerCoordinatorTest { private static class RackAwareAssignor extends MockPartitionAssignor { private final Set<String> rackIds = new HashSet<>(); - RackAwareAssignor() { - super(Arrays.asList(RebalanceProtocol.EAGER)); + RackAwareAssignor(RebalanceProtocol 
rebalanceProtocol) { + super(Collections.singletonList(rebalanceProtocol)); } @Override diff --git a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala index 1f940efc422..3a2f9430aa8 100644 --- a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala @@ -18,6 +18,7 @@ package integration.kafka.server import kafka.server.{BaseFetchRequestTest, KafkaConfig} import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.NewPartitionReassignment import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, RangeAssignor} import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition @@ -29,6 +30,7 @@ import org.junit.jupiter.api.{Test, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.util import java.util.{Collections, Properties} import java.util.concurrent.{Executors, TimeUnit} import scala.jdk.CollectionConverters._ @@ -187,13 +189,17 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, server.config.rack.orNull) consumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, s"instance-${server.config.brokerId}") + consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000") createConsumer() } val producer = createProducer() val executor = Executors.newFixedThreadPool(consumers.size) - def verifyAssignments(assignments: List[Set[TopicPartition]]): Unit = { + def verifyAssignments(partitionOrder: List[Int], topics: String*): Unit = { + val assignments = partitionOrder.map { p => + topics.map(topic => new 
TopicPartition(topic, p)).toSet + } val assignmentFutures = consumers.zipWithIndex.map { case (consumer, i) => executor.submit(() => { val expectedAssignment = assignments(i) @@ -212,18 +218,30 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { } } + try { // Rack-based assignment results in partitions assigned in reverse order since partition racks are in the reverse order. consumers.foreach(_.subscribe(Collections.singleton(topicWithSingleRackPartitions))) - verifyAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithSingleRackPartitions, p)))) + verifyAssignments(partitionList.reverse, topicWithSingleRackPartitions) // Non-rack-aware assignment results in ordered partitions. consumers.foreach(_.subscribe(Collections.singleton(topicWithAllPartitionsOnAllRacks))) - verifyAssignments(partitionList.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p)))) + verifyAssignments(partitionList, topicWithAllPartitionsOnAllRacks) // Rack-aware assignment with co-partitioning results in reverse assignment for both topics. consumers.foreach(_.subscribe(Set(topicWithSingleRackPartitions, topicWithAllPartitionsOnAllRacks).asJava)) - verifyAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p), new TopicPartition(topicWithSingleRackPartitions, p)))) + verifyAssignments(partitionList.reverse, topicWithAllPartitionsOnAllRacks, topicWithSingleRackPartitions) + + // Perform reassignment for topicWithSingleRackPartitions to reverse the replica racks and + // verify that change in replica racks results in re-assignment based on new racks. 
+ val admin = createAdminClient() + val reassignments = new util.HashMap[TopicPartition, util.Optional[NewPartitionReassignment]]() + partitionList.foreach { p => + val newAssignment = new NewPartitionReassignment(Collections.singletonList(p)) + reassignments.put(new TopicPartition(topicWithSingleRackPartitions, p), util.Optional.of(newAssignment)) + } + admin.alterPartitionReassignments(reassignments).all().get(15, TimeUnit.SECONDS) + verifyAssignments(partitionList, topicWithAllPartitionsOnAllRacks, topicWithSingleRackPartitions) } finally { executor.shutdownNow()