cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r577920610
########## File path: metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Objects; + + +/** + * Associates brokers with their in-sync partitions. + * + * This is useful when we need to remove a broker from all the ISRs, or move all leaders + * away from a broker. + * + * We also track all the partitions that currently have no leader. + * + * The core data structure is a map from broker IDs to topic maps. Each topic map relates + * topic UUIDs to arrays of partition IDs. + * + * Each entry in the array has a high bit which indicates that the broker is the leader + * for the given partition, as well as 31 low bits which contain the partition id. This + * works because partition IDs cannot be negative. 
+ */ +public class BrokersToIsrs { + private final static int[] EMPTY = new int[0]; + + private final static int LEADER_FLAG = 0x8000_0000; + + private final static int REPLICA_MASK = 0x7fff_ffff; + + static class TopicPartition { + private final Uuid topicId; + private final int partitionId; + + TopicPartition(Uuid topicId, int partitionId) { + this.topicId = topicId; + this.partitionId = partitionId; + } + + public Uuid topicId() { + return topicId; + } + + public int partitionId() { + return partitionId; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof TopicPartition)) return false; + TopicPartition other = (TopicPartition) o; + return other.topicId.equals(topicId) && other.partitionId == partitionId; + } + + @Override + public int hashCode() { + return Objects.hash(topicId, partitionId); + } + + @Override + public String toString() { + return topicId + ":" + partitionId; + } + } + + static class PartitionsOnReplicaIterator implements Iterator<TopicPartition> { + private final Iterator<Entry<Uuid, int[]>> iterator; + private final boolean leaderOnly; + private int offset = 0; + Uuid uuid = Uuid.ZERO_UUID; + int[] replicas = EMPTY; + private TopicPartition next = null; + + PartitionsOnReplicaIterator(Map<Uuid, int[]> topicMap, boolean leaderOnly) { + this.iterator = topicMap.entrySet().iterator(); + this.leaderOnly = leaderOnly; + } + + @Override + public boolean hasNext() { + if (next != null) return true; + while (true) { + if (offset >= replicas.length) { + if (!iterator.hasNext()) return false; + offset = 0; + Entry<Uuid, int[]> entry = iterator.next(); + uuid = entry.getKey(); + replicas = entry.getValue(); + } + int replica = replicas[offset++]; + if ((!leaderOnly) || (replica & LEADER_FLAG) != 0) { + next = new TopicPartition(uuid, replica & REPLICA_MASK); + return true; + } + } + } + + @Override + public TopicPartition next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + TopicPartition result = next; + next = 
null; + return result; + } + } + + private final SnapshotRegistry snapshotRegistry; + + /** + * A map of broker IDs to the partitions that the broker is in the ISR for. + * Partitions with no isr members appear in this map under id -1. + */ + private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> isrMembers; + + BrokersToIsrs(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + this.isrMembers = new TimelineHashMap<>(snapshotRegistry, 0); + } + + /** + * Update our records of a partition's ISR. + * + * @param topicId The topic ID of the partition. + * @param partitionId The partition ID of the partition. + * @param prevIsr The previous ISR, or null if the partition is new. + * @param nextIsr The new ISR, or null if the partition is being removed. + * @param prevLeader The previous leader, or -1 if the partition had no leader. + * @param nextLeader The new leader, or -1 if the partition now has no leader. + */ + void update(Uuid topicId, int partitionId, int[] prevIsr, int[] nextIsr, + int prevLeader, int nextLeader) { + int[] prev; + if (prevIsr == null) { + prev = EMPTY; + } else { + if (prevLeader == -1) { + prev = Replicas.copyWith(prevIsr, -1); + } else { + prev = Replicas.clone(prevIsr); + } + Arrays.sort(prev); + } + int[] next; + if (nextIsr == null) { + next = EMPTY; + } else { + if (nextLeader == -1) { + next = Replicas.copyWith(nextIsr, -1); + } else { + next = Replicas.clone(nextIsr); + } + Arrays.sort(next); + } + int i = 0, j = 0; + while (true) { + if (i == prev.length) { + if (j == next.length) { + break; + } + int newReplica = next[j]; + add(newReplica, topicId, partitionId, newReplica == nextLeader); + j++; + } else if (j == next.length) { + int prevReplica = prev[i]; + remove(prevReplica, topicId, partitionId, prevReplica == prevLeader); + i++; + } else { + int prevReplica = prev[i]; + int newReplica = next[j]; + if (prevReplica < newReplica) { + remove(prevReplica, topicId, partitionId, prevReplica == 
prevLeader); + i++; + } else if (prevReplica > newReplica) { + add(newReplica, topicId, partitionId, newReplica == nextLeader); + j++; + } else { + boolean wasLeader = prevReplica == prevLeader; + boolean isLeader = prevReplica == nextLeader; + if (wasLeader != isLeader) { + change(prevReplica, topicId, partitionId, wasLeader, isLeader); + } + i++; + j++; + } + } + } + } + + private void add(int brokerId, Uuid topicId, int newPartition, boolean leader) { + if (leader) { + newPartition = newPartition | LEADER_FLAG; + } + TimelineHashMap<Uuid, int[]> topicMap = isrMembers.get(brokerId); + if (topicMap == null) { + topicMap = new TimelineHashMap<>(snapshotRegistry, 0); + isrMembers.put(brokerId, topicMap); + } + int[] partitions = topicMap.get(topicId); + int[] newPartitions; + if (partitions == null) { + newPartitions = new int[1]; + } else { + newPartitions = new int[partitions.length + 1]; + System.arraycopy(partitions, 0, newPartitions, 0, partitions.length); + } + newPartitions[newPartitions.length - 1] = newPartition; + topicMap.put(topicId, newPartitions); + } + + private void change(int brokerId, Uuid topicId, int partition, + boolean wasLeader, boolean isLeader) { + TimelineHashMap<Uuid, int[]> topicMap = isrMembers.get(brokerId); + if (topicMap == null) { + throw new RuntimeException("Broker " + brokerId + " has no isrMembers " + + "entry, so we can't change " + topicId + ":" + partition); + } + int[] partitions = topicMap.get(topicId); + if (partitions == null) { + throw new RuntimeException("Broker " + brokerId + " has no " + + "entry in isrMembers for topic " + topicId); + } + int[] newPartitions = new int[partitions.length]; + int target = wasLeader ? partition | LEADER_FLAG : partition; + for (int i = 0; i < partitions.length; i++) { + int cur = partitions[i]; + if (cur == target) { + newPartitions[i] = isLeader ? partition | LEADER_FLAG : partition; Review comment: since this is stored in a `TimelineHashMap` it must be treated as immutable. 
We can't modify the past: an array that has already been stored in the map has to stay unchanged, so any update must be written to a new array that is then put into the map. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org