cmccabe commented on code in PR #17502: URL: https://github.com/apache/kafka/pull/17502#discussion_r1811528276
########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -1159,255 +1176,12 @@ void renounce() { newWrongControllerException(OptionalInt.empty())); offsetControl.deactivate(); clusterControl.deactivate(); - cancelMaybeFenceReplicas(); - cancelMaybeBalancePartitionLeaders(); - cancelMaybeNextElectUncleanLeaders(); - cancelNextWriteNoOpRecord(); + periodicControl.deactivate(); } catch (Throwable e) { fatalFaultHandler.handleFault("exception while renouncing leadership", e); } } - private <T> void scheduleDeferredWriteEvent( - String name, - long deadlineNs, - ControllerWriteOperation<T> op, - EnumSet<ControllerOperationFlag> flags - ) { - if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) { - throw new RuntimeException("deferred events should not update the queue time."); - } - ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, flags); - queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), event); - event.future.exceptionally(e -> { - if (ControllerExceptions.isTimeoutException(e)) { - log.error("Cancelling deferred write event {} because the event queue " + - "is now closed.", name); - return null; - } else if (e instanceof NotControllerException) { - log.debug("Cancelling deferred write event {} because this controller " + - "is no longer active.", name); - return null; - } - log.error("Unexpected exception while executing deferred write event {}. 
" + "Rescheduling for a minute from now.", name, e); - scheduleDeferredWriteEvent(name, - deadlineNs + NANOSECONDS.convert(1, TimeUnit.MINUTES), op, flags); - return null; - }); - } - - static final String MAYBE_FENCE_REPLICAS = "maybeFenceReplicas"; - - private void rescheduleMaybeFenceStaleBrokers() { - long nextCheckTimeNs = clusterControl.heartbeatManager().nextCheckTimeNs(); - if (nextCheckTimeNs == Long.MAX_VALUE) { - cancelMaybeFenceReplicas(); - return; - } - scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, - () -> { - ControllerResult<Void> result = replicationControl.maybeFenceOneStaleBroker(); - // This following call ensures that if there are multiple brokers that - // are currently stale, then fencing for them is scheduled immediately - rescheduleMaybeFenceStaleBrokers(); - return result; - }, - EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME)); - } Review Comment: You are right that this will be slightly less accurate than the old way (assuming a non-congested controller event queue -- with congestion, both approaches have some error). I added this comment to `maybeFenceStaleBrokerPeriodNs`: ``` * We sample 8 times per broker timeout period, so we'll generally fence a broker in no more * than 112.5% of the given broker session timeout. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org