cmccabe commented on code in PR #17502: URL: https://github.com/apache/kafka/pull/17502#discussion_r1813273750
########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -1597,18 +1597,34 @@ public ControllerResult<Void> unregisterBroker(int brokerId) { return ControllerResult.of(records, null); } - ControllerResult<Void> maybeFenceOneStaleBroker() { - List<ApiMessageAndVersion> records = new ArrayList<>(); + ControllerResult<Boolean> maybeFenceOneStaleBroker() { BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager(); - heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> { - // Even though multiple brokers can go stale at a time, we will process - // fencing one at a time so that the effect of fencing each broker is visible - // to the system prior to processing the next one - log.info("Fencing broker {} because its session has timed out.", brokerId); - handleBrokerFenced(brokerId, records); - heartbeatManager.fence(brokerId); - }); - return ControllerResult.of(records, null); + Optional<BrokerIdAndEpoch> idAndEpoch = heartbeatManager.tracker().maybeRemoveExpired(); Review Comment: Yes, they remain until they expire. We always check what epoch each entry has, so ignoring the old ones is easy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org