hachikuji commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r685421603



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -858,7 +858,7 @@ private void rescheduleMaybeFenceStaleBrokers() {
             return;
         }
         scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () -> {
-            ControllerResult<Void> result = replicationControl.maybeFenceStaleBrokers();
+            ControllerResult<Void> result = replicationControl.maybeFenceOneStaleBroker();
             rescheduleMaybeFenceStaleBrokers();

Review comment:
       One thing I was considering is whether we should schedule the next check 
right away. For example, maybe we could check whether `result` contains a 
non-empty record set and reschedule immediately (or perhaps after a short 
backoff) if it does.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -925,14 +926,21 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
         return ControllerResult.of(records, null);
     }
 
-    ControllerResult<Void> maybeFenceStaleBrokers() {
+    ControllerResult<Void> maybeFenceOneStaleBroker() {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
         List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
-        for (int brokerId : staleBrokers) {
+        if (!staleBrokers.isEmpty()) {
+            // Even though multiple brokers can go stale at a time, we will process
+            // fencing one at a time so that the effect of fencing each broker is visible
+            // to the controller (memory) prior to processing the fencing of the next one
+            int brokerId = staleBrokers.get(0);
+            log.debug("Found broker(s) {} to be stale. Processing them one at a time",
+                Arrays.toString(staleBrokers.toArray()));
             log.info("Fencing broker {} because its session has timed out.", brokerId);
             handleBrokerFenced(brokerId, records);
             heartbeatManager.fence(brokerId);
+

Review comment:
       nit: remove newline
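The comment in the hunk above explains why only one broker is fenced per pass. Below is a toy model of the hazard it guards against. The names are hypothetical, and the election rule is deliberately simplified: a partition's leader is the first ISR replica the controller believes is unfenced. This is not Kafka's real election logic. `batchFromSnapshot` computes every decision from the state as it was before the pass, as if earlier records in the same pass had not been applied yet. `oneAtATime` updates the state after each fencing, which is roughly the effect the change aims for by handling one broker per event.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of a single partition; not Kafka's real election logic.
final class FencingOrderSketch {
    // Toy election: the first ISR replica not believed to be fenced, or -1 if none.
    static int electLeader(List<Integer> isr, Set<Integer> fenced) {
        for (int replica : isr) {
            if (!fenced.contains(replica)) return replica;
        }
        return -1;
    }

    // Every decision is computed from the same pre-pass state: the original leader,
    // and only the broker currently being fenced looks fenced. The last decision wins.
    static int batchFromSnapshot(List<Integer> isr, int leader, List<Integer> stale) {
        int result = leader;
        for (int brokerId : stale) {
            if (leader == brokerId) {                         // checks the ORIGINAL leader
                result = electLeader(isr, Set.of(brokerId));  // earlier fencings not visible
            }
        }
        return result;
    }

    // Each fencing is applied to the state before the next stale broker is handled.
    static int oneAtATime(List<Integer> isr, int leader, List<Integer> stale) {
        Set<Integer> fenced = new HashSet<>();
        int current = leader;
        for (int brokerId : stale) {
            fenced.add(brokerId);
            if (current == brokerId) {
                current = electLeader(isr, fenced);
            }
        }
        return current;
    }
}
```

For ISR [1, 2, 3] with leader 1 and stale brokers [1, 2], the snapshot version elects broker 2, which is itself about to be fenced. The one-at-a-time version ends with broker 3.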

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -925,14 +926,21 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
         return ControllerResult.of(records, null);
     }
 
-    ControllerResult<Void> maybeFenceStaleBrokers() {
+    ControllerResult<Void> maybeFenceOneStaleBroker() {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
         List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
-        for (int brokerId : staleBrokers) {
+        if (!staleBrokers.isEmpty()) {

Review comment:
       If we are only expiring one, maybe we can change `findStaleBrokers` to 
`findStaleBroker` and return `Optional<Integer>` instead of `List<Integer>`.
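A minimal sketch of the suggested signature change. It uses a hypothetical stand-in for the heartbeat tracker rather than the real `BrokerHeartbeatManager`, so its fields and ordering are assumptions here. `findStaleBroker` returns the unfenced broker with the oldest heartbeat, but only if that heartbeat is older than the session timeout.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a heartbeat tracker; not the real BrokerHeartbeatManager.
final class StaleBrokerSketch {
    private final long sessionTimeoutNs;
    // Last heartbeat time per broker; only unfenced brokers are tracked here.
    private final Map<Integer, Long> lastContactNs = new HashMap<>();

    StaleBrokerSketch(long sessionTimeoutNs) {
        this.sessionTimeoutNs = sessionTimeoutNs;
    }

    void touch(int brokerId, long nowNs) {
        lastContactNs.put(brokerId, nowNs);
    }

    void fence(int brokerId) {
        lastContactNs.remove(brokerId);
    }

    // Returns the unfenced broker whose last heartbeat is the oldest, but only if that
    // heartbeat is older than the session timeout; otherwise Optional.empty().
    Optional<Integer> findStaleBroker(long nowNs) {
        return lastContactNs.entrySet().stream()
            .min(Map.Entry.comparingByValue())
            .filter(e -> nowNs - e.getValue() > sessionTimeoutNs)
            .map(Map.Entry::getKey);
    }
}
```

A caller can then fence at most one broker per pass with `findStaleBroker(now).ifPresent(...)`, and the `List` plus `get(0)` step in the hunk goes away.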




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
