junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579411851

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -0,0 +1,894 @@
+        PartitionControlInfo merge(PartitionChangeRecord record) {
+            int[] newIsr = (record.isr() == null) ? isr : Replicas.toArray(record.isr());
+            int newLeader;
+            int newLeaderEpoch;
+            if (record.leader() == Integer.MIN_VALUE) {
+                newLeader = leader;
+                newLeaderEpoch = leaderEpoch;
+            } else {
+                newLeader = record.leader();
+                newLeaderEpoch = leaderEpoch + 1;

Review comment:
   Currently, for a leader-initiated AlterIsr request, the controller does not bump the leader epoch. If we change that, it will slightly increase unavailability, since all clients would have to refresh their metadata in that case.
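
For illustration, a minimal sketch of the epoch rule the comment argues for, using the field names and the `Integer.MIN_VALUE` sentinel from the diff above (this is not the PR's code, only the behaviour being described): an ISR-only change coming from the leader keeps the leader epoch, so clients do not need to refresh metadata, while a genuine leader change bumps it.

```java
// Hypothetical variant of merge(); assumes it lives inside PartitionControlInfo.
PartitionControlInfo merge(PartitionChangeRecord record) {
    int[] newIsr = (record.isr() == null) ? isr : Replicas.toArray(record.isr());
    // Treat the record as a leader change only if it names a different leader.
    boolean leaderChanged = record.leader() != Integer.MIN_VALUE && record.leader() != leader;
    int newLeader = leaderChanged ? record.leader() : leader;
    // A leader-initiated AlterIsr keeps the leader epoch; only a real leader change bumps it.
    int newLeaderEpoch = leaderChanged ? leaderEpoch + 1 : leaderEpoch;
    // The partition epoch (ISR version) still advances on every change.
    return new PartitionControlInfo(replicas, newIsr, removingReplicas, addingReplicas,
        newLeader, newLeaderEpoch, partitionEpoch + 1);
}
```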

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -0,0 +1,894 @@
+    /**
+     * Handle a broker being deactivated. This means we remove it from any ISR that has
+     * more than one element. We do not remove the broker from ISRs where it is the only
+     * member since this would preclude clean leader election in the future.
+     * It is removed as the leader for all partitions it leads.
+     *
+     * @param brokerId              The broker id.
+     * @param alwaysBumpLeaderEpoch True if we should generate a record that
+     *                              always increments the leader epoch.
+     * @param records               The record list to append to.
+     */
+    void handleNodeDeactivated(int brokerId, boolean alwaysBumpLeaderEpoch,

Review comment:
   Currently, for a controller-initiated ISR change (controlled shutdown or hard failure), we always bump the leader epoch. Also, the name `alwaysBumpLeaderEpoch` is a bit odd, since the code in handleNodeDeactivated() does not directly bump the leader epoch.
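
To make the naming point concrete, here is a hedged sketch of the emission logic (the helper and its parameter list are hypothetical; the record setters are the ones used in the diff): the flag never increments an epoch inside this method, it only decides whether the emitted `PartitionChangeRecord` names a leader, and the actual `leaderEpoch + 1` happens later, when the record is applied through `PartitionControlInfo.merge()`.

```java
// Hypothetical helper, assumed to sit in ReplicationControlManager next to handleNodeDeactivated().
static void emitDeactivationChange(int brokerId, boolean alwaysBumpLeaderEpoch,
                                   PartitionControlInfo partition, int[] newIsr,
                                   Uuid topicId, int partitionId, int newLeader,
                                   List<ApiMessageAndVersion> records) {
    PartitionChangeRecord record = new PartitionChangeRecord()
        .setPartitionId(partitionId)
        .setTopicId(topicId)
        .setIsr(Replicas.toList(newIsr));
    if (partition.leader == brokerId) {
        record.setLeader(newLeader);          // leader really moves; merge() will bump the epoch
    } else if (alwaysBumpLeaderEpoch) {
        record.setLeader(partition.leader);   // leader re-stated; merge() still bumps the epoch
    }
    // Otherwise the leader field keeps its "no change" default and the leader epoch stays put.
    records.add(new ApiMessageAndVersion(record, (short) 0));
}
```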

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -0,0 +1,894 @@
+        int newLeader = chooseNewLeader(partitionInfo, partitionInfo.isr, unclean);
+        if (newLeader < 0) {
+            // If we can't find any leader for the partition, return an error.
+            return new ApiError(Errors.LEADER_NOT_AVAILABLE,
+                "Unable to find any leader for the partition.");
+        }
+        if (newLeader == partitionInfo.leader) {
+            // If the new leader we picked is the same as the current leader, there is
+            // nothing to do.
+            return ApiError.NONE;
+        }
+        if (partitionInfo.leader != -1 && newLeader != partitionInfo.preferredReplica()) {

Review comment:
   Hmm, it seems that we should only do `newLeader != partitionInfo.preferredReplica()` if this is a preferred leader election.
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. 
+ */ +public class ReplicationControlManager { + static class TopicControlInfo { + private final Uuid id; + private final TimelineHashMap<Integer, PartitionControlInfo> parts; + + TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { + this.id = id; + this.parts = new TimelineHashMap<>(snapshotRegistry, 0); + } + } + + static class PartitionControlInfo { + private final int[] replicas; + private final int[] isr; + private final int[] removingReplicas; + private final int[] addingReplicas; + private final int leader; + private final int leaderEpoch; + private final int partitionEpoch; + + PartitionControlInfo(PartitionRecord record) { + this(Replicas.toArray(record.replicas()), + Replicas.toArray(record.isr()), + Replicas.toArray(record.removingReplicas()), + Replicas.toArray(record.addingReplicas()), + record.leader(), + record.leaderEpoch(), + record.partitionEpoch()); + } + + PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas, + int[] addingReplicas, int leader, int leaderEpoch, + int partitionEpoch) { + this.replicas = replicas; + this.isr = isr; + this.removingReplicas = removingReplicas; + this.addingReplicas = addingReplicas; + this.leader = leader; + this.leaderEpoch = leaderEpoch; + this.partitionEpoch = partitionEpoch; + } + + PartitionControlInfo merge(PartitionChangeRecord record) { + int[] newIsr = (record.isr() == null) ? isr : Replicas.toArray(record.isr()); + int newLeader; + int newLeaderEpoch; + if (record.leader() == Integer.MIN_VALUE) { Review comment: Hmm, does Integer.MIN_VALUE have any special meaning? If so, could we use a more intuitive constant? ########## File path: core/src/main/scala/kafka/server/BrokerServer.scala ########## @@ -397,6 +397,9 @@ class BrokerServer( info("shutting down") if (config.controlledShutdownEnable) { + // Shut down the broker metadata listener, so that we don't get added to any + // more ISRs. + brokerMetadataListener.beginShutdown() Review comment: If we do this, does `brokerMetadataListener.close() `still need to call `beginShutdown()`. ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. 
+ */ +public class ReplicationControlManager { + static class TopicControlInfo { + private final Uuid id; + private final TimelineHashMap<Integer, PartitionControlInfo> parts; + + TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { + this.id = id; + this.parts = new TimelineHashMap<>(snapshotRegistry, 0); + } + } + + static class PartitionControlInfo { + private final int[] replicas; + private final int[] isr; + private final int[] removingReplicas; + private final int[] addingReplicas; + private final int leader; + private final int leaderEpoch; + private final int partitionEpoch; + + PartitionControlInfo(PartitionRecord record) { + this(Replicas.toArray(record.replicas()), + Replicas.toArray(record.isr()), + Replicas.toArray(record.removingReplicas()), + Replicas.toArray(record.addingReplicas()), + record.leader(), + record.leaderEpoch(), + record.partitionEpoch()); + } + + PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas, + int[] addingReplicas, int leader, int leaderEpoch, + int partitionEpoch) { + this.replicas = replicas; + this.isr = isr; + this.removingReplicas = removingReplicas; + this.addingReplicas = addingReplicas; + this.leader = leader; + this.leaderEpoch = leaderEpoch; + this.partitionEpoch = partitionEpoch; + } + + PartitionControlInfo merge(PartitionChangeRecord record) { + int[] newIsr = (record.isr() == null) ? isr : Replicas.toArray(record.isr()); + int newLeader; + int newLeaderEpoch; + if (record.leader() == Integer.MIN_VALUE) { + newLeader = leader; + newLeaderEpoch = leaderEpoch; + } else { + newLeader = record.leader(); + newLeaderEpoch = leaderEpoch + 1; + } + return new PartitionControlInfo(replicas, + newIsr, + removingReplicas, + addingReplicas, + newLeader, + newLeaderEpoch, + partitionEpoch + 1); + } + + String diff(PartitionControlInfo prev) { + StringBuilder builder = new StringBuilder(); + String prefix = ""; + if (!Arrays.equals(replicas, prev.replicas)) { + builder.append(prefix).append("oldReplicas=").append(Arrays.toString(prev.replicas)); + prefix = ", "; + builder.append(prefix).append("newReplicas=").append(Arrays.toString(replicas)); + } + if (!Arrays.equals(isr, prev.isr)) { + builder.append(prefix).append("oldIsr=").append(Arrays.toString(prev.isr)); + prefix = ", "; + builder.append(prefix).append("newIsr=").append(Arrays.toString(isr)); + } + if (!Arrays.equals(removingReplicas, prev.removingReplicas)) { + builder.append(prefix).append("oldRemovingReplicas="). + append(Arrays.toString(prev.removingReplicas)); + prefix = ", "; + builder.append(prefix).append("newRemovingReplicas="). + append(Arrays.toString(removingReplicas)); + } + if (!Arrays.equals(addingReplicas, prev.addingReplicas)) { + builder.append(prefix).append("oldAddingReplicas="). + append(Arrays.toString(prev.addingReplicas)); + prefix = ", "; + builder.append(prefix).append("newAddingReplicas="). 
+ append(Arrays.toString(addingReplicas)); + } + if (leader != prev.leader) { + builder.append(prefix).append("oldLeader=").append(prev.leader); + prefix = ", "; + builder.append(prefix).append("newLeader=").append(leader); + } + if (leaderEpoch != prev.leaderEpoch) { + builder.append(prefix).append("oldLeaderEpoch=").append(prev.leaderEpoch); + prefix = ", "; + builder.append(prefix).append("newLeaderEpoch=").append(leaderEpoch); + } + if (partitionEpoch != prev.partitionEpoch) { + builder.append(prefix).append("oldPartitionEpoch=").append(prev.partitionEpoch); + prefix = ", "; + builder.append(prefix).append("newPartitionEpoch=").append(partitionEpoch); + } + return builder.toString(); + } + + int preferredReplica() { + if (replicas.length == 0) return -1; + return replicas[0]; + } + + @Override + public int hashCode() { + return Objects.hash(replicas, isr, removingReplicas, addingReplicas, leader, + leaderEpoch, partitionEpoch); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof PartitionControlInfo)) return false; + PartitionControlInfo other = (PartitionControlInfo) o; + return diff(other).isEmpty(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("PartitionControlInfo("); + builder.append("replicas=").append(Arrays.toString(replicas)); + builder.append(", isr=").append(Arrays.toString(isr)); + builder.append(", removingReplicas=").append(Arrays.toString(removingReplicas)); + builder.append(", addingReplicas=").append(Arrays.toString(addingReplicas)); + builder.append(", leader=").append(leader); + builder.append(", leaderEpoch=").append(leaderEpoch); + builder.append(", partitionEpoch=").append(partitionEpoch); + builder.append(")"); + return builder.toString(); + } + } + + private final SnapshotRegistry snapshotRegistry; + private final Logger log; + + /** + * The random number generator used by this object. + */ + private final Random random; + + /** + * The KIP-464 default replication factor that is used if a CreateTopics request does + * not specify one. + */ + private final short defaultReplicationFactor; + + /** + * The KIP-464 default number of partitions that is used if a CreateTopics request does + * not specify a number of partitions. + */ + private final int defaultNumPartitions; + + /** + * A reference to the controller's configuration control manager. + */ + private final ConfigurationControlManager configurationControl; + + /** + * A reference to the controller's cluster control manager. + */ + final ClusterControlManager clusterControl; + + /** + * Maps topic names to topic UUIDs. + */ + private final TimelineHashMap<String, Uuid> topicsByName; + + /** + * Maps topic UUIDs to structures containing topic information, including partitions. + */ + private final TimelineHashMap<Uuid, TopicControlInfo> topics; + + /** + * A map of broker IDs to the partitions that the broker is in the ISR for. 
+ */ + private final BrokersToIsrs brokersToIsrs; + + ReplicationControlManager(SnapshotRegistry snapshotRegistry, + LogContext logContext, + Random random, + short defaultReplicationFactor, + int defaultNumPartitions, + ConfigurationControlManager configurationControl, + ClusterControlManager clusterControl) { + this.snapshotRegistry = snapshotRegistry; + this.log = logContext.logger(ReplicationControlManager.class); + this.random = random; + this.defaultReplicationFactor = defaultReplicationFactor; + this.defaultNumPartitions = defaultNumPartitions; + this.configurationControl = configurationControl; + this.clusterControl = clusterControl; + this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0); + this.topics = new TimelineHashMap<>(snapshotRegistry, 0); + this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry); + } + + public void replay(TopicRecord record) { + topicsByName.put(record.name(), record.topicId()); + topics.put(record.topicId(), new TopicControlInfo(snapshotRegistry, record.topicId())); + log.info("Created topic {} with ID {}.", record.name(), record.topicId()); + } + + public void replay(PartitionRecord record) { + TopicControlInfo topicInfo = topics.get(record.topicId()); + if (topicInfo == null) { + throw new RuntimeException("Tried to create partition " + record.topicId() + + ":" + record.partitionId() + ", but no topic with that ID was found."); + } + PartitionControlInfo newPartInfo = new PartitionControlInfo(record); + PartitionControlInfo prevPartInfo = topicInfo.parts.get(record.partitionId()); + if (prevPartInfo == null) { + log.info("Created partition {}:{} with {}.", record.topicId(), + record.partitionId(), newPartInfo.toString()); + topicInfo.parts.put(record.partitionId(), newPartInfo); + brokersToIsrs.update(record.topicId(), record.partitionId(), null, + newPartInfo.isr, -1, newPartInfo.leader); + } else { + String diff = newPartInfo.diff(prevPartInfo); + if (!diff.isEmpty()) { + log.info("Modified partition {}:{}: {}.", record.topicId(), + record.partitionId(), diff); + topicInfo.parts.put(record.partitionId(), newPartInfo); + brokersToIsrs.update(record.topicId(), record.partitionId(), + prevPartInfo.isr, newPartInfo.isr, prevPartInfo.leader, + newPartInfo.leader); + } + } + } + + public void replay(PartitionChangeRecord record) { + TopicControlInfo topicInfo = topics.get(record.topicId()); + if (topicInfo == null) { + throw new RuntimeException("Tried to create partition " + record.topicId() + + ":" + record.partitionId() + ", but no topic with that ID was found."); + } + PartitionControlInfo prevPartitionInfo = topicInfo.parts.get(record.partitionId()); + if (prevPartitionInfo == null) { + throw new RuntimeException("Tried to create partition " + record.topicId() + + ":" + record.partitionId() + ", but no partition with that id was found."); + } + PartitionControlInfo newPartitionInfo = prevPartitionInfo.merge(record); + topicInfo.parts.put(record.partitionId(), newPartitionInfo); + brokersToIsrs.update(record.topicId(), record.partitionId(), + prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader, + newPartitionInfo.leader); + log.debug("Applied ISR change record: {}", record.toString()); + } + + ControllerResult<CreateTopicsResponseData> + createTopics(CreateTopicsRequestData request) { + Map<String, ApiError> topicErrors = new HashMap<>(); + List<ApiMessageAndVersion> records = new ArrayList<>(); + + // Check the topic names. 
+ validateNewTopicNames(topicErrors, request.topics()); + + // Identify topics that already exist and mark them with the appropriate error + request.topics().stream().filter(creatableTopic -> topicsByName.containsKey(creatableTopic.name())) + .forEach(t -> topicErrors.put(t.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS))); + + // Verify that the configurations for the new topics are OK, and figure out what + // ConfigRecords should be created. + Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges = + computeConfigChanges(topicErrors, request.topics()); + ControllerResult<Map<ConfigResource, ApiError>> configResult = + configurationControl.incrementalAlterConfigs(configChanges); + for (Entry<ConfigResource, ApiError> entry : configResult.response().entrySet()) { + if (entry.getValue().isFailure()) { + topicErrors.put(entry.getKey().name(), entry.getValue()); + } + } + records.addAll(configResult.records()); + + // Try to create whatever topics are needed. + Map<String, CreatableTopicResult> successes = new HashMap<>(); + for (CreatableTopic topic : request.topics()) { + if (topicErrors.containsKey(topic.name())) continue; + ApiError error = createTopic(topic, records, successes); + if (error.isFailure()) { + topicErrors.put(topic.name(), error); + } + } + + // Create responses for all topics. + CreateTopicsResponseData data = new CreateTopicsResponseData(); + StringBuilder resultsBuilder = new StringBuilder(); + String resultsPrefix = ""; + for (CreatableTopic topic : request.topics()) { + ApiError error = topicErrors.get(topic.name()); + if (error != null) { + data.topics().add(new CreatableTopicResult(). + setName(topic.name()). + setErrorCode(error.error().code()). + setErrorMessage(error.message())); + resultsBuilder.append(resultsPrefix).append(topic).append(": "). + append(error.error()).append(" (").append(error.message()).append(")"); + resultsPrefix = ", "; + continue; + } + CreatableTopicResult result = successes.get(topic.name()); + data.topics().add(result); + resultsBuilder.append(resultsPrefix).append(topic).append(": "). 
+ append("SUCCESS"); + resultsPrefix = ", "; + } + log.info("createTopics result(s): {}", resultsBuilder.toString()); + return new ControllerResult<>(records, data); + } + + private ApiError createTopic(CreatableTopic topic, + List<ApiMessageAndVersion> records, + Map<String, CreatableTopicResult> successes) { + Map<Integer, PartitionControlInfo> newParts = new HashMap<>(); + if (!topic.assignments().isEmpty()) { + if (topic.replicationFactor() != -1) { + return new ApiError(Errors.INVALID_REQUEST, + "A manual partition assignment was specified, but replication " + + "factor was not set to -1."); + } + if (topic.numPartitions() != -1) { + return new ApiError(Errors.INVALID_REQUEST, + "A manual partition assignment was specified, but numPartitions " + + "was not set to -1."); + } + for (CreatableReplicaAssignment assignment : topic.assignments()) { + if (newParts.containsKey(assignment.partitionIndex())) { + return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, + "Found multiple manual partition assignments for partition " + + assignment.partitionIndex()); + } + HashSet<Integer> brokerIds = new HashSet<>(); + for (int brokerId : assignment.brokerIds()) { + if (!brokerIds.add(brokerId)) { + return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, + "The manual partition assignment specifies the same node " + + "id more than once."); + } else if (!clusterControl.unfenced(brokerId)) { + return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, + "The manual partition assignment contains node " + brokerId + + ", but that node is not usable."); + } + } + int[] replicas = new int[assignment.brokerIds().size()]; + for (int i = 0; i < replicas.length; i++) { + replicas[i] = assignment.brokerIds().get(i); + } + int[] isr = new int[assignment.brokerIds().size()]; + for (int i = 0; i < replicas.length; i++) { + isr[i] = assignment.brokerIds().get(i); + } + newParts.put(assignment.partitionIndex(), + new PartitionControlInfo(replicas, isr, null, null, isr[0], 0, 0)); + } + } else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) { + return new ApiError(Errors.INVALID_REPLICATION_FACTOR, + "Replication factor was set to an invalid non-positive value."); + } else if (!topic.assignments().isEmpty()) { + return new ApiError(Errors.INVALID_REQUEST, + "Replication factor was not set to -1 but a manual partition " + + "assignment was specified."); + } else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) { + return new ApiError(Errors.INVALID_PARTITIONS, + "Number of partitions was set to an invalid non-positive value."); + } else { + int numPartitions = topic.numPartitions() == -1 ? + defaultNumPartitions : topic.numPartitions(); + short replicationFactor = topic.replicationFactor() == -1 ? + defaultReplicationFactor : topic.replicationFactor(); + try { + List<List<Integer>> replicas = clusterControl. + placeReplicas(numPartitions, replicationFactor); + for (int partitionId = 0; partitionId < replicas.size(); partitionId++) { + int[] r = Replicas.toArray(replicas.get(partitionId)); + newParts.put(partitionId, + new PartitionControlInfo(r, r, null, null, r[0], 0, 0)); + } + } catch (InvalidReplicationFactorException e) { + return new ApiError(Errors.INVALID_REPLICATION_FACTOR, + "Unable to replicate the partition " + replicationFactor + + " times: " + e.getMessage()); + } + } + Uuid topicId = new Uuid(random.nextLong(), random.nextLong()); + successes.put(topic.name(), new CreatableTopicResult(). + setName(topic.name()). + setTopicId(topicId). + setErrorCode((short) 0). 
+ setErrorMessage(null). + setNumPartitions(newParts.size()). + setReplicationFactor((short) newParts.get(0).replicas.length)); + records.add(new ApiMessageAndVersion(new TopicRecord(). + setName(topic.name()). + setTopicId(topicId), (short) 0)); + for (Entry<Integer, PartitionControlInfo> partEntry : newParts.entrySet()) { + int partitionIndex = partEntry.getKey(); + PartitionControlInfo info = partEntry.getValue(); + records.add(new ApiMessageAndVersion(new PartitionRecord(). + setPartitionId(partitionIndex). + setTopicId(topicId). + setReplicas(Replicas.toList(info.replicas)). + setIsr(Replicas.toList(info.isr)). + setRemovingReplicas(null). + setAddingReplicas(null). + setLeader(info.leader). + setLeaderEpoch(info.leaderEpoch). + setPartitionEpoch(0), (short) 0)); + } + return ApiError.NONE; + } + + static void validateNewTopicNames(Map<String, ApiError> topicErrors, + CreatableTopicCollection topics) { + for (CreatableTopic topic : topics) { + if (topicErrors.containsKey(topic.name())) continue; + try { + Topic.validate(topic.name()); + } catch (InvalidTopicException e) { + topicErrors.put(topic.name(), + new ApiError(Errors.INVALID_TOPIC_EXCEPTION, e.getMessage())); + } + } + } + + static Map<ConfigResource, Map<String, Entry<OpType, String>>> + computeConfigChanges(Map<String, ApiError> topicErrors, + CreatableTopicCollection topics) { + Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges = new HashMap<>(); + for (CreatableTopic topic : topics) { + if (topicErrors.containsKey(topic.name())) continue; + Map<String, Entry<OpType, String>> topicConfigs = new HashMap<>(); + for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) { + topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value())); + } + if (!topicConfigs.isEmpty()) { + configChanges.put(new ConfigResource(TOPIC, topic.name()), topicConfigs); + } + } + return configChanges; + } + + // VisibleForTesting + PartitionControlInfo getPartition(Uuid topicId, int partitionId) { + TopicControlInfo topic = topics.get(topicId); + if (topic == null) { + return null; + } + return topic.parts.get(partitionId); + } + + // VisibleForTesting + BrokersToIsrs brokersToIsrs() { + return brokersToIsrs; + } + + ControllerResult<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) { + clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch()); + AlterIsrResponseData response = new AlterIsrResponseData(); + List<ApiMessageAndVersion> records = new ArrayList<>(); + for (AlterIsrRequestData.TopicData topicData : request.topics()) { + AlterIsrResponseData.TopicData responseTopicData = + new AlterIsrResponseData.TopicData().setName(topicData.name()); + response.topics().add(responseTopicData); + Uuid topicId = topicsByName.get(topicData.name()); + if (topicId == null || !topics.containsKey(topicId)) { + for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) { + responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). + setPartitionIndex(partitionData.partitionIndex()). + setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())); + } + continue; + } + TopicControlInfo topic = topics.get(topicId); + for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) { + PartitionControlInfo partition = topic.parts.get(partitionData.partitionIndex()); + if (partition == null) { + responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). + setPartitionIndex(partitionData.partitionIndex()). 
+ setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())); + continue; + } + if (partitionData.leaderEpoch() != partition.leaderEpoch) { + responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). + setPartitionIndex(partitionData.partitionIndex()). + setErrorCode(Errors.FENCED_LEADER_EPOCH.code())); + continue; + } + if (partitionData.currentIsrVersion() != partition.partitionEpoch) { + responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). + setPartitionIndex(partitionData.partitionIndex()). + setErrorCode(Errors.INVALID_UPDATE_VERSION.code())); + continue; + } + int[] newIsr = Replicas.toArray(partitionData.newIsr()); + if (!Replicas.validateIsr(partition.replicas, newIsr)) { + responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). + setPartitionIndex(partitionData.partitionIndex()). + setErrorCode(Errors.INVALID_REQUEST.code())); + } + if (!Replicas.contains(newIsr, partition.leader)) { + // An alterIsr request can't remove the current leader. + responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). + setPartitionIndex(partitionData.partitionIndex()). + setErrorCode(Errors.INVALID_REQUEST.code())); + } + records.add(new ApiMessageAndVersion(new PartitionChangeRecord(). + setPartitionId(partitionData.partitionIndex()). + setTopicId(topic.id). + setIsr(partitionData.newIsr()), (short) 0)); + } + } + return new ControllerResult<>(records, response); + } + + /** + * Generate the appropriate records to handle a broker being fenced. + * + * First, we remove this broker from any non-singleton ISR. Then we generate a + * FenceBrokerRecord. + * + * @param brokerId The broker id. + * @param records The record list to append to. + */ + + void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) { + BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId); + if (brokerRegistration == null) { + throw new RuntimeException("Can't find broker registration for broker " + brokerId); + } + handleNodeDeactivated(brokerId, false, records); + records.add(new ApiMessageAndVersion(new FenceBrokerRecord(). + setId(brokerId).setEpoch(brokerRegistration.epoch()), (short) 0)); + } + + /** + * Generate the appropriate records to handle a broker being unregistered. + * + * First, we remove this broker from any non-singleton ISR. Then we generate an + * UnregisterBrokerRecord. + * + * @param brokerId The broker id. + * @param brokerEpoch The broker epoch. + * @param records The record list to append to. + */ + void handleBrokerUnregistered(int brokerId, long brokerEpoch, + List<ApiMessageAndVersion> records) { + handleNodeDeactivated(brokerId, false, records); + records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord(). + setBrokerId(brokerId).setBrokerEpoch(brokerEpoch), (short) 0)); + } + + /** + * Handle a broker being deactivated. This means we remove it from any ISR that has + * more than one element. We do not remove the broker from ISRs where it is the only + * member since this would preclude clean leader election in the future. + * It is removed as the leader for all partitions it leads. + * + * @param brokerId The broker id. + * @param alwaysBumpLeaderEpoch True if we should generate a record that + * always increments the leader epoch. + * @param records The record list to append to. 
+ */ + void handleNodeDeactivated(int brokerId, boolean alwaysBumpLeaderEpoch, + List<ApiMessageAndVersion> records) { + Iterator<TopicPartition> iterator = brokersToIsrs.iterator(brokerId, false); + while (iterator.hasNext()) { + TopicPartition topicPartition = iterator.next(); + TopicControlInfo topic = topics.get(topicPartition.topicId()); + if (topic == null) { + throw new RuntimeException("Topic ID " + topicPartition.topicId() + " existed in " + + "isrMembers, but not in the topics map."); + } + PartitionControlInfo partition = topic.parts.get(topicPartition.partitionId()); + if (partition == null) { + throw new RuntimeException("Partition " + topicPartition + + " existed in isrMembers, but not in the partitions map."); + } + PartitionChangeRecord record = new PartitionChangeRecord(). + setPartitionId(topicPartition.partitionId()). + setTopicId(topic.id); + int[] newIsr = Replicas.copyWithout(partition.isr, brokerId); + if (newIsr.length == 0) { + // We don't want to shrink the ISR to size 0. So, leave the node in the ISR. + if (alwaysBumpLeaderEpoch || record.leader() != -1) { Review comment: Hmm, if the leader is already -1 and we can't change ISR, there is no need to generate a new PartitionChangeRecord just to bump up the leader epoch. It won't help controlled shutdown since there is already no leader. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
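For readers following the first review comment above (on the `newLeader != partitionInfo.preferredReplica()` check in `electLeader`), here is a minimal, self-contained sketch of the suggested gating: only apply the preferred-replica restriction when the request asked for a preferred (clean) election. The class and method names are hypothetical, plain ints stand in for `PartitionControlInfo`, and the policy shown is an assumption about the reviewer's intent, not the PR's implementation.

```java
/** Illustrative sketch only; not code from the PR. */
final class ElectionGatingSketch {
    /**
     * Returns true if moving leadership to newLeader should be treated as a
     * real change. Per the review comment, the preferred-replica restriction
     * applies only to preferred (clean) elections; unclean elections accept
     * any eligible leader.
     */
    static boolean shouldApplyElection(boolean unclean, int currentLeader,
                                       int newLeader, int preferredReplica) {
        if (newLeader == currentLeader) {
            return false;  // nothing to do
        }
        if (!unclean && currentLeader != -1 && newLeader != preferredReplica) {
            return false;  // preferred election: only a move to the preferred replica counts
        }
        return true;
    }

    public static void main(String[] args) {
        // Preferred election: leader is 2, candidate 3 is not the preferred replica 1 -> skip.
        System.out.println(shouldApplyElection(false, 2, 3, 1)); // false
        // Preferred election to the preferred replica itself -> proceed.
        System.out.println(shouldApplyElection(false, 2, 1, 1)); // true
        // Unclean election accepts any eligible candidate.
        System.out.println(shouldApplyElection(true, 2, 3, 1));  // true
    }
}
```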
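The second review comment asks whether `Integer.MIN_VALUE` in `PartitionChangeRecord.leader()` carries special meaning and, if so, whether a named constant would be clearer. Below is a small illustrative sketch of that idea; `NO_LEADER_CHANGE` is a hypothetical name rather than an existing Kafka symbol, and the merge logic is a simplified stand-in for `PartitionControlInfo.merge`.

```java
/** Illustrative sketch of replacing the bare sentinel with a named constant. */
final class LeaderMergeSketch {
    /** Sentinel meaning "this change record does not touch the leader" (hypothetical name). */
    static final int NO_LEADER_CHANGE = Integer.MIN_VALUE;

    int leader = 1;
    int leaderEpoch = 7;

    void merge(int recordLeader) {
        if (recordLeader == NO_LEADER_CHANGE) {
            return;            // leader untouched: keep the current leader and leader epoch
        }
        leader = recordLeader; // a real leader change always bumps the leader epoch
        leaderEpoch++;
    }

    public static void main(String[] args) {
        LeaderMergeSketch p = new LeaderMergeSketch();
        p.merge(NO_LEADER_CHANGE);
        System.out.println(p.leader + " " + p.leaderEpoch); // 1 7
        p.merge(3);
        System.out.println(p.leader + " " + p.leaderEpoch); // 3 8
    }
}
```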
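The last review comment notes that when a singleton ISR cannot shrink and the partition is already leaderless, there is no point writing a `PartitionChangeRecord` at all. Below is a runnable, self-contained sketch of that decision using plain arrays; the class and method names are hypothetical, leader selection is simplified to "first surviving ISR member" instead of `chooseNewLeader`, and the `alwaysBumpLeaderEpoch` path is omitted.

```java
import java.util.Arrays;

/** Illustrative sketch only; not the PR's handleNodeDeactivated implementation. */
public final class IsrShrinkSketch {

    /** The change to record for one partition, or null when no record is needed. */
    static final class Change {
        final int[] newIsr;   // null means "leave the ISR as it is"
        final int newLeader;  // -1 means the partition becomes leaderless
        Change(int[] newIsr, int newLeader) {
            this.newIsr = newIsr;
            this.newLeader = newLeader;
        }
    }

    /**
     * Decides what to write when brokerId is deactivated. Returns null in the
     * case the reviewer highlights: the ISR would shrink to zero (so the broker
     * stays in it) and the partition is already leaderless, so a record would
     * change nothing.
     */
    static Change deactivate(int[] isr, int leader, int brokerId) {
        int[] newIsr = copyWithout(isr, brokerId);
        if (newIsr.length == 0) {
            // Keep the broker in a singleton ISR so a clean election stays possible.
            return leader != -1 ? new Change(null, -1) : null;
        }
        // The ISR really shrinks; move leadership to a surviving ISR member if needed.
        int newLeader = (leader == brokerId) ? newIsr[0] : leader;
        return new Change(newIsr, newLeader);
    }

    /** Stand-in for Replicas.copyWithout. */
    static int[] copyWithout(int[] replicas, int brokerId) {
        return Arrays.stream(replicas).filter(r -> r != brokerId).toArray();
    }

    public static void main(String[] args) {
        System.out.println(deactivate(new int[]{1}, -1, 1));                          // null: no record needed
        System.out.println(deactivate(new int[]{1}, 1, 1).newLeader);                 // -1: partition becomes leaderless
        System.out.println(Arrays.toString(deactivate(new int[]{1, 2}, 1, 1).newIsr)); // [2]
    }
}
```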