junrao commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r827394335



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1197,6 +1257,25 @@ private void resetState() {
      */
     private long newBytesSinceLastSnapshot = 0;
 
+    /**
+     * How long to delay partition leader balancing operations.
+     */
+    private final long leaderImbalanceCheckIntervalNs;
+
+    private static enum ImbalanceSchedule {

Review comment:
       Is `static` redundant here? A nested enum is implicitly static.
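A quick way to confirm this. JLS §8.9 states that a nested enum type is implicitly static. In the hypothetical class below, `OuterController` stands in for `QuorumController`. The enum is declared without `static`, yet reflection still reports the STATIC modifier, and a static method can use its constants without an outer instance.

```java
import java.lang.reflect.Modifier;

// Hypothetical stand-in for QuorumController: the nested enum is declared
// WITHOUT the `static` keyword.
class OuterController {
    enum ImbalanceSchedule { DEFERRED, SCHEDULED }

    // Reflection reports STATIC anyway, because nested enums are implicitly static.
    static boolean enumIsStatic() {
        return Modifier.isStatic(ImbalanceSchedule.class.getModifiers());
    }

    // A static method can use the constants without any OuterController instance.
    static ImbalanceSchedule initial() {
        return ImbalanceSchedule.DEFERRED;
    }

    public static void main(String[] args) {
        System.out.println(enumIsStatic()); // prints true
    }
}
```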

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -147,6 +147,7 @@
         private int defaultNumPartitions = 1;
         private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
         private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
+        private long leaderImbalanceCheckIntervalNs = -1;

Review comment:
       -1L?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -162,14 +164,14 @@ public Uuid topicId() {
     private final int defaultNumPartitions;
 
     /**
-     * A count of the total number of partitions in the cluster.
+     * Maximum number of leader elections to perform during one partion leader balancing operation.

Review comment:
       Typo: "partion" should be "partition".

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -861,6 +875,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                     // required because the active controller assumes that there is always an in-memory snapshot at the
                     // last committed offset.
                     snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+
+                    // When becoming the active controller, schedule a leader rebalance if there are any topic partition
+                    // with leader that is not the preferred leader.
+                    maybeScheduleNextBalancePartitionLeaders();

Review comment:
       Should we reset `imbalancedScheduled` here, or in `renounce()`?
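One possible shape for the reset, as a sketch under assumptions. The `INACTIVE` state and the `eventQueued` flag are hypothetical, since the diff only shows `DEFERRED` and `SCHEDULED`. The real code would also cancel the deferred event on the controller queue. Resetting in `renounce()` keeps the flag consistent with the queue. If the flag stayed `SCHEDULED` after its event was cancelled, the guard in `maybeScheduleNextBalancePartitionLeaders()` would skip scheduling the next time this node becomes active.

```java
// Hypothetical model of the flag handling; INACTIVE and eventQueued are assumptions.
class ImbalanceScheduleState {
    enum ImbalanceSchedule { INACTIVE, DEFERRED, SCHEDULED }

    private ImbalanceSchedule imbalancedScheduled = ImbalanceSchedule.INACTIVE;
    // Stands in for a deferred event sitting on the controller's event queue.
    private boolean eventQueued = false;

    // Mirrors the guard in maybeScheduleNextBalancePartitionLeaders().
    void maybeSchedule(boolean imbalanced) {
        if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED && imbalanced) {
            eventQueued = true;
            imbalancedScheduled = ImbalanceSchedule.SCHEDULED;
        }
    }

    // Called when this node stops being the active controller.
    void renounce() {
        eventQueued = false;                              // cancel the deferred event
        imbalancedScheduled = ImbalanceSchedule.INACTIVE; // and forget that it was scheduled
    }

    ImbalanceSchedule state() { return imbalancedScheduled; }

    boolean isEventQueued() { return eventQueued; }
}
```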

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -953,6 +971,48 @@ private void cancelMaybeFenceReplicas() {
         queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
     }
 
+    private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+    private void maybeScheduleNextBalancePartitionLeaders() {
+        final String maybeBalancePartitionLeaders = "maybeBalancePartitionLeaders";
+
+        if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+            leaderImbalanceCheckIntervalNs >= 0 &&
+            replicationControl.arePartitionLeadersImbalanced()) {
+
+            log.debug(
+                "Scheduling deferred event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})",
+                maybeBalancePartitionLeaders,
+                imbalancedScheduled,
+                leaderImbalanceCheckIntervalNs,
+                replicationControl.arePartitionLeadersImbalanced()
+            );
+            long delayNs = time.nanoseconds();
+            if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
+                delayNs += leaderImbalanceCheckIntervalNs;
+            }
+            scheduleDeferredWriteEvent(maybeBalancePartitionLeaders, delayNs, () -> {
+                ControllerResult<Boolean> result = replicationControl.maybeBalancePartitionLeaders();
+
+                // rechedule the operation after the leaderImbalanceCheckIntervalNs interval.

Review comment:
       Typo: "rechedule" should be "reschedule".
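For reference, the delay logic in `maybeScheduleNextBalancePartitionLeaders()` reduces to the pure function below. This is a hypothetical helper, not code from the PR. Note that the value computed as `delayNs` is an absolute deadline: the current time, plus one check interval when the state is `DEFERRED`. The sketch returns `OptionalLong.empty()` instead of a sentinel such as `-1L` when nothing should be scheduled, because values from a nanosecond clock can in principle be negative.

```java
import java.util.OptionalLong;

// Hypothetical helper, not code from the PR.
class BalanceDeadline {
    enum ImbalanceSchedule { INACTIVE, DEFERRED, SCHEDULED }

    // Returns the absolute deadline in nanoseconds, or empty when nothing should be scheduled.
    static OptionalLong nextDeadlineNs(ImbalanceSchedule state, boolean imbalanced,
                                       long nowNs, long checkIntervalNs) {
        // Same guard as the diff: not already scheduled, balancing enabled, and work to do.
        if (state == ImbalanceSchedule.SCHEDULED || checkIntervalNs < 0 || !imbalanced) {
            return OptionalLong.empty();
        }
        // DEFERRED waits one full interval; any other state runs as soon as possible.
        return OptionalLong.of(state == ImbalanceSchedule.DEFERRED ? nowNs + checkIntervalNs : nowNs);
    }
}
```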

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1016,6 +1042,55 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
         return ControllerResult.of(records, null);
     }
 
+    boolean arePartitionLeadersImbalanced() {
+        return !imbalancedPartitions.isEmpty();
+    }
+
+    /**
+     * Attempt to elect a preferred leader for all topic partitions that a leader that is not the preferred replica.
+     *
+     * The response() method in the return object is true if this method returned without electing all possible preferred replicas.
+     * The quorum controlller should reschedule this operation immediately if it is true.
+     *
+     * @return All of the election records and if there may be more available preferred replicas to elect as leader
+     */
+    ControllerResult<Boolean> maybeBalancePartitionLeaders() {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        boolean rescheduleImmidiately = false;
+        for (TopicIdPartition topicPartition : imbalancedPartitions) {
+            if (records.size() >= maxElectionsPerImbalance) {
+                rescheduleImmidiately = true;
+                break;
+            }
+
+            TopicControlInfo topic = topics.get(topicPartition.topicId());
+            if (topic == null) {
+                log.error("Skipping unknown imbalanced topic {}", topicPartition);
+                continue;
+            }
+
+            PartitionRegistration partition = topic.parts.get(topicPartition.partitionId());
+            if (partition == null) {
+                log.error("Skipping unknown imbalanced partition {}", topicPartition);
+                continue;
+            }
+
+            // Attempt to perform a preferred leader election
+            PartitionChangeBuilder builder = new PartitionChangeBuilder(
+                partition,
+                topicPartition.topicId(),
+                topicPartition.partitionId(),
+                r -> clusterControl.unfenced(r),
+                () -> false
+            );
+            builder.setAlwaysElectPreferredIfPossible(true);
+            builder.build().ifPresent(records::add);

Review comment:
       PartitionChangeBuilder seems to have an existing issue with preferred leader election. Suppose a partition has replicas {1,2,3}, ISR {2,3} and leader 3. Calling builder.build() on this partition seems to change the leader to 2. This move is unnecessary, since 2 is not the preferred leader either.
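Here is a sketch of the leader choice this comment implies. It assumes that a preferred-leader election should only ever move leadership to `replicas[0]` and otherwise leave a valid leader alone. The class, the `IntPredicate` for "may lead" (in the diff, `r -> clusterControl.unfenced(r)`) and the `-1` "no leader" value are illustrative. This is not `PartitionChangeBuilder`'s actual code.

```java
import java.util.List;
import java.util.Set;
import java.util.function.IntPredicate;

// Hypothetical, simplified leader choice for a preferred-leader election.
class PreferredLeaderChoice {
    static int electPreferredIfPossible(List<Integer> replicas, Set<Integer> isr,
                                        int leader, IntPredicate mayLead) {
        int preferred = replicas.get(0);
        // Move leadership only to the preferred replica, and only if it can lead.
        if (preferred != leader && isr.contains(preferred) && mayLead.test(preferred)) {
            return preferred;
        }
        // Otherwise keep the current leader while it is still valid; switching to
        // another non-preferred replica (3 -> 2 in the example above) is pure churn.
        if (isr.contains(leader) && mayLead.test(leader)) {
            return leader;
        }
        // Current leader is no longer valid: pick the first eligible ISR member in replica order.
        for (int r : replicas) {
            if (isr.contains(r) && mayLead.test(r)) {
                return r;
            }
        }
        return -1; // hypothetical "no leader" value
    }
}
```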

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1016,6 +1042,55 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
         return ControllerResult.of(records, null);
     }
 
+    boolean arePartitionLeadersImbalanced() {
+        return !imbalancedPartitions.isEmpty();
+    }
+
+    /**
+     * Attempt to elect a preferred leader for all topic partitions that a leader that is not the preferred replica.

Review comment:
       The sentence doesn't read well with two "that"s.
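The batching contract in the Javadoc of `maybeBalancePartitionLeaders()` can be sketched as follows. Each pass stops once `maxElectionsPerImbalance` records exist and reports that more work may remain. In the sketch, strings stand in for `TopicIdPartition` and for election records, and the predicate stands in for `builder.build()` returning a record. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of one bounded balancing pass.
class BoundedBalancer {
    // Result of one pass: the records to write and whether to rerun immediately.
    static final class Result {
        final List<String> records;
        final boolean rescheduleImmediately;

        Result(List<String> records, boolean rescheduleImmediately) {
            this.records = records;
            this.rescheduleImmediately = rescheduleImmediately;
        }
    }

    static Result balance(Set<String> imbalancedPartitions, int maxElections,
                          Predicate<String> electPreferred) {
        List<String> records = new ArrayList<>();
        boolean rescheduleImmediately = false;
        for (String partition : imbalancedPartitions) {
            if (records.size() >= maxElections) {
                // Budget used up with partitions still unvisited: ask for an immediate rerun.
                rescheduleImmediately = true;
                break;
            }
            // A record is produced only when the election actually changes something.
            if (electPreferred.test(partition)) {
                records.add("elect-preferred:" + partition);
            }
        }
        return new Result(records, rescheduleImmediately);
    }
}
```

With this loop shape, which matches the diff, a budget that runs out exactly on the last imbalanced partition leaves the flag false. The next pass then waits for the regular interval instead of running immediately.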

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -673,10 +679,19 @@ public void run() throws Exception {
                     replay(message.message(), Optional.empty(), offset);
                 }
                 snapshotRegistry.getOrCreateSnapshot(offset);
+
                 log.debug("Read-write operation {} will be completed when the log " +
                     "reaches offset {}.", this, resultAndOffset.offset());
             }
-            purgatory.add(resultAndOffset.offset(), this);
+
+            // After every controller write event, schedule a leader rebalance if there are any topic partition
+            // with leader that is not the preferred leader.
+            maybeScheduleNextBalancePartitionLeaders();
+
+            // Remember the latest offset and future if it is not already completed
+            if (!future.isDone()) {

Review comment:
       This check seems unnecessary, since the future is only completed after it has been added to the purgatory?
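A minimal purgatory model of the reasoning in this comment. Futures wait in a map keyed by log offset and are completed only by `completeUpTo()`, so a future that was never added cannot already be done through this path. The class is hypothetical, and the argument assumes nothing else completes the future before it reaches the purgatory.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical, minimal purgatory: futures wait keyed by log offset.
class OffsetPurgatory {
    private final TreeMap<Long, List<CompletableFuture<Void>>> pending = new TreeMap<>();

    void add(long offset, CompletableFuture<Void> future) {
        pending.computeIfAbsent(offset, k -> new ArrayList<>()).add(future);
    }

    // Complete and drop every future whose offset is <= committedOffset.
    void completeUpTo(long committedOffset) {
        Iterator<Map.Entry<Long, List<CompletableFuture<Void>>>> it =
            pending.headMap(committedOffset, true).entrySet().iterator();
        while (it.hasNext()) {
            for (CompletableFuture<Void> f : it.next().getValue()) {
                f.complete(null);
            }
            it.remove(); // removes the entry from the backing TreeMap
        }
    }

    int size() { return pending.size(); }
}
```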




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
