mumrah commented on code in PR #17502: URL: https://github.com/apache/kafka/pull/17502#discussion_r1811413417
########## metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java: ########## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; + +import org.slf4j.Logger; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + + +class PeriodicTaskControlManager { + static class Builder { + private LogContext logContext = null; + private Time time = Time.SYSTEM; + private QueueAccessor queueAccessor = null; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTime(Time time) { + this.time = time; + return this; + } + + Builder setQueueAccessor(QueueAccessor queueAccessor) { + this.queueAccessor = queueAccessor; + return this; + } + + PeriodicTaskControlManager build() { + if (logContext == null) logContext = new LogContext(); + if (queueAccessor == null) throw new RuntimeException("You must set queueAccessor"); + return new 
PeriodicTaskControlManager(logContext, + time, + queueAccessor); + } + } + + interface QueueAccessor { + void scheduleDeferred( + String tag, + long deadlineNs, + Supplier<ControllerResult<Void>> op + ); + + void cancelDeferred(String tag); + } + + class PeriodicTaskOperation implements Supplier<ControllerResult<Void>> { + private final PeriodicTask task; + + PeriodicTaskOperation(PeriodicTask task) { + this.task = task; + } + + @Override + public ControllerResult<Void> get() { + long startNs = 0; + if (log.isDebugEnabled() || task.flags().contains(PeriodicTaskFlag.VERBOSE)) { + startNs = time.nanoseconds(); + } + ControllerResult<Boolean> result = task.op().get(); Review Comment: This call should probably be done in a try block. Otherwise, I think we will never get a chance to reschedule the task if it throws an exception. ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -1855,6 +1633,97 @@ private QuorumController( this.raftClient.register(metaLogListener); } + /** + * Register the writeNoOpRecord task. + * + * This task periodically writes a NoOpRecord to the metadata log, if the MetadataVersion + * supports it. + * + * @param maxIdleIntervalNs The period at which to write the NoOpRecord. + */ + private void registerWriteNoOpRecord(long maxIdleIntervalNs) { + periodicControl.registerTask(new PeriodicTask("writeNoOpRecord", + () -> { + ArrayList<ApiMessageAndVersion> records = new ArrayList<>(1); + if (featureControl.metadataVersion().isNoOpRecordSupported()) { + records.add(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)); + } + return ControllerResult.of(records, false); + }, + maxIdleIntervalNs, + EnumSet.noneOf(PeriodicTaskFlag.class))); + } + + /** + * Calculate what the period should be for the maybeFenceStaleBroker task. + * + * @param sessionTimeoutNs The configured broker session timeout period in nanoseconds. + * + * @return The period for the maybeFenceStaleBroker task in nanoseconds. 
+ */ + static long maybeFenceStaleBrokerPeriodNs(long sessionTimeoutNs) { + return Math.max(TimeUnit.MILLISECONDS.toNanos(1), sessionTimeoutNs / 4); + } + + /** + * Register the maybeFenceStaleBroker task. + * + * This task periodically checks to see if there is a stale broker that needs to + * be fenced. It will only ever remove one stale broker at a time. + * + * @param sessionTimeoutNs The broker session timeout in nanoseconds. + */ + private void registerMaybeFenceStaleBroker(long sessionTimeoutNs) { + periodicControl.registerTask(new PeriodicTask("maybeFenceStaleBroker", + () -> replicationControl.maybeFenceOneStaleBroker(), Review Comment: nit: These lambdas can be method references ########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ########## @@ -240,7 +240,7 @@ boolean check() { /** * The broker heartbeat manager, or null if this controller is on standby. */ - private BrokerHeartbeatManager heartbeatManager; + private volatile BrokerHeartbeatManager heartbeatManager; Review Comment: Hm, I guess this was previously a bug technically. Glancing through the usages, we may have some TOCTOU bugs with this. We tend to check if it's null and then do some stuff, but maybe it could become null after that check if there was a race on the variable read. ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -341,9 +345,12 @@ public List<ApiMessageAndVersion> sweepExpiredDelegationTokens() { oldTokenInformation.tokenId(), oldTokenInformation.ownerAsString()); records.add(new ApiMessageAndVersion(new RemoveDelegationTokenRecord(). setTokenId(oldTokenInformation.tokenId()), (short) 0)); + if (records.size() >= MAX_RECORDS_PER_EXPIRATION) { Review Comment: Why limit this to 1000? Just to avoid batch size issues? 
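One plausible reading of the cap, sketched under assumptions: bounding the number of records produced per sweep keeps each batch small, and a boolean result can signal that expired entries remain so the task is run again soon (the PeriodicTask javadoc in this PR describes that "reschedule after a very short delay" behavior for a true response). `BoundedSweepSketch`, `SweepResult` and `sweepExpired` are hypothetical names, not code from the PR; the value 1000 simply mirrors the number in the question above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these names come from the PR.
class BoundedSweepSketch {
    // Assumed cap, mirroring the 1000 mentioned in the review question.
    static final int MAX_RECORDS_PER_EXPIRATION = 1000;

    // Result of one sweep: the removed ids plus a "more work remains" hint.
    static final class SweepResult {
        final List<String> removedTokenIds;
        final boolean moreWorkRemains;

        SweepResult(List<String> removedTokenIds, boolean moreWorkRemains) {
            this.removedTokenIds = removedTokenIds;
            this.moreWorkRemains = moreWorkRemains;
        }
    }

    // Removes at most MAX_RECORDS_PER_EXPIRATION expired tokens from the map.
    // tokens maps a token id to its expiry timestamp in milliseconds.
    static SweepResult sweepExpired(Map<String, Long> tokens, long nowMs) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> iter = tokens.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Long> entry = iter.next();
            if (entry.getValue() >= nowMs) {
                continue; // not expired yet
            }
            if (removed.size() >= MAX_RECORDS_PER_EXPIRATION) {
                // Cap reached while expired tokens are left: ask to be run again soon.
                return new SweepResult(removed, true);
            }
            removed.add(entry.getKey());
            iter.remove();
        }
        return new SweepResult(removed, false);
    }
}
```

With this shape, a backlog of expired tokens is drained over several short runs instead of one large batch; whether that is the actual motivation in the PR is for the author to confirm.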
########## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.utils.Time; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.ConcurrentHashMap; + +class BrokerHeartbeatTracker { Review Comment: Can you add a brief class level javadoc mentioning that this is updated on the RPC threads and read in the controller threads? 
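A minimal sketch of the kind of class-level javadoc being requested, attached to a simplified tracker. `HeartbeatTrackerSketch` and its methods are hypothetical stand-ins, not the PR's `BrokerHeartbeatTracker`; the only details taken from the diff and the comment are the use of `ConcurrentHashMap` and the split between RPC threads (writers) and the controller thread (reader).

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Tracks the last time each broker was heard from.
 *
 * Threading: updateContactTime is called from RPC threads, while
 * maybeRemoveExpired is called from the controller thread. All shared
 * state therefore lives in a ConcurrentHashMap.
 *
 * Hypothetical sketch; not the class from the PR.
 */
class HeartbeatTrackerSketch {
    private final long sessionTimeoutNs;
    private final ConcurrentHashMap<Integer, Long> contactTimes = new ConcurrentHashMap<>();

    HeartbeatTrackerSketch(long sessionTimeoutNs) {
        this.sessionTimeoutNs = sessionTimeoutNs;
    }

    /** Called from RPC threads when a heartbeat arrives. */
    void updateContactTime(int brokerId, long nowNs) {
        contactTimes.put(brokerId, nowNs);
    }

    /** Called from the controller thread; removes at most one expired broker. */
    OptionalInt maybeRemoveExpired(long nowNs) {
        for (Map.Entry<Integer, Long> entry : contactTimes.entrySet()) {
            Long lastContactNs = entry.getValue();
            // Compare elapsed time rather than absolute values of the nanosecond clock.
            if (nowNs - lastContactNs > sessionTimeoutNs) {
                // The two-argument remove only succeeds if no newer heartbeat
                // replaced the value after it was read above.
                if (contactTimes.remove(entry.getKey(), lastContactNs)) {
                    return OptionalInt.of(entry.getKey());
                }
            }
        }
        return OptionalInt.empty();
    }
}
```

The conditional `remove(key, value)` is one way to avoid dropping a broker whose heartbeat arrives between the expiry check and the removal; the PR may handle that race differently.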
########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -1597,18 +1597,34 @@ public ControllerResult<Void> unregisterBroker(int brokerId) { return ControllerResult.of(records, null); } - ControllerResult<Void> maybeFenceOneStaleBroker() { - List<ApiMessageAndVersion> records = new ArrayList<>(); + ControllerResult<Boolean> maybeFenceOneStaleBroker() { BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager(); - heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> { - // Even though multiple brokers can go stale at a time, we will process - // fencing one at a time so that the effect of fencing each broker is visible - // to the system prior to processing the next one - log.info("Fencing broker {} because its session has timed out.", brokerId); - handleBrokerFenced(brokerId, records); - heartbeatManager.fence(brokerId); - }); - return ControllerResult.of(records, null); + Optional<BrokerIdAndEpoch> idAndEpoch = heartbeatManager.tracker().maybeRemoveExpired(); Review Comment: What happens when brokers restart and get new epochs? Do they remain tracked by the BrokerHeartbeatTracker until they expire? ########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ########## @@ -511,6 +511,22 @@ BrokerFeature processRegistrationFeature( setMaxSupportedVersion(feature.maxSupportedVersion()); } + /** + * Track an incoming broker heartbeat. Unlike most functions, this one is not called from the main + * controller thread, so it can only access local, volatile and atomic data. Review Comment: It's ultimately called from the RPC thread (ControllerApis) ########## metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import java.util.EnumSet; +import java.util.function.Supplier; + +class PeriodicTask { + /** + * The name of this periodic task. + */ + private final String name; + + /** + * The callback for this task. If ControllerResult.response is true, we will schedule the + * task again after only a very short delay. This is useful if we only finished part of the + * work we wanted to finish. + */ + private final Supplier<ControllerResult<Boolean>> op; + + /** + * The period of the task, in nanoseconds. + */ + private final long periodNs; + + /** + * The flags used by this periodic task. + */ + private final EnumSet<PeriodicTaskFlag> flags; + + PeriodicTask( + String name, + Supplier<ControllerResult<Boolean>> op, + long periodNs, + EnumSet<PeriodicTaskFlag> flags + ) { + this.name = name; + this.op = op; + this.periodNs = periodNs; + this.flags = flags; + } + + String name() { + return name; + } + + Supplier<ControllerResult<Boolean>> op() { + return op; + } + + long periodNs() { + return periodNs; + } Review Comment: Maybe not for this PR, but I wonder if some periodic tasks might want to dynamically change their period. 
For example, if we find an expired session we might want to reschedule more quickly in case there are other expired sessions waiting to be detected. ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -1159,255 +1176,12 @@ void renounce() { newWrongControllerException(OptionalInt.empty())); offsetControl.deactivate(); clusterControl.deactivate(); - cancelMaybeFenceReplicas(); - cancelMaybeBalancePartitionLeaders(); - cancelMaybeNextElectUncleanLeaders(); - cancelNextWriteNoOpRecord(); + periodicControl.deactivate(); } catch (Throwable e) { fatalFaultHandler.handleFault("exception while renouncing leadership", e); } } - private <T> void scheduleDeferredWriteEvent( - String name, - long deadlineNs, - ControllerWriteOperation<T> op, - EnumSet<ControllerOperationFlag> flags - ) { - if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) { - throw new RuntimeException("deferred events should not update the queue time."); - } - ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, flags); - queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), event); - event.future.exceptionally(e -> { - if (ControllerExceptions.isTimeoutException(e)) { - log.error("Cancelling deferred write event {} because the event queue " + - "is now closed.", name); - return null; - } else if (e instanceof NotControllerException) { - log.debug("Cancelling deferred write event {} because this controller " + - "is no longer active.", name); - return null; - } - log.error("Unexpected exception while executing deferred write event {}. 
" + - "Rescheduling for a minute from now.", name, e); - scheduleDeferredWriteEvent(name, - deadlineNs + NANOSECONDS.convert(1, TimeUnit.MINUTES), op, flags); - return null; - }); - } - - static final String MAYBE_FENCE_REPLICAS = "maybeFenceReplicas"; - - private void rescheduleMaybeFenceStaleBrokers() { - long nextCheckTimeNs = clusterControl.heartbeatManager().nextCheckTimeNs(); - if (nextCheckTimeNs == Long.MAX_VALUE) { - cancelMaybeFenceReplicas(); - return; - } - scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, - () -> { - ControllerResult<Void> result = replicationControl.maybeFenceOneStaleBroker(); - // This following call ensures that if there are multiple brokers that - // are currently stale, then fencing for them is scheduled immediately - rescheduleMaybeFenceStaleBrokers(); - return result; - }, - EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME)); - } Review Comment: In this method, we would determine the next time a broker _could_ be expired (last contact time + time until expiration) to run the event. This would have the effect of expiring brokers immediately as they expire (or close to it). Now, we are using a periodic task which runs at a steady rate. If we have just missed an expiration, say by 1ms, then will we have to wait until the next execution to fence that broker? IOW, if we set the session expiry to 1000ms, is it possible that we will sometimes see brokers not expired until after ~2000ms? Of course, both the old way and the new way are prone to long running events which cause a delay in the task execution. So even in the old way, it was best effort. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org