Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-04-04 Thread via GitHub


mumrah merged PR #14706:
URL: https://github.com/apache/kafka/pull/14706


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-03-18 Thread via GitHub


CalvinConfluent commented on PR #14706:
URL: https://github.com/apache/kafka/pull/14706#issuecomment-2005099068

   The test failures are unrelated to this PR.





Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-03-15 Thread via GitHub


mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1526384089


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1369,7 +1384,23 @@ void handleBrokerInControlledShutdown(int brokerId, long 
brokerEpoch, List Create partition change records to remove replicas from any ISR or ELR for 
brokers doing unclean shutdown.
   



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -582,6 +586,10 @@ public void replay(RemoveTopicRecord record) {
 updatePartitionDirectories(topic.id, partitionId, 
partition.directories, null);
 }
 
+for (int i = 0; i < partition.elr.length; i++) {

Review Comment:
   nit: use a descriptive name instead of `i`






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1520116151


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1837,8 +1872,13 @@ void generateLeaderAndIsrUpdates(String context,
 // from the target ISR, but we need to exclude it here too, to handle 
the case
 // where there is an unclean leader election which chooses a leader 
from outside
 // the ISR.
+//
+// If the caller passed a valid broker ID for 
brokerWithUncleanShutdown, rather than
+// passing NO_LEADER, this node should not be an acceptable leader. We 
also exclude
+// brokerWithUncleanShutdown from ELR and ISR.
 IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || 
clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
+&& (r == brokerToAdd || clusterControl.isActive(r));

Review Comment:
   I disabled `handleUncleanShutdown` when ELR is not enabled.






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1520115442


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
 }
 }
 
+private void updatePartitionInfo(
+Uuid topicId,
+Integer partitionId,
+PartitionRegistration prevPartInfo,
+PartitionRegistration newPartInfo
+) {
+HashSet<Integer> validationSet = new HashSet<>();
+Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));
+if (validationSet.size() != newPartInfo.isr.length + 
newPartInfo.elr.length) {
+log.warn("{}-{} has overlapping ISR={} and ELR={}", 
topics.get(topicId).name, partitionId,
+Arrays.toString(newPartInfo.isr), partitionId, 
Arrays.toString(newPartInfo.elr));

Review Comment:
   Correct.






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1520115750


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -336,10 +350,10 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
 if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
   Good catch.






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-03-11 Thread via GitHub


mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1519830343


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
 }
 }
 
+private void updatePartitionInfo(
+Uuid topicId,
+Integer partitionId,
+PartitionRegistration prevPartInfo,
+PartitionRegistration newPartInfo
+) {
+HashSet<Integer> validationSet = new HashSet<>();
+Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));
+if (validationSet.size() != newPartInfo.isr.length + 
newPartInfo.elr.length) {
+log.warn("{}-{} has overlapping ISR={} and ELR={}", 
topics.get(topicId).name, partitionId,
+Arrays.toString(newPartInfo.isr), partitionId, 
Arrays.toString(newPartInfo.elr));

Review Comment:
   This can only happen if we have a bug where the ELR and ISR are allowed to 
overlap, right? 
   
   Since this is part of `replay`, we shouldn't throw here (the record has 
already been committed), but perhaps an ERROR is better than a WARN.
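
   A minimal sketch of the point above, not the PR's code: find the broker IDs 
that appear in both sets and report them at ERROR level instead of throwing. 
The class and method names are hypothetical, and `System.err` stands in for 
the controller's logger.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

class IsrElrOverlapSketch {
    // Returns the broker IDs present in both the ISR and the ELR (sorted).
    static Set<Integer> overlap(int[] isr, int[] elr) {
        Set<Integer> inIsr = new HashSet<>();
        for (int id : isr) {
            inIsr.add(id);
        }
        Set<Integer> both = new TreeSet<>();
        for (int id : elr) {
            if (inIsr.contains(id)) {
                both.add(id);
            }
        }
        return both;
    }

    public static void main(String[] args) {
        int[] isr = {1, 2};
        int[] elr = {2, 3};
        Set<Integer> both = overlap(isr, elr);
        if (!both.isEmpty()) {
            // Replay path: the record is already committed, so report and continue.
            System.err.println("ERROR: ISR=" + Arrays.toString(isr)
                + " and ELR=" + Arrays.toString(elr) + " overlap on " + both);
        }
    }
}
```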



##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -336,10 +350,10 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
 if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
   I think we inadvertently lost this log message.



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
 }
 }
 
+private void updatePartitionInfo(
+Uuid topicId,
+Integer partitionId,
+PartitionRegistration prevPartInfo,
+PartitionRegistration newPartInfo
+) {
+HashSet<Integer> validationSet = new HashSet<>();
+Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));

Review Comment:
   nit: I think you can use `forEach(validationSet::add)` here
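
   For illustration, a small standalone sketch (hypothetical class name) showing 
that the lambda and the method reference fill the set identically. 
`Arrays.stream(int[])` yields an `IntStream`, and `set::add` is accepted as an 
`IntConsumer` because the `int` is boxed and the `boolean` result is discarded.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class MethodRefSketch {
    // Form used in the diff: an explicit lambda per element.
    static Set<Integer> withLambda(int[] isr, int[] elr) {
        Set<Integer> set = new HashSet<>();
        Arrays.stream(isr).forEach(ii -> set.add(ii));
        Arrays.stream(elr).forEach(ii -> set.add(ii));
        return set;
    }

    // Form suggested in the review: a bound method reference.
    static Set<Integer> withMethodRef(int[] isr, int[] elr) {
        Set<Integer> set = new HashSet<>();
        Arrays.stream(isr).forEach(set::add);
        Arrays.stream(elr).forEach(set::add);
        return set;
    }

    public static void main(String[] args) {
        int[] isr = {1, 2};
        int[] elr = {3};
        System.out.println(withLambda(isr, elr).equals(withMethodRef(isr, elr)));
    }
}
```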



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1815,12 +1845,17 @@ void validateManualPartitionAssignment(
  *  broker to remove from the ISR and leadership, 
otherwise.
  * @param brokerToAdd   NO_LEADER if no broker is being added; the ID 
of the
  *  broker which is now eligible to be a leader, 
otherwise.
+ * @param brokerWithUncleanShutdown

Review Comment:
   nit: update the main description of this method to mention ISR and ELR



##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -780,4 +793,9 @@ public Entry> next() {
 }
 };
 }
+
+@FunctionalInterface
+interface BrokerUncleanShutdownHandler {
+void apply(int brokerId, List records);

Review Comment:
   nit: since we're defining an interface, we can use a more descriptive name 
than "apply" for the method. Maybe "addRecordsForShutdown" or something.
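
   A sketch of what the rename could look like. This is an assumption-laden 
illustration rather than the merged code: `addRecordsForShutdown` is only the 
name floated above, `String` stands in for the controller's record type, and 
`register` is a hypothetical caller.

```java
import java.util.ArrayList;
import java.util.List;

class HandlerNamingSketch {
    @FunctionalInterface
    interface BrokerUncleanShutdownHandler {
        // A descriptive name tells the reader what the callback contributes.
        void addRecordsForShutdown(int brokerId, List<String> records);
    }

    // Hypothetical caller: lets the handler append its records before the
    // registration record when the shutdown was unclean.
    static List<String> register(int brokerId, boolean unclean,
                                 BrokerUncleanShutdownHandler handler) {
        List<String> records = new ArrayList<>();
        if (unclean) {
            handler.addRecordsForShutdown(brokerId, records);
        }
        records.add("register broker " + brokerId);
        return records;
    }

    public static void main(String[] args) {
        BrokerUncleanShutdownHandler handler =
            (id, records) -> records.add("remove broker " + id + " from ISR/ELR");
        System.out.println(register(1, true, handler));
        System.out.println(register(2, false, handler));
    }
}
```

   Lambdas still work with the renamed method, so call sites that pass a lambda 
or method reference are unaffected by the rename.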



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1837,8 +1872,13 @@ void generateLeaderAndIsrUpdates(String context,
 // from the target ISR, but we need to exclude it here too, to handle 
the case
 // where there is an unclean leader election which chooses a leader 
from outside
 // the ISR.
+//
+// If the caller passed a valid broker ID for 
brokerWithUncleanShutdown, rather than
+// passing NO_LEADER, this node should not be an acceptable leader. We 
also exclude
+// brokerWithUncleanShutdown from ELR and ISR.
 IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || 
clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
+&& (r == brokerToAdd || clusterControl.isActive(r));

Review Comment:
   Since our guards around ELR (whether it is enabled or not) are in 
PartitionChangeBuilder, we need to make sure this logic is correct when ELR is 
not enabled due to the metadata version (MV). 
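
   To make the concern concrete, here is a standalone sketch of the predicate 
from the hunk above. It assumes `NO_LEADER` is the sentinel -1 and replaces 
`clusterControl.isActive(r)` with a plain set lookup. When the caller passes 
`NO_LEADER` for `brokerWithUncleanShutdown`, the extra clause never excludes 
a real broker, so the predicate behaves as it did before the change.

```java
import java.util.Set;
import java.util.function.IntPredicate;

class AcceptableLeaderSketch {
    static final int NO_LEADER = -1; // assumed sentinel value

    static IntPredicate isAcceptableLeader(int brokerToRemove,
                                           int brokerToAdd,
                                           int brokerWithUncleanShutdown,
                                           Set<Integer> activeBrokers) {
        // activeBrokers is a stand-in for clusterControl.isActive(r).
        return r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
            && (r == brokerToAdd || activeBrokers.contains(r));
    }

    public static void main(String[] args) {
        Set<Integer> active = Set.of(1, 2, 3);
        // No unclean shutdown reported: broker 2 stays eligible.
        System.out.println(
            isAcceptableLeader(NO_LEADER, NO_LEADER, NO_LEADER, active).test(2));
        // Broker 2 restarted after an unclean shutdown: it is excluded.
        System.out.println(
            isAcceptableLeader(NO_LEADER, NO_LEADER, 2, active).test(2));
    }
}
```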




Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-02-14 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1489894351


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -336,10 +342,14 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
 if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");
+if (handleBrokerUncleanShutdownHelper == null) {
+log.warn("No handleBrokerUncleanShutdownHelper provided");

Review Comment:
   Thanks for the advice! Updated.






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-02-14 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1489894070


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -780,4 +789,9 @@ public Entry> next() {
 }
 };
 }
+
+@FunctionalInterface
+interface HandleBrokerUncleanShutdownHelper {

Review Comment:
   Done






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-02-14 Thread via GitHub


mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1489685922


##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.metadata.Replicas.NONE;
+
+public class BrokersToElrs {
+private final SnapshotRegistry snapshotRegistry;
+
+// It maps from the broker id to the topic id partitions if the partition 
has ELR.
+private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> 
elrMembers;
+
+BrokersToElrs(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+this.elrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+
+/**
+ * Update our records of a partition's ELR.
+ *
+ * @param topicId   The topic ID of the partition.
+ * @param partitionId   The partition ID of the partition.
+ * @param prevElr   The previous ELR, or null if the partition is new.
+ * @param nextElr   The new ELR, or null if the partition is being 
removed.
+ */
+
+void update(Uuid topicId, int partitionId, int[] prevElr, int[] nextElr) {
+int[] prev;
+if (prevElr == null) {
+prev = NONE;
+} else {
+prev = Replicas.clone(prevElr);
+Arrays.sort(prev);
+}
+int[] next;
+if (nextElr == null) {
+next = NONE;
+} else {
+next = Replicas.clone(nextElr);
+Arrays.sort(next);
+}
+
+int i = 0, j = 0;
+while (true) {
+if (i == prev.length) {
+if (j == next.length) {
+break;
+}
+int newReplica = next[j];
+add(newReplica, topicId, partitionId);
+j++;
+} else if (j == next.length) {
+int prevReplica = prev[i];
+remove(prevReplica, topicId, partitionId);
+i++;
+} else {
+int prevReplica = prev[i];
+int newReplica = next[j];
+if (prevReplica < newReplica) {
+remove(prevReplica, topicId, partitionId);
+i++;
+} else if (prevReplica > newReplica) {
+add(newReplica, topicId, partitionId);
+j++;
+} else {
+i++;
+j++;
+}
+}
+}
+}
+
+void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
+Map<Uuid, int[]> topicMap = elrMembers.get(brokerId);
+if (topicMap != null) {
+topicMap.remove(topicId);
+}
+}
+
+private void add(int brokerId, Uuid topicId, int newPartition) {
+TimelineHashMap<Uuid, int[]> topicMap = elrMembers.get(brokerId);
+if (topicMap == null) {
+topicMap = new TimelineHashMap<>(snapshotRegistry, 0);
+elrMembers.put(brokerId, topicMap);
+}
+int[] partitions = topicMap.get(topicId);
+int[] newPartitions;
+if (partitions == null) {
+newPartitions = new int[1];
+} else {
+newPartitions = new int[partitions.length + 1];
+System.arraycopy(partitions, 0, newPartitions, 0, 
partitions.length);
+}
+newPartitions[newPartitions.length - 1] = newPartition;
+topicMap.put(topicId, newPartitions);
+}
+
+private void remove(int brokerId, Uuid topicId, int removedPartition) {
+TimelineHashMap<Uuid, int[]> topicMap = elrMembers.get(brokerId);
+if (topicMap == null) {
+throw new RuntimeException("Broker " + brokerId + " has no 
elrMembers " +
+"entry, so we can't remove " + topicId + ":" + 
removedPartition);
+}
+int[] partitions = topicMap.get(topicId);
+ 
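
   The `update` method quoted above walks two sorted arrays in step to find 
which brokers left and which joined the ELR. A compact standalone sketch of 
that walk follows. It is an illustration only: it returns the operations as 
strings instead of calling `add`/`remove`, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SortedDiffSketch {
    static List<String> diff(int[] prevElr, int[] nextElr) {
        // Null means "no previous/next ELR"; copy before sorting so the
        // caller's arrays are left untouched.
        int[] prev = prevElr == null ? new int[0] : prevElr.clone();
        int[] next = nextElr == null ? new int[0] : nextElr.clone();
        Arrays.sort(prev);
        Arrays.sort(next);

        List<String> ops = new ArrayList<>();
        int i = 0, j = 0;
        while (i < prev.length || j < next.length) {
            if (j == next.length || (i < prev.length && prev[i] < next[j])) {
                ops.add("remove " + prev[i++]);   // only in the previous ELR
            } else if (i == prev.length || prev[i] > next[j]) {
                ops.add("add " + next[j++]);      // only in the new ELR
            } else {
                i++;                              // in both: nothing to do
                j++;
            }
        }
        return ops;
    }

    public static void main(String[] args) {
        System.out.println(diff(new int[] {5, 1, 3}, new int[] {4, 3}));
    }
}
```

   Because both arrays are sorted first, each element is visited once and 
unchanged members produce no work.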

Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-28 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1408490077


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -466,6 +466,10 @@ private void 
maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
 partition.lastKnownElr[0] != partition.leader)) {
 // Only update the last known leader when the first time the 
partition becomes leaderless.
 record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((record.leader() > 0 || (partition.leader != NO_LEADER && 
record.leader() != NO_LEADER))

Review Comment:
   Done



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -528,6 +536,10 @@ private void maybePopulateTargetElr() {
 .collect(Collectors.toList());
 }
 
+static boolean recordHasLeader(PartitionChangeRecord record) {

Review Comment:
   Removed






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-21 Thread via GitHub


artemlivshits commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1401454282


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -528,6 +536,10 @@ private void maybePopulateTargetElr() {
 .collect(Collectors.toList());
 }
 
+static boolean recordHasLeader(PartitionChangeRecord record) {

Review Comment:
   Do we use this now?



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -466,6 +466,10 @@ private void 
maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
 partition.lastKnownElr[0] != partition.leader)) {
 // Only update the last known leader when the first time the 
partition becomes leaderless.
 record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((record.leader() > 0 || (partition.leader != NO_LEADER && 
record.leader() != NO_LEADER))

Review Comment:
   0 is a valid leader, so probably should use `record.leader() >= 0`.
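
   A tiny sketch of why the boundary matters, assuming (as the comment implies) 
that broker IDs start at 0 and that "no leader" is encoded as a negative 
sentinel. The class and method names are hypothetical.

```java
class LeaderCheckSketch {
    static final int NO_LEADER = -1; // assumed sentinel value

    // The check as first written: misses broker 0.
    static boolean hasLeaderStrict(int leader) {
        return leader > 0;
    }

    // The check suggested above: any non-negative ID is a real broker.
    static boolean hasLeader(int leader) {
        return leader >= 0;
    }

    public static void main(String[] args) {
        for (int leader : new int[] {NO_LEADER, 0, 1}) {
            System.out.println(leader + ": '> 0' says " + hasLeaderStrict(leader)
                + ", '>= 0' says " + hasLeader(leader));
        }
    }
}
```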






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400042116


##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;

Review Comment:
   Updated the comment in the ReplicationControlManager. 






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041537


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -327,10 +333,12 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
-if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");
+if (version >= 2
+&& (existing == null || request.previousBrokerEpoch() != 
existing.epoch())
+&& replicationControlManager != null) {

Review Comment:
   Done.






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041854


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1772,7 +1814,8 @@ void generateLeaderAndIsrUpdates(String context,
 // where there is an unclean leader election which chooses a leader 
from outside
 // the ISR.
 IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || 
clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)

Review Comment:
   Done.






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041780


##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.metadata.Replicas.NONE;
+
+public class BrokersToElrs {
+private final SnapshotRegistry snapshotRegistry;
+
+private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> 
elrMembers;

Review Comment:
   Done. It is broker id -> TopicIdPartitions
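
   A rough sketch of that shape, using plain `HashMap` in place of 
`TimelineHashMap` and `String` in place of `Uuid` so that it runs standalone. 
All names here are hypothetical stand-ins, not the PR's code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class ElrIndexSketch {
    // broker ID -> (topic ID -> partitions whose ELR contains that broker)
    private final Map<Integer, Map<String, int[]>> elrMembers = new HashMap<>();

    void add(int brokerId, String topicId, int partition) {
        Map<String, int[]> topicMap =
            elrMembers.computeIfAbsent(brokerId, id -> new HashMap<>());
        int[] old = topicMap.get(topicId);
        // Append by copying into a new array, leaving the old array unmodified.
        int[] updated = old == null ? new int[1] : Arrays.copyOf(old, old.length + 1);
        updated[updated.length - 1] = partition;
        topicMap.put(topicId, updated);
    }

    int[] partitions(int brokerId, String topicId) {
        Map<String, int[]> topicMap = elrMembers.get(brokerId);
        if (topicMap == null) {
            return new int[0];
        }
        int[] found = topicMap.get(topicId);
        return found == null ? new int[0] : found;
    }

    public static void main(String[] args) {
        ElrIndexSketch index = new ElrIndexSketch();
        index.add(1, "topicA", 0);
        index.add(1, "topicA", 3);
        System.out.println(Arrays.toString(index.partitions(1, "topicA")));
    }
}
```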






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400036881


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -466,6 +466,10 @@ private void 
maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
 partition.lastKnownElr[0] != partition.leader)) {
 // Only update the last known leader when the first time the 
partition becomes leaderless.
 record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((recordHasLeader(record) || partition.leader != NO_LEADER)

Review Comment:
   Good catch! To improve readability, I revised the condition to the following:
   `record.leader() > 0 || (partition.leader != NO_LEADER && record.leader() != 
NO_LEADER)`
   `(Will have a new leader) || (will not become leaderless)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


artemlivshits commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1399798771


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -466,6 +466,10 @@ private void 
maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
 partition.lastKnownElr[0] != partition.leader)) {
 // Only update the last known leader when the first time the 
partition becomes leaderless.
 record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((recordHasLeader(record) || partition.leader != NO_LEADER)

Review Comment:
   Can we have a case where `record.leader() == NO_LEADER && partition.leader 
!= NO_LEADER` (i.e. the partition currently has a valid leader but will not 
have one after this record)?
   
   Would a more proper condition be `(recordHasLeader(record) || 
(partition.leader != NO_LEADER && record.leader() == NO_LEADER_CHANGE))`?
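
   The difference between the two conditions can be checked with a short 
sketch. Assumptions: `NO_LEADER` is -1, `NO_LEADER_CHANGE` is -2, and 
`recordHasLeader` means "the record names a real broker"; only the part of 
the condition visible in the hunk is modelled.

```java
class LastKnownLeaderConditionSketch {
    static final int NO_LEADER = -1;        // assumed sentinel value
    static final int NO_LEADER_CHANGE = -2; // assumed sentinel value

    // Hypothetical definition: the record assigns a concrete leader.
    static boolean recordHasLeader(int recordLeader) {
        return recordLeader >= 0;
    }

    // Condition as written in the diff.
    static boolean original(int partitionLeader, int recordLeader) {
        return recordHasLeader(recordLeader) || partitionLeader != NO_LEADER;
    }

    // Condition proposed in the review.
    static boolean proposed(int partitionLeader, int recordLeader) {
        return recordHasLeader(recordLeader)
            || (partitionLeader != NO_LEADER && recordLeader == NO_LEADER_CHANGE);
    }

    public static void main(String[] args) {
        // Partition led by broker 3; the record makes it leaderless.
        System.out.println(original(3, NO_LEADER)); // treated as "has a leader"
        System.out.println(proposed(3, NO_LEADER)); // treated as leaderless
    }
}
```

   The two forms agree in every other combination; they differ only when the 
record itself takes the leader away.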



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1772,7 +1814,8 @@ void generateLeaderAndIsrUpdates(String context,
 // where there is an unclean leader election which chooses a leader 
from outside
 // the ISR.
 IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || 
clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)

Review Comment:
   Can we update the comments to reflect the new logic?



##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;

Review Comment:
   It looks like, in order for this to work correctly, we need to make sure that ISR 
and ELR are always disjoint sets (otherwise we'd generate multiple records for 
the same partition). Can we add a comment about this?
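
   A minimal sketch of the failure mode, under the assumption that the 
partitions to update for a broker are produced by chaining "partitions where 
the broker is in the ISR" with "partitions where it is in the ELR". Partition 
numbers stand in for topic-partitions, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class ChainedPartitionsSketch {
    // Concatenates the two per-broker partition lists without de-duplicating,
    // which is only safe if the ISR and ELR never share a broker.
    static List<Integer> partitionsToUpdate(List<Integer> inIsr, List<Integer> inElr) {
        List<Integer> chained = new ArrayList<>(inIsr);
        chained.addAll(inElr);
        return chained;
    }

    public static void main(String[] args) {
        // Disjoint: each partition is visited once.
        System.out.println(partitionsToUpdate(List.of(0, 1), List.of(2)));
        // Overlap on partition 1: it is visited twice, so it would get two records.
        System.out.println(partitionsToUpdate(List.of(0, 1), List.of(1, 2)));
    }
}
```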



##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.metadata.Replicas.NONE;
+
+public class BrokersToElrs {
+private final SnapshotRegistry snapshotRegistry;
+
+private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> 
elrMembers;

Review Comment:
   Can we add a comment explaining what is mapped to what here?






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1399579994


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -327,10 +333,12 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
-if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
   Actually, the server may receive old registration requests due to poor 
network conditions or during rolling upgrades. I think we should also treat the 
old version requests as unclean shutdowns.
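
A minimal sketch of the check being discussed, assuming the condition from the removed line in the hunk above (`version < 2 || existing == null || previousBrokerEpoch != existing.epoch()`). The class and method names are hypothetical, and the existing registration is reduced to a nullable epoch for illustration; this is not the merged code.

```java
// Hypothetical stand-in for the controller-side check; not the actual Kafka code.
final class UncleanShutdownCheck {
    /**
     * @param version             BrokerRegistration request version
     * @param existingEpoch       epoch of the existing registration, or null if there is none
     * @param previousBrokerEpoch previous broker epoch carried in the request
     * @return true if the registration should be treated as following an unclean shutdown
     */
    static boolean isUncleanShutdown(int version, Long existingEpoch, long previousBrokerEpoch) {
        // Old request versions cannot carry the previous epoch, so treat them as unclean.
        if (version < 2) {
            return true;
        }
        // No prior registration to compare against.
        if (existingEpoch == null) {
            return true;
        }
        // A mismatch means the broker did not shut down cleanly under the known epoch.
        return previousBrokerEpoch != existingEpoch;
    }
}
```

With this shape, an old-version request is handled the same way as an epoch mismatch, which is the behavior proposed in the comment above.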



##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -327,10 +333,12 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
-if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
   Updated.






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1399575912


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1294,8 +1315,11 @@ void handleBrokerFenced(int brokerId, 
List records) {
  */
 void handleBrokerUnregistered(int brokerId, long brokerEpoch,
   List records) {
-generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, 
NO_LEADER, records,
-brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, 
NO_LEADER, NO_LEADER, records,
+new PartitionsOnReplicaIteratorChain(Arrays.asList(

Review Comment:
   Removed.



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -466,6 +466,10 @@ private void 
maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
 partition.lastKnownElr[0] != partition.leader)) {
 // Only update the last known leader when the first time the 
partition becomes leaderless.
 record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((record.leader() != NO_LEADER && record.leader() != 
NO_LEADER_CHANGE || partition.leader != NO_LEADER)

Review Comment:
   Done.






Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1399468190


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -327,10 +333,12 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
-if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");
+if (version >= 2
+&& (existing == null || request.previousBrokerEpoch() != 
existing.epoch())
+&& replicationControlManager != null) {

Review Comment:
   `ClusterControlManager#Builder` should enforce that dependencies are 
satisfied, rather than the runtime code.
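
A rough sketch of that pattern, using simplified hypothetical names (`ManagerSketch`, `ReplicationHooks`) rather than the real Kafka classes: the builder rejects a missing dependency once in `build()`, so the request path no longer needs a null check.

```java
// Hypothetical, simplified names; a sketch of the pattern, not the Kafka classes.
final class ManagerSketch {
    /** Stand-in for the dependency that must be supplied (assumed for illustration). */
    interface ReplicationHooks {
        void onUncleanShutdown(int brokerId);
    }

    private final ReplicationHooks hooks;

    private ManagerSketch(ReplicationHooks hooks) {
        this.hooks = hooks;
    }

    static final class Builder {
        private ReplicationHooks hooks;

        Builder setHooks(ReplicationHooks hooks) {
            this.hooks = hooks;
            return this;
        }

        ManagerSketch build() {
            // Fail fast at construction time instead of checking on every request.
            if (hooks == null) {
                throw new IllegalStateException("You must specify ReplicationHooks");
            }
            return new ManagerSketch(hooks);
        }
    }

    void registerBroker(int brokerId, boolean uncleanShutdown) {
        // No null check here: build() guarantees that hooks is set.
        if (uncleanShutdown) {
            hooks.onUncleanShutdown(brokerId);
        }
    }
}
```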



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -466,6 +466,10 @@ private void 
maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
 partition.lastKnownElr[0] != partition.leader)) {
 // Only update the last known leader when the first time the 
partition becomes leaderless.
 record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((record.leader() != NO_LEADER && record.leader() != 
NO_LEADER_CHANGE || partition.leader != NO_LEADER)

Review Comment:
   Can we make the first part of this into a static method 
`recordHasLeader(PartitionChangeRecord record)` ?
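
A minimal sketch of such a helper. `ChangeRecord` is a hypothetical stand-in for `PartitionChangeRecord`, and the two constant values are assumptions made so the example is self-contained.

```java
// Sketch only: ChangeRecord stands in for PartitionChangeRecord.
final class LeaderCheckSketch {
    // Assumed sentinel values, defined locally so the example compiles on its own.
    static final int NO_LEADER = -1;
    static final int NO_LEADER_CHANGE = -2;

    /** Hypothetical minimal record exposing only the leader field. */
    static final class ChangeRecord {
        private final int leader;

        ChangeRecord(int leader) {
            this.leader = leader;
        }

        int leader() {
            return leader;
        }
    }

    /** True if the record sets a concrete leader (neither "no leader" nor "no change"). */
    static boolean recordHasLeader(ChangeRecord record) {
        return record.leader() != NO_LEADER && record.leader() != NO_LEADER_CHANGE;
    }
}
```

The condition in the hunk above would then read `recordHasLeader(record) || partition.leader != NO_LEADER`, which also makes the operator precedence explicit.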



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1294,8 +1315,11 @@ void handleBrokerFenced(int brokerId, 
List records) {
  */
 void handleBrokerUnregistered(int brokerId, long brokerEpoch,
   List records) {
-generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, 
NO_LEADER, records,
-brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, 
NO_LEADER, NO_LEADER, records,
+new PartitionsOnReplicaIteratorChain(Arrays.asList(

Review Comment:
   If it's not needed for correctness, then yes, let's remove the chained 
iterator. It's a bit confusing.



##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;

Review Comment:
   I guess the alternative would be to extend BrokersToIsrs to include an 
ELR_FLAG similar to the LEADER_FLAG. I agree this would probably be complex to 
implement. If I recall correctly from the KIP, the ELR is normally empty, so 
maybe having this parallel data structure isn't so bad (since it will normally 
not be populated)?
   
   @cmccabe since you worked on `BrokersToIsrs` originally, do you have any 
opinion here?



##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -327,10 +333,12 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
-if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
   For older BrokerRegistration versions, don't we still want to log if there 
was an unclean shutdown request?




Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-06 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1384373047


##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;

Review Comment:
   To help review this file, you can refer to BrokersToIsrs.java.
   The reasons not to modify BrokersToIsrs are:
   1. BrokersToIsrs is complicated and hard to extend.
   2. You may think that we can simply let BrokersToIsrs treat the ISR and 
ELR the same, storing them together. However, the ISR and ELR do not behave 
exactly the same (fencing a broker only removes it from ISRs).
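
To illustrate the idea of a separate reverse index for the ELR, here is a simplified sketch. It is not the code in this PR: it uses plain `HashMap`/`HashSet` instead of the timeline collections and `java.util.UUID` instead of Kafka's `Uuid`, and the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Simplified sketch of a broker -> ELR-membership index; names are hypothetical.
final class ElrIndexSketch {
    // broker ID -> (topic ID -> partition IDs whose ELR contains that broker)
    private final Map<Integer, Map<UUID, Set<Integer>>> elrMembers = new HashMap<>();

    /** Replaces the ELR of one partition and keeps the reverse index in sync. */
    void update(UUID topicId, int partitionId, int[] prevElr, int[] nextElr) {
        for (int brokerId : prevElr) {
            Map<UUID, Set<Integer>> topics = elrMembers.get(brokerId);
            if (topics == null) continue;
            Set<Integer> partitions = topics.get(topicId);
            if (partitions == null) continue;
            partitions.remove(partitionId);
            // Drop empty containers so the index stays empty when no ELR exists.
            if (partitions.isEmpty()) topics.remove(topicId);
            if (topics.isEmpty()) elrMembers.remove(brokerId);
        }
        for (int brokerId : nextElr) {
            elrMembers.computeIfAbsent(brokerId, k -> new HashMap<>())
                .computeIfAbsent(topicId, k -> new HashSet<>())
                .add(partitionId);
        }
    }

    /** Partitions of the given topic that list the broker in their ELR. */
    Set<Integer> partitionsWithBrokerInElr(int brokerId, UUID topicId) {
        Map<UUID, Set<Integer>> topics = elrMembers.get(brokerId);
        if (topics == null) return Collections.emptySet();
        return topics.getOrDefault(topicId, Collections.emptySet());
    }

    boolean isEmpty() {
        return elrMembers.isEmpty();
    }
}
```

Because this index is separate from the ISR index, an operation such as fencing can update ISR membership without touching ELR membership.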



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -466,6 +466,9 @@ private void 
maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
 partition.lastKnownElr[0] != partition.leader)) {
 // Only update the last known leader when the first time the 
partition becomes leaderless.
 record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if (record.leader() != NO_LEADER && record.leader() != 
NO_LEADER_CHANGE && partition.leader == NO_LEADER) {

Review Comment:
   Refactored the last known leader handling a bit.



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -490,6 +493,10 @@ private void maybeUpdateRecordElr(PartitionChangeRecord 
record) {
 targetLastKnownElr = Replicas.toList(partition.lastKnownElr);
 }
 
+// If the last known ELR is expected to store the last known leader, 
the lastKnownElr field should be updated
+// later in maybeUpdateLastKnownLeader.
+if (useLastKnownLeaderInBalancedRecovery) return;

Review Comment:
   Refactored the last known leader handling a bit.



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1294,8 +1315,11 @@ void handleBrokerFenced(int brokerId, 
List records) {
  */
 void handleBrokerUnregistered(int brokerId, long brokerEpoch,
   List records) {
-generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, 
NO_LEADER, records,
-brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, 
NO_LEADER, NO_LEADER, records,
+new PartitionsOnReplicaIteratorChain(Arrays.asList(

Review Comment:
   We can avoid using the iterator chain by calling generateLeaderAndIsrUpdates 
twice. I am open to both ways.
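
A small sketch of the "call it twice" alternative. `generateUpdates` is a hypothetical stand-in for `generateLeaderAndIsrUpdates` that simply records which partitions it visited, and the sketch assumes the ISR and ELR iterators never yield the same partition for a given broker.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch only: strings stand in for partitions and for generated records.
final class TwoPassSketch {
    /** Stand-in for generateLeaderAndIsrUpdates: appends one entry per visited partition. */
    static void generateUpdates(String context, Iterator<String> partitions, List<String> records) {
        while (partitions.hasNext()) {
            records.add(context + ":" + partitions.next());
        }
    }

    /** Handles ISR members first, then ELR members, with no chained iterator. */
    static List<String> handleBrokerUnregistered(List<String> inIsr, List<String> inElr) {
        List<String> records = new ArrayList<>();
        generateUpdates("handleBrokerUnregistered", inIsr.iterator(), records);
        generateUpdates("handleBrokerUnregistered", inElr.iterator(), records);
        return records;
    }
}
```

Both passes append to the same `records` list, so the caller sees a single batch, just as it would with a chained iterator.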


