cmccabe commented on a change in pull request #10564: URL: https://github.com/apache/kafka/pull/10564#discussion_r617507396
########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -1119,6 +1053,62 @@ void validateManualPartitionAssignment(List<Integer> assignment, } } + void generateLeaderAndIsrUpdates(String context, + int brokerToRemoveFromIsr, + List<ApiMessageAndVersion> records, + Iterator<TopicIdPartition> iterator) { + int oldSize = records.size(); + while (iterator.hasNext()) { + TopicIdPartition topicIdPart = iterator.next(); + TopicControlInfo topic = topics.get(topicIdPart.topicId()); + if (topic == null) { + throw new RuntimeException("Topic ID " + topicIdPart.topicId() + " existed in " + + "isrMembers, but not in the topics map."); + } + PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId()); + if (partition == null) { + throw new RuntimeException("Partition " + topicIdPart + + " existed in isrMembers, but not in the partitions map."); + } + int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemoveFromIsr); + int newLeader = Replicas.contains(newIsr, partition.leader) ? partition.leader : + bestLeader(partition.replicas, newIsr, false); + boolean unclean = newLeader != NO_LEADER && !Replicas.contains(newIsr, newLeader); + if (unclean) { + // After an unclean leader election, the ISR is reset to just the new leader. + newIsr = new int[] {newLeader}; + } else if (newIsr.length == 0) { + // We never want to shrink the ISR to size 0. Review comment: I agree that if `newIsr.length` is 0 on line 1080, then `newLeader` must be `NO_LEADER`. However, it's not necessary to add any special logic for this since `bestLeader` already handles this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org