junrao commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r827394335
########## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ########## @@ -1197,6 +1257,25 @@ private void resetState() { */ private long newBytesSinceLastSnapshot = 0; + /** + * How long to delay partition leader balancing operations. + */ + private final long leaderImbalanceCheckIntervalNs; + + private static enum ImbalanceSchedule { Review comment: static seems redundant for inner enum? ########## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ########## @@ -147,6 +147,7 @@ private int defaultNumPartitions = 1; private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random()); private long snapshotMaxNewRecordBytes = Long.MAX_VALUE; + private long leaderImbalanceCheckIntervalNs = -1; Review comment: -1L? ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -162,14 +164,14 @@ public Uuid topicId() { private final int defaultNumPartitions; /** - * A count of the total number of partitions in the cluster. + * Maximum number of leader elections to perform during one partion leader balancing operation. Review comment: typo partion ########## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ########## @@ -861,6 +875,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) { // required because the active controller assumes that there is always an in-memory snapshot at the // last committed offset. snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); + + // When becoming the active controller, schedule a leader rebalance if there are any topic partition + // with leader that is not the preferred leader. + maybeScheduleNextBalancePartitionLeaders(); Review comment: Should we reset imbalancedScheduled here or on renounce()? 
########## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ########## @@ -953,6 +971,48 @@ private void cancelMaybeFenceReplicas() { queue.cancelDeferred(MAYBE_FENCE_REPLICAS); } + private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000; + + private void maybeScheduleNextBalancePartitionLeaders() { + final String maybeBalancePartitionLeaders = "maybeBalancePartitionLeaders"; + + if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED && + leaderImbalanceCheckIntervalNs >= 0 && + replicationControl.arePartitionLeadersImbalanced()) { + + log.debug( + "Scheduling deferred event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})", + maybeBalancePartitionLeaders, + imbalancedScheduled, + leaderImbalanceCheckIntervalNs, + replicationControl.arePartitionLeadersImbalanced() + ); + long delayNs = time.nanoseconds(); + if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) { + delayNs += leaderImbalanceCheckIntervalNs; + } + scheduleDeferredWriteEvent(maybeBalancePartitionLeaders, delayNs, () -> { + ControllerResult<Boolean> result = replicationControl.maybeBalancePartitionLeaders(); + + // rechedule the operation after the leaderImbalanceCheckIntervalNs interval. Review comment: typo rechedule ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -1016,6 +1042,55 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType, return ControllerResult.of(records, null); } + boolean arePartitionLeadersImbalanced() { + return !imbalancedPartitions.isEmpty(); + } + + /** + * Attempt to elect a preferred leader for all topic partitions that a leader that is not the preferred replica. + * + * The response() method in the return object is true if this method returned without electing all possible preferred replicas. + * The quorum controlller should reschedule this operation immediately if it is true. 
+ * + * @return All of the election records and if there may be more available preferred replicas to elect as leader + */ + ControllerResult<Boolean> maybeBalancePartitionLeaders() { + List<ApiMessageAndVersion> records = new ArrayList<>(); + + boolean rescheduleImmidiately = false; + for (TopicIdPartition topicPartition : imbalancedPartitions) { + if (records.size() >= maxElectionsPerImbalance) { + rescheduleImmidiately = true; + break; + } + + TopicControlInfo topic = topics.get(topicPartition.topicId()); + if (topic == null) { + log.error("Skipping unknown imbalanced topic {}", topicPartition); + continue; + } + + PartitionRegistration partition = topic.parts.get(topicPartition.partitionId()); + if (partition == null) { + log.error("Skipping unknown imbalanced partition {}", topicPartition); + continue; + } + + // Attempt to perform a preferred leader election + PartitionChangeBuilder builder = new PartitionChangeBuilder( + partition, + topicPartition.topicId(), + topicPartition.partitionId(), + r -> clusterControl.unfenced(r), + () -> false + ); + builder.setAlwaysElectPreferredIfPossible(true); + builder.build().ifPresent(records::add); Review comment: PartitionChangeBuilder seems to have an existing issue with preferred leader election. Suppose a partition has replicas {1,2,3}, ISR {2,3}, and leader 3. Calling builder.build() on this partition seems to change the leader to 2, which is unnecessary because 2 is not the preferred leader either. ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -1016,6 +1042,55 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType, return ControllerResult.of(records, null); } + boolean arePartitionLeadersImbalanced() { + return !imbalancedPartitions.isEmpty(); + } + + /** + * Attempt to elect a preferred leader for all topic partitions that a leader that is not the preferred replica.
Review comment: The sentence doesn't read well because it uses "that" twice. ########## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ########## @@ -673,10 +679,19 @@ public void run() throws Exception { replay(message.message(), Optional.empty(), offset); } snapshotRegistry.getOrCreateSnapshot(offset); + log.debug("Read-write operation {} will be completed when the log " + "reaches offset {}.", this, resultAndOffset.offset()); } - purgatory.add(resultAndOffset.offset(), this); + + // After every controller write event, schedule a leader rebalance if there are any topic partition + // with leader that is not the preferred leader. + maybeScheduleNextBalancePartitionLeaders(); + + // Remember the latest offset and future if it is not already completed + if (!future.isDone()) { Review comment: This check seems unnecessary, since the future is only completed after it has been added to the purgatory? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org