Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
mumrah merged PR #14706:
URL: https://github.com/apache/kafka/pull/14706

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on PR #14706:
URL: https://github.com/apache/kafka/pull/14706#issuecomment-2005099068

The test failures are unrelated to this change.
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1526384089

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -1369,7 +1384,23 @@ void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List
Create partition change records to remove replicas from any ISR or ELR for brokers doing unclean shutdown.

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -582,6 +586,10 @@ public void replay(RemoveTopicRecord record) {
updatePartitionDirectories(topic.id, partitionId, partition.directories, null);
}
+for (int i = 0; i < partition.elr.length; i++) {

Review Comment:
nit: use a descriptive name instead of `i`
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1520116151

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -1837,8 +1872,13 @@ void generateLeaderAndIsrUpdates(String context,
// from the target ISR, but we need to exclude it here too, to handle the case
// where there is an unclean leader election which chooses a leader from outside
// the ISR.
+//
+// If the caller passed a valid broker ID for brokerWithUncleanShutdown, rather than
+// passing NO_LEADER, this node should not be an acceptable leader. We also exclude
+// brokerWithUncleanShutdown from ELR and ISR.
IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
+&& (r == brokerToAdd || clusterControl.isActive(r));

Review Comment:
I disabled handleUncleanShutdown when ELR is not enabled.
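A minimal standalone sketch of the leader-eligibility predicate quoted above, for illustration only. The `activeBrokers` set is a hypothetical stand-in for `clusterControl.isActive(r)`, and `NO_LEADER = -1` is assumed as the "no broker" sentinel; neither is the controller's actual wiring. Passing `NO_LEADER` for `brokerWithUncleanShutdown` leaves the predicate equivalent to the old one.

```java
import java.util.Set;
import java.util.function.IntPredicate;

final class AcceptableLeaderSketch {
    // Assumed sentinel meaning "no broker"; valid broker IDs are >= 0.
    static final int NO_LEADER = -1;

    /**
     * A replica is an acceptable leader if it is neither the broker being removed
     * nor the broker that registered after an unclean shutdown, and it is either
     * the broker being added or currently active. activeBrokers is hypothetical.
     */
    static IntPredicate isAcceptableLeader(int brokerToRemove,
                                           int brokerToAdd,
                                           int brokerWithUncleanShutdown,
                                           Set<Integer> activeBrokers) {
        return r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
            && (r == brokerToAdd || activeBrokers.contains(r));
    }

    public static void main(String[] args) {
        Set<Integer> active = Set.of(0, 1, 2);
        // Broker 1 registered after an unclean shutdown, so it is excluded.
        IntPredicate p = isAcceptableLeader(NO_LEADER, NO_LEADER, 1, active);
        System.out.println(p.test(0)); // true
        System.out.println(p.test(1)); // false: unclean shutdown
        System.out.println(p.test(3)); // false: not active
    }
}
```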
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1520115442

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
}
}
+private void updatePartitionInfo(
+Uuid topicId,
+Integer partitionId,
+PartitionRegistration prevPartInfo,
+PartitionRegistration newPartInfo
+) {
+HashSet validationSet = new HashSet<>();
+Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));
+if (validationSet.size() != newPartInfo.isr.length + newPartInfo.elr.length) {
+log.warn("{}-{} has overlapping ISR={} and ELR={}", topics.get(topicId).name, partitionId,
+Arrays.toString(newPartInfo.isr), partitionId, Arrays.toString(newPartInfo.elr));

Review Comment:
Correct.
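The overlap check quoted above can be written with the `forEach(validationSet::add)` form suggested elsewhere in this review. This is a hedged standalone sketch over plain `int[]` arrays rather than `PartitionRegistration`, and the class and method names are hypothetical. It builds its message with one value per slot; in the hunk quoted here, `partitionId` is passed twice to a four-placeholder message, which would push the ELR string out of the logged text.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

final class IsrElrOverlapCheck {
    /** Size-based check as in the quoted code; also flags duplicates inside one list. */
    static boolean hasOverlapBySize(int[] isr, int[] elr) {
        Set<Integer> validationSet = new HashSet<>();
        Arrays.stream(isr).forEach(validationSet::add); // method reference instead of ii -> validationSet.add(ii)
        Arrays.stream(elr).forEach(validationSet::add);
        return validationSet.size() != isr.length + elr.length;
    }

    /** Returns the broker IDs present in both ISR and ELR (empty when disjoint). */
    static Set<Integer> overlap(int[] isr, int[] elr) {
        Set<Integer> isrSet = new HashSet<>();
        Arrays.stream(isr).forEach(isrSet::add);
        Set<Integer> shared = new TreeSet<>();
        Arrays.stream(elr).filter(v -> isrSet.contains(v)).forEach(shared::add);
        return shared;
    }

    public static void main(String[] args) {
        int[] isr = {1, 2, 3};
        int[] elr = {3, 4};
        if (hasOverlapBySize(isr, elr)) {
            // During replay the record is already committed, so this is reported, not thrown.
            System.out.println("overlapping ISR=" + Arrays.toString(isr)
                + " and ELR=" + Arrays.toString(elr) + ", shared=" + overlap(isr, elr));
        }
    }
}
```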
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1520115750

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##
@@ -336,10 +350,10 @@ public ControllerResult registerBroker(
", but got cluster ID " + request.clusterId());
}
int brokerId = request.brokerId();
+List records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
Good catch.
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1519830343

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
}
}
+private void updatePartitionInfo(
+Uuid topicId,
+Integer partitionId,
+PartitionRegistration prevPartInfo,
+PartitionRegistration newPartInfo
+) {
+HashSet validationSet = new HashSet<>();
+Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));
+if (validationSet.size() != newPartInfo.isr.length + newPartInfo.elr.length) {
+log.warn("{}-{} has overlapping ISR={} and ELR={}", topics.get(topicId).name, partitionId,
+Arrays.toString(newPartInfo.isr), partitionId, Arrays.toString(newPartInfo.elr));

Review Comment:
This can only happen if we have a bug where the ELR and ISR are allowed to overlap, right? Since this is part of `replay`, we shouldn't throw here (the record has already been committed), but perhaps ERROR is better than WARN.

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##
@@ -336,10 +350,10 @@ public ControllerResult registerBroker(
", but got cluster ID " + request.clusterId());
}
int brokerId = request.brokerId();
+List records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
I think we inadvertently lost this log message.
## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
}
}
+private void updatePartitionInfo(
+Uuid topicId,
+Integer partitionId,
+PartitionRegistration prevPartInfo,
+PartitionRegistration newPartInfo
+) {
+HashSet validationSet = new HashSet<>();
+Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));

Review Comment:
nit: I think you can do `forEach(validationSet::add)` here.

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -1815,12 +1845,17 @@ void validateManualPartitionAssignment(
* broker to remove from the ISR and leadership, otherwise.
* @param brokerToAdd NO_LEADER if no broker is being added; the ID of the
* broker which is now eligible to be a leader, otherwise.
+ * @param brokerWithUncleanShutdown

Review Comment:
nit: update the main description of this method to mention ISR and ELR.

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##
@@ -780,4 +793,9 @@ public Entry> next() {
}
};
}
+
+@FunctionalInterface
+interface BrokerUncleanShutdownHandler {
+void apply(int brokerId, List records);

Review Comment:
nit: since we're defining an interface, we can use a more descriptive name than "apply" for the method. Maybe "addRecordsForShutdown" or something.

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -1837,8 +1872,13 @@ void generateLeaderAndIsrUpdates(String context,
// from the target ISR, but we need to exclude it here too, to handle the case
// where there is an unclean leader election which chooses a leader from outside
// the ISR.
+//
+// If the caller passed a valid broker ID for brokerWithUncleanShutdown, rather than
+// passing NO_LEADER, this node should not be an acceptable leader. We also exclude
+// brokerWithUncleanShutdown from ELR and ISR.
IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
+&& (r == brokerToAdd || clusterControl.isActive(r));

Review Comment:
Since our guards around ELR (whether it's enabled or not) are in PartitionChangeBuilder, we need to make sure this logic is correct when ELR is not enabled due to the metadata version (MV).
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1489894351

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##
@@ -336,10 +342,14 @@ public ControllerResult registerBroker(
", but got cluster ID " + request.clusterId());
}
int brokerId = request.brokerId();
+List records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown.
-log.debug("Received an unclean shutdown request");
+if (handleBrokerUncleanShutdownHelper == null) {
+log.warn("No handleBrokerUncleanShutdownHelper provided");

Review Comment:
Thanks for the advice! Updated.
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1489894070

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##
@@ -780,4 +789,9 @@ public Entry> next() {
}
};
}
+
+@FunctionalInterface
+interface HandleBrokerUncleanShutdownHelper {

Review Comment:
Done
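As a sketch of the naming suggestion made in this review (a descriptive method name instead of `apply`), a functional interface for the unclean-shutdown hook could be declared and wired with a lambda like this. The `List<String>` record type and the simplified `registerBroker` below are hypothetical stand-ins, not the controller's real signatures; `addRecordsForShutdown` is the name floated by the reviewer.

```java
import java.util.ArrayList;
import java.util.List;

final class UncleanShutdownHandlerSketch {
    // The single abstract method's name says what the callback does.
    @FunctionalInterface
    interface BrokerUncleanShutdownHandler {
        void addRecordsForShutdown(int brokerId, List<String> records);
    }

    private final BrokerUncleanShutdownHandler handler;

    UncleanShutdownHandlerSketch(BrokerUncleanShutdownHandler handler) {
        this.handler = handler;
    }

    /** Hypothetical, simplified registration path that collects records to emit. */
    List<String> registerBroker(int brokerId, boolean uncleanShutdown) {
        List<String> records = new ArrayList<>();
        records.add("RegisterBrokerRecord(brokerId=" + brokerId + ")");
        if (uncleanShutdown) {
            // Let the replication layer append its partition-change records.
            handler.addRecordsForShutdown(brokerId, records);
        }
        return records;
    }

    public static void main(String[] args) {
        UncleanShutdownHandlerSketch sketch = new UncleanShutdownHandlerSketch(
            (brokerId, records) -> records.add("PartitionChangeRecord(remove " + brokerId + " from ISR/ELR)"));
        System.out.println(sketch.registerBroker(1, true));
        System.out.println(sketch.registerBroker(2, false));
    }
}
```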
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1489685922

## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.metadata.Replicas.NONE;
+
+public class BrokersToElrs {
+private final SnapshotRegistry snapshotRegistry;
+
+// It maps from the broker id to the topic id partitions if the partition has ELR.
+private final TimelineHashMap> elrMembers;
+
+BrokersToElrs(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+this.elrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+
+/**
+ * Update our records of a partition's ELR.
+ *
+ * @param topicId The topic ID of the partition.
+ * @param partitionId The partition ID of the partition.
+ * @param prevElr The previous ELR, or null if the partition is new.
+ * @param nextElr The new ELR, or null if the partition is being removed.
+ */
+
+void update(Uuid topicId, int partitionId, int[] prevElr, int[] nextElr) {
+int[] prev;
+if (prevElr == null) {
+prev = NONE;
+} else {
+prev = Replicas.clone(prevElr);
+Arrays.sort(prev);
+}
+int[] next;
+if (nextElr == null) {
+next = NONE;
+} else {
+next = Replicas.clone(nextElr);
+Arrays.sort(next);
+}
+
+int i = 0, j = 0;
+while (true) {
+if (i == prev.length) {
+if (j == next.length) {
+break;
+}
+int newReplica = next[j];
+add(newReplica, topicId, partitionId);
+j++;
+} else if (j == next.length) {
+int prevReplica = prev[i];
+remove(prevReplica, topicId, partitionId);
+i++;
+} else {
+int prevReplica = prev[i];
+int newReplica = next[j];
+if (prevReplica < newReplica) {
+remove(prevReplica, topicId, partitionId);
+i++;
+} else if (prevReplica > newReplica) {
+add(newReplica, topicId, partitionId);
+j++;
+} else {
+i++;
+j++;
+}
+}
+}
+}
+
+void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
+Map topicMap = elrMembers.get(brokerId);
+if (topicMap != null) {
+topicMap.remove(topicId);
+}
+}
+
+private void add(int brokerId, Uuid topicId, int newPartition) {
+TimelineHashMap topicMap = elrMembers.get(brokerId);
+if (topicMap == null) {
+topicMap = new TimelineHashMap<>(snapshotRegistry, 0);
+elrMembers.put(brokerId, topicMap);
+}
+int[] partitions = topicMap.get(topicId);
+int[] newPartitions;
+if (partitions == null) {
+newPartitions = new int[1];
+} else {
+newPartitions = new int[partitions.length + 1];
+System.arraycopy(partitions, 0, newPartitions, 0, partitions.length);
+}
+newPartitions[newPartitions.length - 1] = newPartition;
+topicMap.put(topicId, newPartitions);
+}
+
+private void remove(int brokerId, Uuid topicId, int removedPartition) {
+TimelineHashMap topicMap = elrMembers.get(brokerId);
+if (topicMap == null) {
+throw new RuntimeException("Broker " + brokerId + " has no elrMembers " +
+"entry, so we can't remove " + topicId + ":" + removedPartition);
+}
+int[] partitions = topicMap.get(topicId);
+
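The core of `update()` above is a merge over two sorted replica arrays: IDs only in the previous ELR are removed from the index, IDs only in the next ELR are added, and IDs in both are left alone. The sketch below reproduces that walk with plain `HashMap`s and `String` topic IDs in place of `TimelineHashMap`, `SnapshotRegistry`, and `Uuid`, so it has no snapshot or rollback support. The class name `SimpleBrokersToElrs`, the `partitionsFor` accessor, and the choice to drop empty topic entries in `remove` are assumptions for illustration (the quoted `remove` is truncated).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class SimpleBrokersToElrs {
    private static final int[] NONE = new int[0];

    // broker ID -> (topic ID -> partitions whose ELR contains that broker)
    private final Map<Integer, Map<String, int[]>> elrMembers = new HashMap<>();

    void update(String topicId, int partitionId, int[] prevElr, int[] nextElr) {
        int[] prev = sortedCopy(prevElr);
        int[] next = sortedCopy(nextElr);
        int i = 0, j = 0;
        while (i < prev.length || j < next.length) {
            if (j == next.length || (i < prev.length && prev[i] < next[j])) {
                remove(prev[i++], topicId, partitionId);   // only in the old ELR
            } else if (i == prev.length || prev[i] > next[j]) {
                add(next[j++], topicId, partitionId);      // only in the new ELR
            } else {
                i++;                                       // in both: unchanged
                j++;
            }
        }
    }

    int[] partitionsFor(int brokerId, String topicId) {
        Map<String, int[]> topicMap = elrMembers.get(brokerId);
        return topicMap == null ? NONE : topicMap.getOrDefault(topicId, NONE);
    }

    private static int[] sortedCopy(int[] elr) {
        if (elr == null) return NONE;
        int[] copy = elr.clone();
        Arrays.sort(copy);
        return copy;
    }

    private void add(int brokerId, String topicId, int partition) {
        Map<String, int[]> topicMap = elrMembers.computeIfAbsent(brokerId, k -> new HashMap<>());
        int[] partitions = topicMap.getOrDefault(topicId, NONE);
        int[] grown = Arrays.copyOf(partitions, partitions.length + 1);
        grown[partitions.length] = partition;
        topicMap.put(topicId, grown);
    }

    private void remove(int brokerId, String topicId, int partition) {
        Map<String, int[]> topicMap = elrMembers.get(brokerId);
        int[] partitions = topicMap == null ? null : topicMap.get(topicId);
        if (partitions == null) {
            throw new IllegalStateException("Broker " + brokerId + " has no ELR entry for "
                + topicId + ":" + partition);
        }
        int[] shrunk = Arrays.stream(partitions).filter(p -> p != partition).toArray();
        if (shrunk.length == 0) {
            topicMap.remove(topicId);   // assumption: drop empty entries
        } else {
            topicMap.put(topicId, shrunk);
        }
    }

    public static void main(String[] args) {
        SimpleBrokersToElrs index = new SimpleBrokersToElrs();
        index.update("foo", 0, null, new int[]{3, 1});            // new partition, ELR = {1, 3}
        index.update("foo", 0, new int[]{3, 1}, new int[]{3, 2}); // 1 leaves, 2 joins, 3 stays
        System.out.println(Arrays.toString(index.partitionsFor(1, "foo"))); // []
        System.out.println(Arrays.toString(index.partitionsFor(2, "foo"))); // [0]
        System.out.println(Arrays.toString(index.partitionsFor(3, "foo"))); // [0]
    }
}
```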
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1408490077

## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ##
@@ -466,6 +466,10 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
partition.lastKnownElr[0] != partition.leader)) {
// Only update the last known leader when the first time the partition becomes leaderless.
record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((record.leader() > 0 || (partition.leader != NO_LEADER && record.leader() != NO_LEADER))

Review Comment:
Done

## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ##
@@ -528,6 +536,10 @@ private void maybePopulateTargetElr() {
.collect(Collectors.toList());
}
+static boolean recordHasLeader(PartitionChangeRecord record) {

Review Comment:
Removed
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
artemlivshits commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1401454282

## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ##
@@ -528,6 +536,10 @@ private void maybePopulateTargetElr() {
.collect(Collectors.toList());
}
+static boolean recordHasLeader(PartitionChangeRecord record) {

Review Comment:
Do we use this now?

## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ##
@@ -466,6 +466,10 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
partition.lastKnownElr[0] != partition.leader)) {
// Only update the last known leader when the first time the partition becomes leaderless.
record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((record.leader() > 0 || (partition.leader != NO_LEADER && record.leader() != NO_LEADER))

Review Comment:
0 is a valid leader (broker ID 0 is legal), so this should probably use `record.leader() >= 0`.
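To make the off-by-one concrete: with `record.leader() > 0`, a leaderless partition that elects broker 0 is classified as still leaderless. A hedged sketch of the condition over plain ints; the sentinel values `NO_LEADER = -1` and `NO_LEADER_CHANGE = -2` are assumed to match Kafka's leader constants, and the method names are hypothetical.

```java
final class LeaderConditionSketch {
    static final int NO_LEADER = -1;        // assumed: partition has no leader
    static final int NO_LEADER_CHANGE = -2; // assumed: record leaves the leader unchanged

    // Earlier revision: broker 0 is not recognized as a new leader.
    static boolean willHaveLeaderBuggy(int recordLeader, int currentLeader) {
        return recordLeader > 0 || (currentLeader != NO_LEADER && recordLeader != NO_LEADER);
    }

    // (will have a new leader) || (will not become leaderless)
    static boolean willHaveLeader(int recordLeader, int currentLeader) {
        return recordLeader >= 0 || (currentLeader != NO_LEADER && recordLeader != NO_LEADER);
    }

    public static void main(String[] args) {
        // Partition is currently leaderless and the record elects broker 0.
        System.out.println(willHaveLeaderBuggy(0, NO_LEADER));   // false (wrong)
        System.out.println(willHaveLeader(0, NO_LEADER));        // true
        // Partition keeps its current leader.
        System.out.println(willHaveLeader(NO_LEADER_CHANGE, 2)); // true
        // Partition loses its leader.
        System.out.println(willHaveLeader(NO_LEADER, 2));        // false
    }
}
```

When the partition already has a leader, both variants agree, so the difference only shows up for a leaderless partition electing broker 0.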
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400042116

## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;

Review Comment:
Updated the comment in the ReplicationControlManager.
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041537

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##
@@ -327,10 +333,12 @@ public ControllerResult registerBroker(
", but got cluster ID " + request.clusterId());
}
int brokerId = request.brokerId();
+List records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
-if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown.
-log.debug("Received an unclean shutdown request");
+if (version >= 2
+&& (existing == null || request.previousBrokerEpoch() != existing.epoch())
+&& replicationControlManager != null) {

Review Comment:
Done.
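The review point this reply answers is that `ClusterControlManager#Builder` should enforce required dependencies, instead of the runtime code checking for `null` (as the `replicationControlManager != null` clause above does). A minimal sketch of that builder pattern, using a hypothetical `ManagerSketch` class with an `IntConsumer` handler; the real builder's fields and setter names may differ.

```java
import java.util.Objects;
import java.util.function.IntConsumer;

final class ManagerSketch {
    private final IntConsumer uncleanShutdownHandler;

    private ManagerSketch(IntConsumer uncleanShutdownHandler) {
        this.uncleanShutdownHandler = uncleanShutdownHandler;
    }

    // Runtime code can rely on the handler being present; no null check here.
    void onUncleanShutdown(int brokerId) {
        uncleanShutdownHandler.accept(brokerId);
    }

    static final class Builder {
        private IntConsumer uncleanShutdownHandler = null;

        Builder setUncleanShutdownHandler(IntConsumer handler) {
            this.uncleanShutdownHandler = handler;
            return this;
        }

        ManagerSketch build() {
            // Fail fast at construction time if a required dependency is missing.
            Objects.requireNonNull(uncleanShutdownHandler, "uncleanShutdownHandler must be set");
            return new ManagerSketch(uncleanShutdownHandler);
        }
    }

    public static void main(String[] args) {
        ManagerSketch manager = new ManagerSketch.Builder()
            .setUncleanShutdownHandler(id -> System.out.println("handling unclean shutdown of broker " + id))
            .build();
        manager.onUncleanShutdown(3);
        try {
            new ManagerSketch.Builder().build();
        } catch (NullPointerException e) {
            System.out.println("build failed: " + e.getMessage());
        }
    }
}
```

Moving the check into `build()` means a misconfigured controller fails at startup rather than silently skipping unclean-shutdown handling at runtime.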
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041854

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -1772,7 +1814,8 @@ void generateLeaderAndIsrUpdates(String context,
// where there is an unclean leader election which chooses a leader from outside
// the ISR.
IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)

Review Comment:
Done.
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041780

## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.metadata.Replicas.NONE;
+
+public class BrokersToElrs {
+private final SnapshotRegistry snapshotRegistry;
+
+private final TimelineHashMap> elrMembers;

Review Comment:
Done. It maps broker ID -> TopicIdPartitions.
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400036881

## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ##
@@ -466,6 +466,10 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
partition.lastKnownElr[0] != partition.leader)) {
// Only update the last known leader when the first time the partition becomes leaderless.
record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((recordHasLeader(record) || partition.leader != NO_LEADER)

Review Comment:
Good catch! To improve readability, I revised it to the following:
`record.leader() > 0 || (partition.leader != NO_LEADER && record.leader() != NO_LEADER)`
i.e. `(will have a new leader) || (will not become leaderless)`
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
artemlivshits commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1399798771

## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ##
@@ -466,6 +466,10 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
partition.lastKnownElr[0] != partition.leader)) {
// Only update the last known leader when the first time the partition becomes leaderless.
record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((recordHasLeader(record) || partition.leader != NO_LEADER)

Review Comment:
Can we have a case where `record.leader() == NO_LEADER && partition.leader != NO_LEADER` (i.e. the partition currently has a valid leader but will not have one after this change)? Would a more accurate condition be `(recordHasLeader(record) || (partition.leader != NO_LEADER && record.leader() == NO_LEADER_CHANGE))`?

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -1772,7 +1814,8 @@ void generateLeaderAndIsrUpdates(String context,
// where there is an unclean leader election which chooses a leader from outside
// the ISR.
IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)

Review Comment:
Can we update the comments to reflect the new logic?

## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;

Review Comment:
It looks like, for this to work correctly, we need to make sure that ISR and ELR are always disjoint sets (otherwise we'd generate multiple records for the same partition). Can we add a comment about this?

## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.metadata.Replicas.NONE;
+
+public class BrokersToElrs {
+private final SnapshotRegistry snapshotRegistry;
+
+private final TimelineHashMap> elrMembers;

Review Comment:
Can we add a comment explaining what's mapped to what here?
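The disjointness concern above can be illustrated with a small sketch: if the partitions a broker appears in are gathered from both an ISR index and an ELR index, a partition listed in both yields two entries (and so two change records) unless the results are de-duplicated. The partition names and the `DisjointIndexSketch` helpers are made up for illustration; they are not Kafka's iterator classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class DisjointIndexSketch {
    // Concatenates per-broker partition lists from two indexes, as chaining two iterators would.
    static List<String> chained(List<String> fromIsrIndex, List<String> fromElrIndex) {
        List<String> all = new ArrayList<>(fromIsrIndex);
        all.addAll(fromElrIndex);
        return all;
    }

    // Removes duplicates while preserving first-seen order.
    static Set<String> deduplicated(List<String> fromIsrIndex, List<String> fromElrIndex) {
        return new LinkedHashSet<>(chained(fromIsrIndex, fromElrIndex));
    }

    public static void main(String[] args) {
        List<String> isrPartitions = List.of("foo-0", "foo-1");
        List<String> elrPartitions = List.of("foo-1", "bar-0"); // foo-1 overlaps: not disjoint
        System.out.println(chained(isrPartitions, elrPartitions));      // [foo-0, foo-1, foo-1, bar-0]
        System.out.println(deduplicated(isrPartitions, elrPartitions)); // [foo-0, foo-1, bar-0]
    }
}
```

Keeping ISR and ELR disjoint makes the plain chained form safe without the extra de-duplication pass.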
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1399579994

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##
@@ -327,10 +333,12 @@ public ControllerResult registerBroker(
", but got cluster ID " + request.clusterId());
}
int brokerId = request.brokerId();
+List records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
-if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
Actually, the server may receive old registration requests due to poor network conditions or during rolling upgrades. I think we should also treat old-version requests as unclean shutdowns.

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##
@@ -327,10 +333,12 @@ public ControllerResult registerBroker(
", but got cluster ID " + request.clusterId());
}
int brokerId = request.brokerId();
+List records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
-if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
Updated.
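The detection rule discussed here treats a registration as following an unclean shutdown if the request version is below 2 (so, as argued above, it cannot demonstrate a clean shutdown), if the controller has no existing registration for the broker, or if the previous epoch the broker reports does not match the registered one. A hedged sketch over plain values; `isUncleanShutdown` and its parameters are hypothetical names, and a `null` `existingEpoch` stands in for "no existing registration".

```java
final class UncleanShutdownDetection {
    /**
     * Returns true if a broker registration should be treated as following an
     * unclean shutdown. existingEpoch is null when no registration exists.
     */
    static boolean isUncleanShutdown(int requestVersion, Long existingEpoch, long previousBrokerEpoch) {
        return requestVersion < 2
            || existingEpoch == null
            || previousBrokerEpoch != existingEpoch; // unboxed only when non-null
    }

    public static void main(String[] args) {
        System.out.println(isUncleanShutdown(1, 10L, 10L)); // true: old request version
        System.out.println(isUncleanShutdown(2, null, 10L)); // true: no existing registration
        System.out.println(isUncleanShutdown(2, 10L, 9L));  // true: epoch mismatch
        System.out.println(isUncleanShutdown(2, 10L, 10L)); // false: clean, epochs match
    }
}
```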
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1399575912

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -1294,8 +1315,11 @@ void handleBrokerFenced(int brokerId, List records) {
*/
void handleBrokerUnregistered(int brokerId, long brokerEpoch, List records) {
-generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, records,
-brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, NO_LEADER, records,
+new PartitionsOnReplicaIteratorChain(Arrays.asList(

Review Comment:
Removed.

## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ##
@@ -466,6 +466,10 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
partition.lastKnownElr[0] != partition.leader)) {
// Only update the last known leader when the first time the partition becomes leaderless.
record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((record.leader() != NO_LEADER && record.leader() != NO_LEADER_CHANGE || partition.leader != NO_LEADER)

Review Comment:
Done.
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
mumrah commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1399468190 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -327,10 +333,12 @@ public ControllerResult registerBroker( ", but got cluster ID " + request.clusterId()); } int brokerId = request.brokerId(); +List records = new ArrayList<>(); BrokerRegistration existing = brokerRegistrations.get(brokerId); -if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) { -// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown. -log.debug("Received an unclean shutdown request"); +if (version >= 2 +&& (existing == null || request.previousBrokerEpoch() != existing.epoch()) +&& replicationControlManager != null) { Review Comment: ClusterControlManager#Builder should enforce that dependencies are satisfied, not the runtime code. ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -466,6 +466,10 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) { partition.lastKnownElr[0] != partition.leader)) { // Only update the last known leader when the first time the partition becomes leaderless. record.setLastKnownELR(Arrays.asList(partition.leader)); +} else if ((record.leader() != NO_LEADER && record.leader() != NO_LEADER_CHANGE || partition.leader != NO_LEADER) Review Comment: Can we make the first part of this into a static method `recordHasLeader(PartitionChangeRecord record)`?
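One way to follow the Builder suggestion is to validate required dependencies once, in the builder's `build()` method, so the runtime path (here, the `replicationControlManager != null` check in `registerBroker`) can assume they are present. A minimal sketch with hypothetical stand-in types (`ReplicationControl`, `ClusterControl`, `Builder`); it is not Kafka's actual `ClusterControlManager.Builder`, and it assumes the dependency is available when the builder runs.

```java
import java.util.Objects;

public final class BuilderValidationSketch {
    // Hypothetical stand-in for ReplicationControlManager.
    interface ReplicationControl { }

    // Hypothetical stand-in for ClusterControlManager.
    static final class ClusterControl {
        private final ReplicationControl replicationControl;

        private ClusterControl(ReplicationControl replicationControl) {
            this.replicationControl = replicationControl;
        }

        // Runtime methods can use the dependency directly: build() guarantees it is non-null.
        ReplicationControl replicationControl() {
            return replicationControl;
        }
    }

    static final class Builder {
        private ReplicationControl replicationControl = null;

        Builder setReplicationControl(ReplicationControl replicationControl) {
            this.replicationControl = replicationControl;
            return this;
        }

        ClusterControl build() {
            // Fail fast at construction time instead of null-checking on every request.
            Objects.requireNonNull(replicationControl, "replicationControl must be set");
            return new ClusterControl(replicationControl);
        }
    }

    public static void main(String[] args) {
        ReplicationControl dependency = new ReplicationControl() { };
        ClusterControl built = new Builder().setReplicationControl(dependency).build();
        System.out.println(built.replicationControl() == dependency); // true
        try {
            new Builder().build();
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // replicationControl must be set
        }
    }
}
```

A misconfigured controller then fails at startup with a clear message, instead of silently skipping the unclean-shutdown handling at request time.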
## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -1294,8 +1315,11 @@ void handleBrokerFenced(int brokerId, List records) { */ void handleBrokerUnregistered(int brokerId, long brokerEpoch, List records) { -generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, records, -brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); +generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, NO_LEADER, records, +new PartitionsOnReplicaIteratorChain(Arrays.asList( Review Comment: If it's not needed for correctness, then yes, let's remove the chained iterator; it's a bit confusing. ## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; Review Comment: I guess the alternative would be to extend BrokersToIsrs to include an ELR_FLAG similar to the LEADER_FLAG. I agree this would probably be complex to implement. If I recall from the KIP, the ELR is normally empty, so maybe having this parallel data structure isn't so bad (since it will normally not be populated)?
@cmccabe since you worked on `BrokersToIsrs` originally, do you have any opinion here? ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -327,10 +333,12 @@ public ControllerResult registerBroker( ", but got cluster ID " + request.clusterId()); } int brokerId = request.brokerId(); +List records = new ArrayList<>(); BrokerRegistration existing = brokerRegistrations.get(brokerId); -if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) { -// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown. -log.debug("Received an unclean shutdown request"); Review Comment: For older BrokerRegistration versions, don't we still want to log if there was an unclean shutdown request?
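The trade-off discussed here can be illustrated with two parallel indices. mumrah's point is that a separate structure is cheap because the ELR is normally empty. CalvinConfluent's reason for keeping `BrokersToElrs` separate is a behavioral difference: fencing removes a broker only from ISRs, while an unclean shutdown removes it from both ISRs and ELRs. A simplified sketch: the `brokerId -> "topic:partition"` sets and all method names are assumptions for readability, not the real layout or API of `BrokersToIsrs` or `BrokersToElrs`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class ParallelIndexSketch {
    // Simplified indices: brokerId -> set of "topic:partition" keys.
    // Assumption for readability; not the real BrokersToIsrs/BrokersToElrs layout.
    private final Map<Integer, Set<String>> isrIndex = new HashMap<>();
    private final Map<Integer, Set<String>> elrIndex = new HashMap<>();

    void addToIsr(int brokerId, String partition) {
        isrIndex.computeIfAbsent(brokerId, k -> new HashSet<>()).add(partition);
    }

    void addToElr(int brokerId, String partition) {
        elrIndex.computeIfAbsent(brokerId, k -> new HashSet<>()).add(partition);
    }

    // Fencing only affects ISR membership, so only the ISR index is consulted.
    Set<String> partitionsAffectedByFencing(int brokerId) {
        return new HashSet<>(isrIndex.getOrDefault(brokerId, Collections.emptySet()));
    }

    // An unclean shutdown removes the broker from both ISRs and ELRs.
    Set<String> partitionsAffectedByUncleanShutdown(int brokerId) {
        Set<String> affected = new HashSet<>(isrIndex.getOrDefault(brokerId, Collections.emptySet()));
        affected.addAll(elrIndex.getOrDefault(brokerId, Collections.emptySet()));
        return affected;
    }

    public static void main(String[] args) {
        ParallelIndexSketch index = new ParallelIndexSketch();
        index.addToIsr(1, "foo:0");
        index.addToElr(1, "foo:1");
        System.out.println(index.partitionsAffectedByFencing(1));                // [foo:0]
        System.out.println(index.partitionsAffectedByUncleanShutdown(1).size()); // 2
    }
}
```

When the ELR is empty, `elrIndex` holds nothing, so the extra structure mostly costs an empty map lookup.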
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1384373047 ## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; Review Comment: To help review this file, you can refer to BrokersToIsrs.java. The reasons not to modify BrokersToIsrs are: 1. BrokersToIsrs is complicated and hard to extend. 2. You may think we could simply let BrokersToIsrs treat the ISR and ELR the same, storing them together. However, the ISR and ELR do not behave exactly the same (fencing a broker only removes it from ISRs). ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -466,6 +466,9 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) { partition.lastKnownElr[0] != partition.leader)) { // Only update the last known leader when the first time the partition becomes leaderless.
record.setLastKnownELR(Arrays.asList(partition.leader)); +} else if (record.leader() != NO_LEADER && record.leader() != NO_LEADER_CHANGE && partition.leader == NO_LEADER) { Review Comment: Refactor the last known leader handling a bit. ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -490,6 +493,10 @@ private void maybeUpdateRecordElr(PartitionChangeRecord record) { targetLastKnownElr = Replicas.toList(partition.lastKnownElr); } +// If the last known ELR is expected to store the last known leader, the lastKnownElr field should be updated +// later in maybeUpdateLastKnownLeader. +if (useLastKnownLeaderInBalancedRecovery) return; Review Comment: Refactor the last known leader handling a bit. ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -1294,8 +1315,11 @@ void handleBrokerFenced(int brokerId, List records) { */ void handleBrokerUnregistered(int brokerId, long brokerEpoch, List records) { -generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, records, -brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); +generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, NO_LEADER, records, +new PartitionsOnReplicaIteratorChain(Arrays.asList( Review Comment: We can avoid using the iterator chain by calling generateLeaderAndIsrUpdates twice. I am open to either approach.
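The two options for `handleBrokerUnregistered` (one call over a chained iterator versus two calls to `generateLeaderAndIsrUpdates`) visit the same partitions in the same order. A minimal sketch: `ChainedIterator` and `generateUpdates` are hypothetical stand-ins, not `PartitionsOnReplicaIteratorChain` or Kafka's `generateLeaderAndIsrUpdates`, and the partition names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public final class IteratorChainSketch {
    // Hypothetical generic chain: yields every element of each source iterator in order.
    static final class ChainedIterator<T> implements Iterator<T> {
        private final Iterator<Iterator<T>> sources;
        private Iterator<T> current = null;

        ChainedIterator(List<Iterator<T>> sources) {
            this.sources = sources.iterator();
        }

        @Override
        public boolean hasNext() {
            // Skip exhausted (or empty) sources until one has elements left.
            while ((current == null || !current.hasNext()) && sources.hasNext()) {
                current = sources.next();
            }
            return current != null && current.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            return current.next();
        }
    }

    // Stand-in for generateLeaderAndIsrUpdates: appends one "record" per partition visited.
    static void generateUpdates(String context, Iterator<String> partitions, List<String> records) {
        while (partitions.hasNext()) {
            records.add(context + ":" + partitions.next());
        }
    }

    public static void main(String[] args) {
        List<String> isrPartitions = Arrays.asList("foo-0", "foo-1");
        List<String> elrPartitions = Arrays.asList("bar-0");

        // Option A: one call over a chained iterator.
        List<String> chained = new ArrayList<>();
        generateUpdates("unregister", new ChainedIterator<>(
            Arrays.asList(isrPartitions.iterator(), elrPartitions.iterator())), chained);

        // Option B: two calls, one per index; no chaining class needed.
        List<String> twoPasses = new ArrayList<>();
        generateUpdates("unregister", isrPartitions.iterator(), twoPasses);
        generateUpdates("unregister", elrPartitions.iterator(), twoPasses);

        System.out.println(chained.equals(twoPasses)); // true
    }
}
```

The sketch assumes no partition appears in both sources. If one did, either approach would visit it twice, so the choice comes down to readability, which is why calling the method twice avoids the extra iterator class.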