mumrah commented on code in PR #17502:
URL: https://github.com/apache/kafka/pull/17502#discussion_r1811413417


##########
metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+
+class PeriodicTaskControlManager {
+    static class Builder {
+        private LogContext logContext = null;
+        private Time time = Time.SYSTEM;
+        private QueueAccessor queueAccessor = null;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        Builder setQueueAccessor(QueueAccessor queueAccessor) {
+            this.queueAccessor = queueAccessor;
+            return this;
+        }
+
+        PeriodicTaskControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (queueAccessor == null) throw new RuntimeException("You must 
set queueAccessor");
+            return new PeriodicTaskControlManager(logContext,
+                    time,
+                    queueAccessor);
+        }
+    }
+
+    interface QueueAccessor {
+        void scheduleDeferred(
+            String tag,
+            long deadlineNs,
+            Supplier<ControllerResult<Void>> op
+        );
+
+        void cancelDeferred(String tag);
+    }
+
+    class PeriodicTaskOperation implements Supplier<ControllerResult<Void>> {
+        private final PeriodicTask task;
+
+        PeriodicTaskOperation(PeriodicTask task) {
+            this.task = task;
+        }
+
+        @Override
+        public ControllerResult<Void> get() {
+            long startNs = 0;
+            if (log.isDebugEnabled() || 
task.flags().contains(PeriodicTaskFlag.VERBOSE)) {
+                startNs = time.nanoseconds();
+            }
+            ControllerResult<Boolean> result = task.op().get();

Review Comment:
   This call should probably by done in a try block. Otherwise I think we will 
never get a chance to reschedule it if there's some exception



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1855,6 +1633,97 @@ private QuorumController(
         this.raftClient.register(metaLogListener);
     }
 
+    /**
+     * Register the writeNoOpRecord task.
+     *
+     * This task periodically writes a NoOpRecord to the metadata log, if the 
MetadataVersion
+     * supports it.
+     *
+     * @param maxIdleIntervalNs     The period at which to write the 
NoOpRecord.
+     */
+    private void registerWriteNoOpRecord(long maxIdleIntervalNs) {
+        periodicControl.registerTask(new PeriodicTask("writeNoOpRecord",
+            () -> {
+                ArrayList<ApiMessageAndVersion> records = new ArrayList<>(1);
+                if (featureControl.metadataVersion().isNoOpRecordSupported()) {
+                    records.add(new ApiMessageAndVersion(new NoOpRecord(), 
(short) 0));
+                }
+                return ControllerResult.of(records, false);
+            },
+            maxIdleIntervalNs,
+            EnumSet.noneOf(PeriodicTaskFlag.class)));
+    }
+
+    /**
+     * Calculate what the period should be for the maybeFenceStaleBroker task.
+     *
+     * @param sessionTimeoutNs      The configured broker session timeout 
period in nanoseconds.
+     *
+     * @return                      The period for the maybeFenceStaleBroker 
task in nanoseconds.
+     */
+    static long maybeFenceStaleBrokerPeriodNs(long sessionTimeoutNs) {
+        return Math.max(TimeUnit.MILLISECONDS.toNanos(1), sessionTimeoutNs / 
4);
+    }
+
+    /**
+     * Register the maybeFenceStaleBroker task.
+     *
+     * This task periodically checks to see if there is a stale broker that 
needs to
+     * be fenced. It will only ever remove one stale broker at a time.
+     *
+     * @param sessionTimeoutNs      The broker session timeout in nanoseconds.
+     */
+    private void registerMaybeFenceStaleBroker(long sessionTimeoutNs) {
+        periodicControl.registerTask(new PeriodicTask("maybeFenceStaleBroker",
+            () -> replicationControl.maybeFenceOneStaleBroker(),

Review Comment:
   nit: These lambdas can be method references



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -240,7 +240,7 @@ boolean check() {
     /**
      * The broker heartbeat manager, or null if this controller is on standby.
      */
-    private BrokerHeartbeatManager heartbeatManager;
+    private volatile BrokerHeartbeatManager heartbeatManager;

Review Comment:
   Hm, I guess this was previously a bug technically. Glancing through the 
usages, we may have some TOCTOU bugs with this. We tend to check if it's null 
and then do some stuff, but maybe it could become null after that check if 
there was a race on the variable read.



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -341,9 +345,12 @@ public List<ApiMessageAndVersion> 
sweepExpiredDelegationTokens() {
                     oldTokenInformation.tokenId(), 
oldTokenInformation.ownerAsString());
                 records.add(new ApiMessageAndVersion(new 
RemoveDelegationTokenRecord().
                     setTokenId(oldTokenInformation.tokenId()), (short) 0));
+                if (records.size() >= MAX_RECORDS_PER_EXPIRATION) {

Review Comment:
   Why limit this to 1000? Just to avoid batch size issues?



##########
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
+
+class BrokerHeartbeatTracker {

Review Comment:
   Can you add a brief class level javadoc mentioning that this is updated on 
the RPC threads and read in the controller threads?



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1597,18 +1597,34 @@ public ControllerResult<Void> unregisterBroker(int 
brokerId) {
         return ControllerResult.of(records, null);
     }
 
-    ControllerResult<Void> maybeFenceOneStaleBroker() {
-        List<ApiMessageAndVersion> records = new ArrayList<>();
+    ControllerResult<Boolean> maybeFenceOneStaleBroker() {
         BrokerHeartbeatManager heartbeatManager = 
clusterControl.heartbeatManager();
-        heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
-            // Even though multiple brokers can go stale at a time, we will 
process
-            // fencing one at a time so that the effect of fencing each broker 
is visible
-            // to the system prior to processing the next one
-            log.info("Fencing broker {} because its session has timed out.", 
brokerId);
-            handleBrokerFenced(brokerId, records);
-            heartbeatManager.fence(brokerId);
-        });
-        return ControllerResult.of(records, null);
+        Optional<BrokerIdAndEpoch> idAndEpoch = 
heartbeatManager.tracker().maybeRemoveExpired();

Review Comment:
   What happens when brokers restart and get new epochs? Do they remain tracked 
by the BrokerHeartbeatTracker until they expire?



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -511,6 +511,22 @@ BrokerFeature processRegistrationFeature(
                 setMaxSupportedVersion(feature.maxSupportedVersion());
     }
 
+    /**
+     * Track an incoming broker heartbeat. Unlike most functions, this one is 
not called from the main
+     * controller thread, so it can only access local, volatile and atomic 
data.

Review Comment:
   It's ultimately called from the RPC thread (ControllerApis)



##########
metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.EnumSet;
+import java.util.function.Supplier;
+
+class PeriodicTask {
+    /**
+     * The name of this periodic task.
+     */
+    private final String name;
+
+    /**
+     * The callback for this task. If ControllerResult.response is true, we 
will schedule the
+     * task again after only a very short delay. This is useful if we only 
finished part of the
+     * work we wanted to finish.
+     */
+    private final Supplier<ControllerResult<Boolean>> op;
+
+    /**
+     * The period of the task, in nanoseconds.
+     */
+    private final long periodNs;
+
+    /**
+     * The flags used by this periodic task.
+     */
+    private final EnumSet<PeriodicTaskFlag> flags;
+
+    PeriodicTask(
+        String name,
+        Supplier<ControllerResult<Boolean>> op,
+        long periodNs,
+        EnumSet<PeriodicTaskFlag> flags
+    ) {
+        this.name = name;
+        this.op = op;
+        this.periodNs = periodNs;
+        this.flags = flags;
+    }
+
+    String name() {
+        return name;
+    }
+
+    Supplier<ControllerResult<Boolean>> op() {
+        return op;
+    }
+
+    long periodNs() {
+        return periodNs;
+    }

Review Comment:
   Maybe not for this PR, but I wonder if some periodic tasks might want to 
dynamically change their period. For example, if we find an expired session we 
might want to reschedule more quickly in case there are other expired sessions 
waiting to be detected.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1159,255 +1176,12 @@ void renounce() {
                     newWrongControllerException(OptionalInt.empty()));
             offsetControl.deactivate();
             clusterControl.deactivate();
-            cancelMaybeFenceReplicas();
-            cancelMaybeBalancePartitionLeaders();
-            cancelMaybeNextElectUncleanLeaders();
-            cancelNextWriteNoOpRecord();
+            periodicControl.deactivate();
         } catch (Throwable e) {
             fatalFaultHandler.handleFault("exception while renouncing 
leadership", e);
         }
     }
 
-    private <T> void scheduleDeferredWriteEvent(
-        String name,
-        long deadlineNs,
-        ControllerWriteOperation<T> op,
-        EnumSet<ControllerOperationFlag> flags
-    ) {
-        if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) {
-            throw new RuntimeException("deferred events should not update the 
queue time.");
-        }
-        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, 
flags);
-        queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), 
event);
-        event.future.exceptionally(e -> {
-            if (ControllerExceptions.isTimeoutException(e)) {
-                log.error("Cancelling deferred write event {} because the 
event queue " +
-                    "is now closed.", name);
-                return null;
-            } else if (e instanceof NotControllerException) {
-                log.debug("Cancelling deferred write event {} because this 
controller " +
-                    "is no longer active.", name);
-                return null;
-            }
-            log.error("Unexpected exception while executing deferred write 
event {}. " +
-                "Rescheduling for a minute from now.", name, e);
-            scheduleDeferredWriteEvent(name,
-                deadlineNs + NANOSECONDS.convert(1, TimeUnit.MINUTES), op, 
flags);
-            return null;
-        });
-    }
-
-    static final String MAYBE_FENCE_REPLICAS = "maybeFenceReplicas";
-
-    private void rescheduleMaybeFenceStaleBrokers() {
-        long nextCheckTimeNs = 
clusterControl.heartbeatManager().nextCheckTimeNs();
-        if (nextCheckTimeNs == Long.MAX_VALUE) {
-            cancelMaybeFenceReplicas();
-            return;
-        }
-        scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs,
-            () -> {
-                ControllerResult<Void> result = 
replicationControl.maybeFenceOneStaleBroker();
-                // This following call ensures that if there are multiple 
brokers that
-                // are currently stale, then fencing for them is scheduled 
immediately
-                rescheduleMaybeFenceStaleBrokers();
-                return result;
-            },
-            EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
-    }

Review Comment:
   In this method, we would determine the next time a broker _could_ be expired 
(last contact time + time until expiration) to run the event. This would have 
the effect of expiring brokers immediately as they expire (or close to it). 
   
   Now, we are using a periodic task which runs at a steady rate. 
   
   If we have just missed an expiration, say by 1ms, then will we have to wait 
until the next execution to fence that broker? IOW, if we set the session 
expiry to 1000ms, is it possible that we will sometimes see brokers not expired 
until after ~2000ms?
   
   Of course, both the old way and the new way are prone to long running events 
which cause a delay in the task execution. So even in the old way, it was best 
effort.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to