cmccabe commented on code in PR #17502:
URL: https://github.com/apache/kafka/pull/17502#discussion_r1811528276


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1159,255 +1176,12 @@ void renounce() {
                     newWrongControllerException(OptionalInt.empty()));
             offsetControl.deactivate();
             clusterControl.deactivate();
-            cancelMaybeFenceReplicas();
-            cancelMaybeBalancePartitionLeaders();
-            cancelMaybeNextElectUncleanLeaders();
-            cancelNextWriteNoOpRecord();
+            periodicControl.deactivate();
         } catch (Throwable e) {
             fatalFaultHandler.handleFault("exception while renouncing 
leadership", e);
         }
     }
 
-    private <T> void scheduleDeferredWriteEvent(
-        String name,
-        long deadlineNs,
-        ControllerWriteOperation<T> op,
-        EnumSet<ControllerOperationFlag> flags
-    ) {
-        if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) {
-            throw new RuntimeException("deferred events should not update the 
queue time.");
-        }
-        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, 
flags);
-        queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), 
event);
-        event.future.exceptionally(e -> {
-            if (ControllerExceptions.isTimeoutException(e)) {
-                log.error("Cancelling deferred write event {} because the 
event queue " +
-                    "is now closed.", name);
-                return null;
-            } else if (e instanceof NotControllerException) {
-                log.debug("Cancelling deferred write event {} because this 
controller " +
-                    "is no longer active.", name);
-                return null;
-            }
-            log.error("Unexpected exception while executing deferred write 
event {}. " +
-                "Rescheduling for a minute from now.", name, e);
-            scheduleDeferredWriteEvent(name,
-                deadlineNs + NANOSECONDS.convert(1, TimeUnit.MINUTES), op, 
flags);
-            return null;
-        });
-    }
-
-    static final String MAYBE_FENCE_REPLICAS = "maybeFenceReplicas";
-
-    private void rescheduleMaybeFenceStaleBrokers() {
-        long nextCheckTimeNs = 
clusterControl.heartbeatManager().nextCheckTimeNs();
-        if (nextCheckTimeNs == Long.MAX_VALUE) {
-            cancelMaybeFenceReplicas();
-            return;
-        }
-        scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs,
-            () -> {
-                ControllerResult<Void> result = 
replicationControl.maybeFenceOneStaleBroker();
-                // This following call ensures that if there are multiple 
brokers that
-                // are currently stale, then fencing for them is scheduled 
immediately
-                rescheduleMaybeFenceStaleBrokers();
-                return result;
-            },
-            EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
-    }

Review Comment:
   You are right that this will be slightly less accurate than the old way 
(assuming a non-congested controller event queue -- with congestion both ways 
have error).
   
   I added this comment to `maybeFenceStaleBrokerPeriodNs`:
   ```
        * We sample 8 times per broker timeout period, so we'll generally fence 
a broker in no more
        * than 112.5% of the given broker session timeout.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to