mumrah commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828108377



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -147,6 +147,7 @@
         private int defaultNumPartitions = 1;
         private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new 
Random());
         private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
+        private long leaderImbalanceCheckIntervalNs = -1;

Review comment:
       We use `Optional` for a few other things in QuroumController, maybe we 
can use OptionalLong here instead of -1?

##########
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##########
@@ -158,20 +158,30 @@ class ControllerServer(
       alterConfigPolicy = Option(config.
         getConfiguredInstance(AlterConfigPolicyClassNameProp, 
classOf[AlterConfigPolicy]))
 
-      val controllerBuilder = new QuorumController.Builder(config.nodeId, 
metaProperties.clusterId).
-        setTime(time).
-        setThreadNamePrefix(threadNamePrefixAsString).
-        setConfigSchema(configSchema).
-        setRaftClient(raftManager.client).
-        setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
-        setDefaultNumPartitions(config.numPartitions.intValue()).
-        
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
-          TimeUnit.MILLISECONDS)).
-        setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
-        setMetrics(new 
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
-        setCreateTopicPolicy(createTopicPolicy.asJava).
-        setAlterConfigPolicy(alterConfigPolicy.asJava).
-        setConfigurationValidator(new ControllerConfigurationValidator())
+      val controllerBuilder = {
+        val leaderImbalanceCheckIntervalNs = if 
(config.autoLeaderRebalanceEnable) {

Review comment:
       We should probably guard against a user setting this to zero. Otherwise, 
we'll thrash on the event queue and maybe even have a live-lock.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -953,6 +971,48 @@ private void cancelMaybeFenceReplicas() {
         queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
     }
 
+    private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+    private void maybeScheduleNextBalancePartitionLeaders() {
+        final String maybeBalancePartitionLeaders = 
"maybeBalancePartitionLeaders";
+
+        if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+            leaderImbalanceCheckIntervalNs >= 0 &&
+            replicationControl.arePartitionLeadersImbalanced()) {
+
+            log.debug(
+                "Scheduling deferred event for {} because scheduled ({}), 
checkIntervalNs ({}) and isImbalanced ({})",
+                maybeBalancePartitionLeaders,
+                imbalancedScheduled,
+                leaderImbalanceCheckIntervalNs,
+                replicationControl.arePartitionLeadersImbalanced()
+            );
+            long delayNs = time.nanoseconds();
+            if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
+                delayNs += leaderImbalanceCheckIntervalNs;
+            }
+            scheduleDeferredWriteEvent(maybeBalancePartitionLeaders, delayNs, 
() -> {

Review comment:
       Looking down in the event code, I see this comment when we are handling 
deferred events.
   
   ```
                                   // The deferred event is ready to run.  
Prepend it to the
                                   // queue.  (The value for deferred events is 
a schedule time
                                   // rather than a timeout.)
   
   ```
   
   In the case of the IMMEDIATE state where we set deadline to the current 
time, this might mean that it gets prepended to the queue and run before other 
waiting write events. Is this what we want here? Maybe we should schedule a 
regular write event for the IMMEDIATE case.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1197,6 +1257,25 @@ private void resetState() {
      */
     private long newBytesSinceLastSnapshot = 0;
 
+    /**
+     * How long to delay partition leader balancing operations.
+     */
+    private final long leaderImbalanceCheckIntervalNs;
+
+    private static enum ImbalanceSchedule {
+        // Leader balancing has been scheduled
+        SCHEDULED,
+        // Leader balancing should be scheduled in the future
+        DEFERRED,

Review comment:
       If I understand correctly, DEFERRED is like our "idle" state here. That 
is, if no automatic preferred leader election is happening, we'll be in the 
DEFERRED state until an imbalance occurs. Is that right?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1197,6 +1257,25 @@ private void resetState() {
      */
     private long newBytesSinceLastSnapshot = 0;
 
+    /**
+     * How long to delay partition leader balancing operations.
+     */
+    private final long leaderImbalanceCheckIntervalNs;
+
+    private static enum ImbalanceSchedule {
+        // Leader balancing has been scheduled
+        SCHEDULED,
+        // Leader balancing should be scheduled in the future
+        DEFERRED,
+        // Leader balancing should be scheduled immediately
+        IMMIDIATELY

Review comment:
       typo: IMMEDIATELY

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -953,6 +971,48 @@ private void cancelMaybeFenceReplicas() {
         queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
     }
 
+    private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+    private void maybeScheduleNextBalancePartitionLeaders() {
+        final String maybeBalancePartitionLeaders = 
"maybeBalancePartitionLeaders";
+
+        if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+            leaderImbalanceCheckIntervalNs >= 0 &&
+            replicationControl.arePartitionLeadersImbalanced()) {
+
+            log.debug(
+                "Scheduling deferred event for {} because scheduled ({}), 
checkIntervalNs ({}) and isImbalanced ({})",

Review comment:
       If we are in IMMEDIATE we'll just be submitting an un-deferred write 
event right?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -861,6 +875,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                     // required because the active controller assumes that 
there is always an in-memory snapshot at the
                     // last committed offset.
                     snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+
+                    // When becoming the active controller, schedule a leader 
rebalance if there are any topic partition
+                    // with leader that is not the preferred leader.
+                    maybeScheduleNextBalancePartitionLeaders();

Review comment:
       Related question -- what happens to deferred write events when we 
renounce leadership? Do we drain the queue first?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to