mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1489685922


##########
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.metadata.Replicas.NONE;
+
+public class BrokersToElrs {
+    private final SnapshotRegistry snapshotRegistry;
+
+    // It maps from the broker id to the topic id partitions if the partition 
has ELR.
+    private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> 
elrMembers;
+
+    BrokersToElrs(SnapshotRegistry snapshotRegistry) {
+        this.snapshotRegistry = snapshotRegistry;
+        this.elrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+    }
+
+    /**
+     * Update our records of a partition's ELR.
+     *
+     * @param topicId       The topic ID of the partition.
+     * @param partitionId   The partition ID of the partition.
+     * @param prevElr       The previous ELR, or null if the partition is new.
+     * @param nextElr       The new ELR, or null if the partition is being 
removed.
+     */
+
+    void update(Uuid topicId, int partitionId, int[] prevElr, int[] nextElr) {
+        int[] prev;
+        if (prevElr == null) {
+            prev = NONE;
+        } else {
+            prev = Replicas.clone(prevElr);
+            Arrays.sort(prev);
+        }
+        int[] next;
+        if (nextElr == null) {
+            next = NONE;
+        } else {
+            next = Replicas.clone(nextElr);
+            Arrays.sort(next);
+        }
+
+        int i = 0, j = 0;
+        while (true) {
+            if (i == prev.length) {
+                if (j == next.length) {
+                    break;
+                }
+                int newReplica = next[j];
+                add(newReplica, topicId, partitionId);
+                j++;
+            } else if (j == next.length) {
+                int prevReplica = prev[i];
+                remove(prevReplica, topicId, partitionId);
+                i++;
+            } else {
+                int prevReplica = prev[i];
+                int newReplica = next[j];
+                if (prevReplica < newReplica) {
+                    remove(prevReplica, topicId, partitionId);
+                    i++;
+                } else if (prevReplica > newReplica) {
+                    add(newReplica, topicId, partitionId);
+                    j++;
+                } else {
+                    i++;
+                    j++;
+                }
+            }
+        }
+    }
+
+    void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
+        Map<Uuid, int[]> topicMap = elrMembers.get(brokerId);
+        if (topicMap != null) {
+            topicMap.remove(topicId);
+        }
+    }
+
+    private void add(int brokerId, Uuid topicId, int newPartition) {
+        TimelineHashMap<Uuid, int[]> topicMap = elrMembers.get(brokerId);
+        if (topicMap == null) {
+            topicMap = new TimelineHashMap<>(snapshotRegistry, 0);
+            elrMembers.put(brokerId, topicMap);
+        }
+        int[] partitions = topicMap.get(topicId);
+        int[] newPartitions;
+        if (partitions == null) {
+            newPartitions = new int[1];
+        } else {
+            newPartitions = new int[partitions.length + 1];
+            System.arraycopy(partitions, 0, newPartitions, 0, 
partitions.length);
+        }
+        newPartitions[newPartitions.length - 1] = newPartition;
+        topicMap.put(topicId, newPartitions);
+    }
+
+    private void remove(int brokerId, Uuid topicId, int removedPartition) {
+        TimelineHashMap<Uuid, int[]> topicMap = elrMembers.get(brokerId);
+        if (topicMap == null) {
+            throw new RuntimeException("Broker " + brokerId + " has no 
elrMembers " +
+                    "entry, so we can't remove " + topicId + ":" + 
removedPartition);
+        }
+        int[] partitions = topicMap.get(topicId);
+        if (partitions == null) {
+            throw new RuntimeException("Broker " + brokerId + " has no " +
+                    "entry in elrMembers for topic " + topicId);
+        }
+        if (partitions.length == 1) {
+            if (partitions[0] != removedPartition) {
+                throw new RuntimeException("Broker " + brokerId + " has no " +
+                        "entry in elrMembers for " + topicId + ":" + 
removedPartition);
+            }
+            topicMap.remove(topicId);
+            if (topicMap.isEmpty()) {
+                elrMembers.remove(brokerId);
+            }
+        } else {
+            int[] newPartitions = new int[partitions.length - 1];
+            int j = 0;
+            for (int i = 0; i < partitions.length; i++) {
+                int partition = partitions[i];
+                if (partition != removedPartition) {
+                    newPartitions[j++] = partition;
+                }
+            }
+            topicMap.put(topicId, newPartitions);
+        }
+    }
+
+    BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithBrokerInElr(int 
brokerId) {
+        Map<Uuid, int[]> topicMap = elrMembers.get(brokerId);
+        if (topicMap == null) {
+            topicMap = Collections.emptyMap();
+        }
+        return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false);
+    }
+}

Review Comment:
   nit: missing newline at end of file.



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -336,10 +342,14 @@ public ControllerResult<BrokerRegistrationReply> 
registerBroker(
                 ", but got cluster ID " + request.clusterId());
         }
         int brokerId = request.brokerId();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerRegistration existing = brokerRegistrations.get(brokerId);
         if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-            // TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-            log.debug("Received an unclean shutdown request");
+            if (handleBrokerUncleanShutdownHelper == null) {
+                log.warn("No handleBrokerUncleanShutdownHelper provided");

Review Comment:
   Related to the comment about the builder, we should do this validation when constructing the class, so that we throw an exception on startup if the handler was somehow not specified.
   
   To work around the circular dependency between ClusterControlManager and ReplicationControlManager, you can add something like this to QuorumController:
   
   ```
   void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) {
     replicationControl.handleBrokerUncleanShutdown(brokerId, records);
   }
   ```
   
   and use that method in the builder of ClusterControlManager.
   
   ```
   setUncleanBrokerShutdownHandler(QuorumController.this::handleUncleanBrokerShutdown)
   ```



##########
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;

Review Comment:
   If we are keeping this as a second data structure and need to ensure that BrokersToIsrs and BrokersToElrs are disjoint, I think we need some guards/checks. For example, it seems we always update both data structures:
   
   ```
   brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr,
     newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
   brokersToElrs.update(record.topicId(), record.partitionId(), prevPartInfo.elr,
     newPartInfo.elr);
   ```
   
   If correctness requires that a replica is never in both the ISR and the ELR, we should have a check that enforces it.



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -304,6 +306,10 @@ public void deactivate() {
         heartbeatManager = null;
     }
 
+    public void 
setHandleBrokerUncleanShutdownHelper(HandleBrokerUncleanShutdownHelper 
handleBrokerUncleanShutdownHelper) {
+        this.handleBrokerUncleanShutdownHelper = 
handleBrokerUncleanShutdownHelper;
+    }
+

Review Comment:
   Can you move this to the Builder?



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -780,4 +789,9 @@ public Entry<Integer, Map<String, VersionRange>> next() {
             }
         };
     }
+
+    @FunctionalInterface
+    interface HandleBrokerUncleanShutdownHelper {

Review Comment:
   naming nit: how about BrokerUncleanShutdownHandler, or just UncleanShutdownHandler?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to