mumrah commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828108377
##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -147,6 +147,7 @@
         private int defaultNumPartitions = 1;
         private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
         private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
+        private long leaderImbalanceCheckIntervalNs = -1;

Review comment:
   We use `Optional` for a few other things in QuorumController, maybe we can use OptionalLong here instead of -1?

##########
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##########
@@ -158,20 +158,30 @@ class ControllerServer(
       alterConfigPolicy = Option(config.
         getConfiguredInstance(AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
-      val controllerBuilder = new QuorumController.Builder(config.nodeId, metaProperties.clusterId).
-        setTime(time).
-        setThreadNamePrefix(threadNamePrefixAsString).
-        setConfigSchema(configSchema).
-        setRaftClient(raftManager.client).
-        setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
-        setDefaultNumPartitions(config.numPartitions.intValue()).
-        setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
-          TimeUnit.MILLISECONDS)).
-        setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
-        setMetrics(new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
-        setCreateTopicPolicy(createTopicPolicy.asJava).
-        setAlterConfigPolicy(alterConfigPolicy.asJava).
-        setConfigurationValidator(new ControllerConfigurationValidator())
+      val controllerBuilder = {
+        val leaderImbalanceCheckIntervalNs = if (config.autoLeaderRebalanceEnable) {

Review comment:
   We should probably guard against a user setting this to zero. Otherwise, we'll thrash on the event queue and maybe even have a live-lock.
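
For illustration, the two suggestions above (an `OptionalLong` instead of a `-1` sentinel, and a guard against a zero interval) could be combined roughly as follows. This is only a sketch: the helper class `LeaderImbalanceInterval`, its `fromConfig` method, and the assumption that the configured interval arrives in seconds are all hypothetical and not taken from the PR.

```java
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; these names are not from the PR.
final class LeaderImbalanceInterval {
    private LeaderImbalanceInterval() { }

    /**
     * Returns the check interval in nanoseconds, or an empty OptionalLong
     * when automatic leader rebalancing is disabled.
     *
     * Assumption: the interval is supplied in seconds.
     */
    static OptionalLong fromConfig(boolean autoRebalanceEnabled, long intervalSeconds) {
        if (!autoRebalanceEnabled) {
            // "Disabled" is modelled as empty rather than as a -1 sentinel.
            return OptionalLong.empty();
        }
        if (intervalSeconds <= 0) {
            // Reject zero (and negative) intervals so the check cannot be
            // rescheduled back-to-back with no delay.
            throw new IllegalArgumentException(
                "Leader imbalance check interval must be positive, got " + intervalSeconds);
        }
        return OptionalLong.of(TimeUnit.SECONDS.toNanos(intervalSeconds));
    }
}
```

With something like this, the scheduling code could test `isPresent()` instead of comparing against `-1`, and a zero interval would fail fast at startup rather than at runtime.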

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -953,6 +971,48 @@ private void cancelMaybeFenceReplicas() {
         queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
     }
+    private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+    private void maybeScheduleNextBalancePartitionLeaders() {
+        final String maybeBalancePartitionLeaders = "maybeBalancePartitionLeaders";
+
+        if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+            leaderImbalanceCheckIntervalNs >= 0 &&
+            replicationControl.arePartitionLeadersImbalanced()) {
+
+            log.debug(
+                "Scheduling deferred event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})",
+                maybeBalancePartitionLeaders,
+                imbalancedScheduled,
+                leaderImbalanceCheckIntervalNs,
+                replicationControl.arePartitionLeadersImbalanced()
+            );
+            long delayNs = time.nanoseconds();
+            if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
+                delayNs += leaderImbalanceCheckIntervalNs;
+            }
+            scheduleDeferredWriteEvent(maybeBalancePartitionLeaders, delayNs, () -> {

Review comment:
   Looking down in the event code, I see this comment when we are handling deferred events.

   ```
   // The deferred event is ready to run. Prepend it to the
   // queue. (The value for deferred events is a schedule time
   // rather than a timeout.)
   ```

   In the case of the IMMEDIATE state where we set deadline to the current time, this might mean that it gets prepended to the queue and run before other waiting write events. Is this what we want here? Maybe we should schedule a regular write event for the IMMEDIATE case.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1197,6 +1257,25 @@ private void resetState() {
      */
     private long newBytesSinceLastSnapshot = 0;
+    /**
+     * How long to delay partition leader balancing operations.
+     */
+    private final long leaderImbalanceCheckIntervalNs;
+
+    private static enum ImbalanceSchedule {
+        // Leader balancing has been scheduled
+        SCHEDULED,
+        // Leader balancing should be scheduled in the future
+        DEFERRED,

Review comment:
   If I understand correctly, DEFERRED is like our "idle" state here. That is, if no automatic preferred leader election is happening, we'll be in the DEFERRED state until an imbalance occurs. Is that right?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1197,6 +1257,25 @@ private void resetState() {
      */
     private long newBytesSinceLastSnapshot = 0;
+    /**
+     * How long to delay partition leader balancing operations.
+     */
+    private final long leaderImbalanceCheckIntervalNs;
+
+    private static enum ImbalanceSchedule {
+        // Leader balancing has been scheduled
+        SCHEDULED,
+        // Leader balancing should be scheduled in the future
+        DEFERRED,
+        // Leader balancing should be scheduled immediately
+        IMMIDIATELY

Review comment:
   typo: IMMEDIATELY

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -953,6 +971,48 @@ private void cancelMaybeFenceReplicas() {
         queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
     }
+    private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+    private void maybeScheduleNextBalancePartitionLeaders() {
+        final String maybeBalancePartitionLeaders = "maybeBalancePartitionLeaders";
+
+        if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+            leaderImbalanceCheckIntervalNs >= 0 &&
+            replicationControl.arePartitionLeadersImbalanced()) {
+
+            log.debug(
+                "Scheduling deferred event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})",

Review comment:
   If we are in IMMEDIATE we'll just be submitting an un-deferred write event right?
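
To make the IMMEDIATE versus DEFERRED question concrete, here is a toy model of the decision being discussed: do nothing when a balance is already scheduled, append a regular write event in the immediate case, and schedule a deferred event one check interval in the future otherwise. It is a sketch only. The `Action` enum and both methods are hypothetical, the state names follow the PR's enum with the typo fixed, and the real controller and event queue APIs are not modelled.

```java
// Toy model for illustration only. Apart from the three schedule states
// (with the typo fixed), none of these names come from the PR.
final class ImbalanceScheduling {
    enum ImbalanceSchedule { SCHEDULED, DEFERRED, IMMEDIATELY }

    // Hypothetical outcomes of a scheduling decision.
    enum Action { NONE, APPEND_WRITE_EVENT, SCHEDULE_DEFERRED }

    private ImbalanceScheduling() { }

    /** Decides how the next leader-balancing event would be queued, if at all. */
    static Action actionFor(ImbalanceSchedule state, boolean imbalanced) {
        if (!imbalanced || state == ImbalanceSchedule.SCHEDULED) {
            // Nothing to balance, or an event is already pending.
            return Action.NONE;
        }
        // Immediate work goes to the back of the queue as a regular write
        // event; only the deferred case uses a schedule time.
        return state == ImbalanceSchedule.IMMEDIATELY
            ? Action.APPEND_WRITE_EVENT
            : Action.SCHEDULE_DEFERRED;
    }

    /** Schedule time for the deferred case: now plus one check interval. */
    static long deferredDeadlineNs(long nowNs, long checkIntervalNs) {
        return Math.addExact(nowNs, checkIntervalNs);
    }
}
```

Under this model the immediate case never passes the current time as a deadline, so it cannot jump ahead of write events that are already waiting.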

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -861,6 +875,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                     // required because the active controller assumes that there is always an in-memory snapshot at the
                     // last committed offset.
                     snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+
+                    // When becoming the active controller, schedule a leader rebalance if there are any topic partition
+                    // with leader that is not the preferred leader.
+                    maybeScheduleNextBalancePartitionLeaders();

Review comment:
   Related question -- what happens to deferred write events when we renounce leadership? Do we drain the queue first?