mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1399468190
##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -327,10 +333,12 @@ public ControllerResult<BrokerRegistrationReply>
registerBroker(
", but got cluster ID " + request.clusterId());
}
int brokerId = request.brokerId();
+ List<ApiMessageAndVersion> records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
- if (version < 2 || existing == null || request.previousBrokerEpoch()
!= existing.epoch()) {
- // TODO(KIP-966): Update the ELR if the broker has an unclean
shutdown.
- log.debug("Received an unclean shutdown request");
+ if (version >= 2
+ && (existing == null || request.previousBrokerEpoch() !=
existing.epoch())
+ && replicationControlManager != null) {
Review Comment:
ClusterControlManager#Builder should enforce that its dependencies are
satisfied; the runtime code should not need this null check.
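
A rough sketch of the idea, using simplified stand-in classes rather than the
real Kafka ones. The names `ClusterControlSketch` and `ReplicationControlStub`
are hypothetical, and the use of `Objects.requireNonNull` is an assumption
about style, not what the existing builder does:

```java
import java.util.Objects;

// Hypothetical stand-in for ReplicationControlManager.
final class ReplicationControlStub { }

// Hypothetical stand-in for ClusterControlManager.
final class ClusterControlSketch {
    private final ReplicationControlStub replicationControl;

    private ClusterControlSketch(ReplicationControlStub replicationControl) {
        this.replicationControl = replicationControl;
    }

    ReplicationControlStub replicationControl() {
        return replicationControl;
    }

    static final class Builder {
        private ReplicationControlStub replicationControl;

        Builder setReplicationControl(ReplicationControlStub replicationControl) {
            this.replicationControl = replicationControl;
            return this;
        }

        ClusterControlSketch build() {
            // Fail fast at construction time so registerBroker-style code
            // never has to null-check the dependency.
            Objects.requireNonNull(replicationControl,
                "replicationControl must be set before build()");
            return new ClusterControlSketch(replicationControl);
        }
    }
}
```

With the check in `build()`, the `&& replicationControlManager != null` clause
in the condition could be dropped.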
##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -466,6 +466,10 @@ private void
maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
partition.lastKnownElr[0] != partition.leader)) {
// Only update the last known leader when the first time the
partition becomes leaderless.
record.setLastKnownELR(Arrays.asList(partition.leader));
+ } else if ((record.leader() != NO_LEADER && record.leader() !=
NO_LEADER_CHANGE || partition.leader != NO_LEADER)
Review Comment:
Can we make the first part of this condition into a static method
`recordHasLeader(PartitionChangeRecord record)`?
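
Something along these lines, sketched against a stand-in record type so it
compiles on its own. `ChangeRecordStub` is hypothetical, and the constant
values below are assumptions for illustration; the real code would use the
existing `NO_LEADER` and `NO_LEADER_CHANGE` constants:

```java
final class LeaderCheckSketch {
    // Assumed values, for illustration only.
    static final int NO_LEADER = -1;
    static final int NO_LEADER_CHANGE = -2;

    // Hypothetical stand-in for PartitionChangeRecord.
    static final class ChangeRecordStub {
        private final int leader;

        ChangeRecordStub(int leader) {
            this.leader = leader;
        }

        int leader() {
            return leader;
        }
    }

    // True only when the record explicitly names a new leader.
    static boolean recordHasLeader(ChangeRecordStub record) {
        return record.leader() != NO_LEADER
            && record.leader() != NO_LEADER_CHANGE;
    }
}
```

The `else if` would then read `recordHasLeader(record) || partition.leader != NO_LEADER`,
which also makes the `&&` / `||` precedence explicit.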
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1294,8 +1315,11 @@ void handleBrokerFenced(int brokerId,
List<ApiMessageAndVersion> records) {
*/
void handleBrokerUnregistered(int brokerId, long brokerEpoch,
List<ApiMessageAndVersion> records) {
- generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId,
NO_LEADER, records,
- brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+ generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId,
NO_LEADER, NO_LEADER, records,
+ new PartitionsOnReplicaIteratorChain(Arrays.asList(
Review Comment:
If it's not needed for correctness, then yes, let's remove the chained
iterator; it's a bit confusing.
##########
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
Review Comment:
I guess the alternative would be to extend `BrokersToIsrs` to include an
ELR_FLAG similar to the LEADER_FLAG. I agree this would probably be complex to
implement. If I recall from the KIP, the ELR is normally empty, so maybe having
this parallel data structure isn't so bad (since it will normally not be
populated)?
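
For comparison, the flag-bit alternative might look roughly like the sketch
below. This assumes, from memory, that LEADER_FLAG is the high bit of the
encoded partition index; the ELR_FLAG value, the mask, and the class name are
all hypothetical:

```java
final class ReplicaFlagsSketch {
    // Assumed to match the existing high-bit leader marker.
    static final int LEADER_FLAG = 0x8000_0000;
    // Hypothetical second flag bit for ELR membership.
    static final int ELR_FLAG = 0x4000_0000;
    // Remaining low 30 bits carry the partition index.
    static final int PARTITION_MASK = ~(LEADER_FLAG | ELR_FLAG);

    static int encode(int partitionId, boolean leader, boolean inElr) {
        int encoded = partitionId & PARTITION_MASK;
        if (leader) {
            encoded |= LEADER_FLAG;
        }
        if (inElr) {
            encoded |= ELR_FLAG;
        }
        return encoded;
    }

    static int partitionId(int encoded) {
        return encoded & PARTITION_MASK;
    }

    static boolean isLeader(int encoded) {
        return (encoded & LEADER_FLAG) != 0;
    }

    static boolean isInElr(int encoded) {
        return (encoded & ELR_FLAG) != 0;
    }
}
```

Note that this halves the range of representable partition indexes and touches
every place that masks the flag off, which is part of why the separate
structure may be the simpler choice.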
@cmccabe since you worked on `BrokersToIsrs` originally, do you have any
opinion here?
##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -327,10 +333,12 @@ public ControllerResult<BrokerRegistrationReply>
registerBroker(
", but got cluster ID " + request.clusterId());
}
int brokerId = request.brokerId();
+ List<ApiMessageAndVersion> records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
- if (version < 2 || existing == null || request.previousBrokerEpoch()
!= existing.epoch()) {
- // TODO(KIP-966): Update the ELR if the broker has an unclean
shutdown.
- log.debug("Received an unclean shutdown request");
Review Comment:
For older BrokerRegistration versions, don't we still want to log if there
was an unclean shutdown request?
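
One way to keep the log line while gating the ELR update, sketched with plain
values instead of the real request and registration types. The class and
method names are hypothetical, and `existingEpoch == null` stands in for
`existing == null`:

```java
final class UncleanShutdownCheckSketch {
    // Mirrors the removed condition: this is when the old code logged.
    static boolean looksUnclean(short version, Long existingEpoch, long previousBrokerEpoch) {
        return version < 2
            || existingEpoch == null
            || previousBrokerEpoch != existingEpoch;
    }

    // Mirrors the new condition: only version >= 2 requests can drive the ELR update.
    static boolean shouldUpdateElr(short version, Long existingEpoch, long previousBrokerEpoch) {
        return version >= 2
            && (existingEpoch == null || previousBrokerEpoch != existingEpoch);
    }
}
```

The caller could then log whenever `looksUnclean(...)` is true and generate
records only when `shouldUpdateElr(...)` is also true.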