mumrah commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1489685922
########## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import static org.apache.kafka.metadata.Replicas.NONE; + +public class BrokersToElrs { + private final SnapshotRegistry snapshotRegistry; + + // It maps from the broker id to the topic id partitions if the partition has ELR. + private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> elrMembers; + + BrokersToElrs(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + this.elrMembers = new TimelineHashMap<>(snapshotRegistry, 0); + } + + /** + * Update our records of a partition's ELR. + * + * @param topicId The topic ID of the partition. + * @param partitionId The partition ID of the partition. + * @param prevElr The previous ELR, or null if the partition is new. 
+ * @param nextElr The new ELR, or null if the partition is being removed. + */ + + void update(Uuid topicId, int partitionId, int[] prevElr, int[] nextElr) { + int[] prev; + if (prevElr == null) { + prev = NONE; + } else { + prev = Replicas.clone(prevElr); + Arrays.sort(prev); + } + int[] next; + if (nextElr == null) { + next = NONE; + } else { + next = Replicas.clone(nextElr); + Arrays.sort(next); + } + + int i = 0, j = 0; + while (true) { + if (i == prev.length) { + if (j == next.length) { + break; + } + int newReplica = next[j]; + add(newReplica, topicId, partitionId); + j++; + } else if (j == next.length) { + int prevReplica = prev[i]; + remove(prevReplica, topicId, partitionId); + i++; + } else { + int prevReplica = prev[i]; + int newReplica = next[j]; + if (prevReplica < newReplica) { + remove(prevReplica, topicId, partitionId); + i++; + } else if (prevReplica > newReplica) { + add(newReplica, topicId, partitionId); + j++; + } else { + i++; + j++; + } + } + } + } + + void removeTopicEntryForBroker(Uuid topicId, int brokerId) { + Map<Uuid, int[]> topicMap = elrMembers.get(brokerId); + if (topicMap != null) { + topicMap.remove(topicId); + } + } + + private void add(int brokerId, Uuid topicId, int newPartition) { + TimelineHashMap<Uuid, int[]> topicMap = elrMembers.get(brokerId); + if (topicMap == null) { + topicMap = new TimelineHashMap<>(snapshotRegistry, 0); + elrMembers.put(brokerId, topicMap); + } + int[] partitions = topicMap.get(topicId); + int[] newPartitions; + if (partitions == null) { + newPartitions = new int[1]; + } else { + newPartitions = new int[partitions.length + 1]; + System.arraycopy(partitions, 0, newPartitions, 0, partitions.length); + } + newPartitions[newPartitions.length - 1] = newPartition; + topicMap.put(topicId, newPartitions); + } + + private void remove(int brokerId, Uuid topicId, int removedPartition) { + TimelineHashMap<Uuid, int[]> topicMap = elrMembers.get(brokerId); + if (topicMap == null) { + throw new 
RuntimeException("Broker " + brokerId + " has no elrMembers " + + "entry, so we can't remove " + topicId + ":" + removedPartition); + } + int[] partitions = topicMap.get(topicId); + if (partitions == null) { + throw new RuntimeException("Broker " + brokerId + " has no " + + "entry in elrMembers for topic " + topicId); + } + if (partitions.length == 1) { + if (partitions[0] != removedPartition) { + throw new RuntimeException("Broker " + brokerId + " has no " + + "entry in elrMembers for " + topicId + ":" + removedPartition); + } + topicMap.remove(topicId); + if (topicMap.isEmpty()) { + elrMembers.remove(brokerId); + } + } else { + int[] newPartitions = new int[partitions.length - 1]; + int j = 0; + for (int i = 0; i < partitions.length; i++) { + int partition = partitions[i]; + if (partition != removedPartition) { + newPartitions[j++] = partition; + } + } + topicMap.put(topicId, newPartitions); + } + } + + BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithBrokerInElr(int brokerId) { + Map<Uuid, int[]> topicMap = elrMembers.get(brokerId); + if (topicMap == null) { + topicMap = Collections.emptyMap(); + } + return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false); + } +} Review Comment: nit: missing newline ########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ########## @@ -336,10 +342,14 @@ public ControllerResult<BrokerRegistrationReply> registerBroker( ", but got cluster ID " + request.clusterId()); } int brokerId = request.brokerId(); + List<ApiMessageAndVersion> records = new ArrayList<>(); BrokerRegistration existing = brokerRegistrations.get(brokerId); if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) { - // TODO(KIP-966): Update the ELR if the broker has an unclean shutdown. 
- log.debug("Received an unclean shutdown request"); + if (handleBrokerUncleanShutdownHelper == null) { + log.warn("No handleBrokerUncleanShutdownHelper provided"); Review Comment: Related to the comment about the builder, we should do this validation when constructing the class so we can throw exceptions on startup if the handler was somehow not specified. To work around the circular dependency of ClusterControlManager and ReplicationControlManager, you can add something like this to QuorumController: ``` void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } ``` and use that method in the builder of ClusterControlManager. ``` setUncleanBrokerShutdownHandler(QuorumController.this::handleUncleanBrokerShutdown) ``` ########## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; Review Comment: If we are keeping this as a second data structure and need to ensure BrokersToIsrs and BrokersToElrs are disjoint, I think we need some guards/checks.
For example, it seems we are always updating both data structures: ``` brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr, newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader); brokersToElrs.update(record.topicId(), record.partitionId(), prevPartInfo.elr, newPartInfo.elr); ``` If correctness requires that a replica is never in both the ISR and the ELR at the same time, we should have a check for that. ########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ########## @@ -304,6 +306,10 @@ public void deactivate() { heartbeatManager = null; } + public void setHandleBrokerUncleanShutdownHelper(HandleBrokerUncleanShutdownHelper handleBrokerUncleanShutdownHelper) { + this.handleBrokerUncleanShutdownHelper = handleBrokerUncleanShutdownHelper; + } + Review Comment: Can you move this to the Builder? ########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ########## @@ -780,4 +789,9 @@ public Entry<Integer, Map<String, VersionRange>> next() { } }; } + + @FunctionalInterface + interface HandleBrokerUncleanShutdownHelper { Review Comment: naming nit: how about BrokerUncleanShutdownHandler or just UncleanShutdownHandler? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org