cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577920610



##########
File path: metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+
+/**
+ * Associates brokers with their in-sync partitions.
+ *
+ * This is useful when we need to remove a broker from all the ISRs, or move 
all leaders
+ * away from a broker.
+ *
+ * We also track all the partitions that currently have no leader.
+ *
+ * The core data structure is a map from broker IDs to topic maps.  Each topic 
map relates
+ * topic UUIDs to arrays of partition IDs.
+ *
+ * Each entry in the array has a high bit which indicates that the broker is 
the leader
+ * for the given partition, as well as 31 low bits which contain the partition 
id.  This
+ * works because partition IDs cannot be negative.
+ */
+public class BrokersToIsrs {
+    private final static int[] EMPTY = new int[0];
+
+    private final static int LEADER_FLAG = 0x8000_0000;
+
+    private final static int REPLICA_MASK = 0x7fff_ffff;
+
+    static class TopicPartition { // Pairs a topic ID with a partition ID; this is what PartitionsOnReplicaIterator yields.
+        private final Uuid topicId;
+        private final int partitionId;
+
+        TopicPartition(Uuid topicId, int partitionId) {
+            this.topicId = topicId;
+            this.partitionId = partitionId;
+        }
+
+        public Uuid topicId() {
+            return topicId;
+        }
+
+        public int partitionId() {
+            return partitionId;
+        }
+
+        @Override
+        public boolean equals(Object o) { // Equal when both the topic ID and the partition ID match.
+            if (!(o instanceof TopicPartition)) return false; // also rejects null
+            TopicPartition other = (TopicPartition) o;
+            return other.topicId.equals(topicId) && other.partitionId == 
partitionId; // NOTE(review): throws if other.topicId is null, whereas hashCode() tolerates null -- presumably topicId is never null; confirm
+        }
+
+        @Override
+        public int hashCode() { // Hashes the same two fields that equals() compares.
+            return Objects.hash(topicId, partitionId);
+        }
+
+        @Override
+        public String toString() { // Renders as "<topicId>:<partitionId>".
+            return topicId + ":" + partitionId;
+        }
+    }
+
+    static class PartitionsOnReplicaIterator implements 
Iterator<TopicPartition> { // Walks a topic map (topic ID -> encoded partition array) and yields one TopicPartition per array element, optionally only those flagged as leader.
+        private final Iterator<Entry<Uuid, int[]>> iterator; // position in the topic map's entry set
+        private final boolean leaderOnly; // when true, elements without LEADER_FLAG are skipped
+        private int offset = 0; // index of the next unread element of replicas
+        Uuid uuid = Uuid.ZERO_UUID; // topic ID of the entry currently being scanned
+        int[] replicas = EMPTY; // encoded partition array of the current entry; EMPTY until the first entry is loaded
+        private TopicPartition next = null; // element found by hasNext() but not yet handed out by next(); null when nothing is buffered
+
+        PartitionsOnReplicaIterator(Map<Uuid, int[]> topicMap, boolean 
leaderOnly) {
+            this.iterator = topicMap.entrySet().iterator();
+            this.leaderOnly = leaderOnly;
+        }
+
+        @Override
+        public boolean hasNext() { // Scans forward to the next matching element and buffers it in `next`; repeated calls do not advance further.
+            if (next != null) return true; // an element is already buffered
+            while (true) {
+                if (offset >= replicas.length) { // current array exhausted: move on to the next topic entry
+                    if (!iterator.hasNext()) return false;
+                    offset = 0;
+                    Entry<Uuid, int[]> entry = iterator.next();
+                    uuid = entry.getKey();
+                    replicas = entry.getValue();
+                }
+                int replica = replicas[offset++]; // NOTE(review): would throw ArrayIndexOutOfBoundsException if a topic mapped to an empty array -- presumably empty arrays are never stored; confirm
+                if ((!leaderOnly) || (replica & LEADER_FLAG) != 0) {
+                    next = new TopicPartition(uuid, replica & REPLICA_MASK); // mask off the leader bit to recover the partition ID
+                    return true;
+                }
+            }
+        }
+
+        @Override
+        public TopicPartition next() { // Returns the buffered element and clears the buffer; throws NoSuchElementException when exhausted.
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            TopicPartition result = next;
+            next = null;
+            return result;
+        }
+    }
+
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * A map of broker IDs to the partitions that the broker is in the ISR for.
+     * Partitions with no leader appear in this map under id -1.
+     */
+    private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> 
isrMembers;
+
+    BrokersToIsrs(SnapshotRegistry snapshotRegistry) { // Creates an empty broker -> ISR mapping tied to the given snapshot registry.
+        this.snapshotRegistry = snapshotRegistry; // retained because add() passes it to each per-broker topic map it creates
+        this.isrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+    }
+
+    /**
+     * Update our records of a partition's ISR.
+     *
+     * @param topicId       The topic ID of the partition.
+     * @param partitionId   The partition ID of the partition.
+     * @param prevIsr       The previous ISR, or null if the partition is new.
+     * @param nextIsr       The new ISR, or null if the partition is being 
removed.
+     * @param prevLeader    The previous leader, or -1 if the partition had no 
leader.
+     * @param nextLeader    The new leader, or -1 if the partition now has no 
leader.
+     */
+    void update(Uuid topicId, int partitionId, int[] prevIsr, int[] nextIsr,
+                int prevLeader, int nextLeader) {
+        int[] prev;
+        if (prevIsr == null) {
+            prev = EMPTY; // new partition: there is nothing to remove
+        } else {
+            if (prevLeader == -1) {
+                prev = Replicas.copyWith(prevIsr, -1); // leaderless: presumably yields the ISR plus pseudo-broker -1 -- confirm copyWith semantics
+            } else {
+                prev = Replicas.clone(prevIsr);
+            }
+            Arrays.sort(prev); // the walk below relies on ascending order
+        }
+        int[] next;
+        if (nextIsr == null) {
+            next = EMPTY; // partition is being removed: there is nothing to add
+        } else {
+            if (nextLeader == -1) {
+                next = Replicas.copyWith(nextIsr, -1); // same -1 handling as for prev above
+            } else {
+                next = Replicas.clone(nextIsr);
+            }
+            Arrays.sort(next);
+        }
+        int i = 0, j = 0; // i indexes prev, j indexes next
+        while (true) { // merge-style walk over the two sorted arrays, emitting remove/add/change per broker
+            if (i == prev.length) {
+                if (j == next.length) {
+                    break; // both arrays consumed
+                }
+                int newReplica = next[j]; // prev is exhausted, so everything left in next is an addition
+                add(newReplica, topicId, partitionId, newReplica == 
nextLeader);
+                j++;
+            } else if (j == next.length) {
+                int prevReplica = prev[i]; // next is exhausted, so everything left in prev is a removal
+                remove(prevReplica, topicId, partitionId, prevReplica == 
prevLeader);
+                i++;
+            } else {
+                int prevReplica = prev[i];
+                int newReplica = next[j];
+                if (prevReplica < newReplica) { // broker appears only in prev: it left the ISR
+                    remove(prevReplica, topicId, partitionId, prevReplica == 
prevLeader);
+                    i++;
+                } else if (prevReplica > newReplica) { // broker appears only in next: it joined the ISR
+                    add(newReplica, topicId, partitionId, newReplica == 
nextLeader);
+                    j++;
+                } else { // broker is in both: only its leader status can have changed
+                    boolean wasLeader = prevReplica == prevLeader;
+                    boolean isLeader = prevReplica == nextLeader;
+                    if (wasLeader != isLeader) {
+                        change(prevReplica, topicId, partitionId, wasLeader, 
isLeader);
+                    }
+                    i++;
+                    j++;
+                }
+            }
+        }
+    }
+
+    private void add(int brokerId, Uuid topicId, int newPartition, boolean 
leader) { // Appends the partition to brokerId's array for topicId, setting the leader bit when `leader` is true; does not check whether the partition is already present.
+        if (leader) {
+            newPartition = newPartition | LEADER_FLAG; // high bit marks this broker as the partition's leader
+        }
+        TimelineHashMap<Uuid, int[]> topicMap = isrMembers.get(brokerId);
+        if (topicMap == null) { // first partition recorded for this broker: create its topic map
+            topicMap = new TimelineHashMap<>(snapshotRegistry, 0);
+            isrMembers.put(brokerId, topicMap);
+        }
+        int[] partitions = topicMap.get(topicId);
+        int[] newPartitions;
+        if (partitions == null) {
+            newPartitions = new int[1];
+        } else {
+            newPartitions = new int[partitions.length + 1]; // build a fresh, longer array; the stored array itself is never written to
+            System.arraycopy(partitions, 0, newPartitions, 0, 
partitions.length);
+        }
+        newPartitions[newPartitions.length - 1] = newPartition; // the new entry always goes in the last slot
+        topicMap.put(topicId, newPartitions);
+    }
+
+    private void change(int brokerId, Uuid topicId, int partition,
+                        boolean wasLeader, boolean isLeader) {
+        TimelineHashMap<Uuid, int[]> topicMap = isrMembers.get(brokerId);
+        if (topicMap == null) {
+            throw new RuntimeException("Broker " + brokerId + " has no 
isrMembers " +
+                "entry, so we can't change " + topicId + ":" + partition);
+        }
+        int[] partitions = topicMap.get(topicId);
+        if (partitions == null) {
+            throw new RuntimeException("Broker " + brokerId + " has no " +
+                "entry in isrMembers for topic " + topicId);
+        }
+        int[] newPartitions = new int[partitions.length];
+        int target = wasLeader ? partition | LEADER_FLAG : partition;
+        for (int i = 0; i < partitions.length; i++) {
+            int cur = partitions[i];
+            if (cur == target) {
+                newPartitions[i] = isLeader ? partition | LEADER_FLAG : 
partition;

Review comment:
       Since this array is stored in a `TimelineHashMap`, it must be treated as 
immutable.  We can't modify the past.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to