m1a2st commented on code in PR #19807: URL: https://github.com/apache/kafka/pull/19807#discussion_r2107979149
########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,938 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import 
org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas +} + +public class ClientQuotaManager { + + private static final Logger log = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public interface BaseUserEntity extends ClientQuotaEntity.ConfigEntity { } + + public static class UserEntity implements BaseUserEntity 
{ + private final String sanitizedUser; + + public UserEntity(String sanitizedUser) { + this.sanitizedUser = sanitizedUser; + } + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + public String getSanitizedUser() { + return sanitizedUser; + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + UserEntity that = (UserEntity) obj; + return sanitizedUser.equals(that.sanitizedUser); + } + + @Override + public int hashCode() { + return sanitizedUser.hashCode(); + } + } + + public static class ClientIdEntity implements ClientQuotaEntity.ConfigEntity { + private final String clientId; + + public ClientIdEntity(String clientId) { + this.clientId = clientId; + } + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + public String getClientId() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + ClientIdEntity that = (ClientIdEntity) obj; + return clientId.equals(that.clientId); + } + + @Override + public int hashCode() { + return clientId.hashCode(); + } + } + + public static class DefaultUserEntity implements BaseUserEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public 
String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultClientIdEntity INSTANCE = new DefaultClientIdEntity(); + + private DefaultClientIdEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default client-id"; + } + } + + public static class KafkaQuotaEntity implements ClientQuotaEntity { + private final BaseUserEntity userEntity; + private final ClientQuotaEntity.ConfigEntity clientIdEntity; + + public KafkaQuotaEntity(BaseUserEntity userEntity, ClientQuotaEntity.ConfigEntity clientIdEntity) { + this.userEntity = userEntity; + this.clientIdEntity = clientIdEntity; + } + + @Override + public List<ConfigEntity> configEntities() { + List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>(); + if (userEntity != null) { + entities.add(userEntity); + } + if (clientIdEntity != null) { + entities.add(clientIdEntity); + } + return entities; + } + + public String sanitizedUser() { + if (userEntity instanceof UserEntity) { + return ((UserEntity) userEntity).getSanitizedUser(); + } else if (userEntity == DefaultUserEntity.INSTANCE) { + return DEFAULT_NAME; + } + return ""; + } + + public String clientId() { + return clientIdEntity != null ? clientIdEntity.name() : ""; + } + + @Override + public String toString() { + String user = userEntity != null ? userEntity.toString() : ""; + String clientId = clientIdEntity != null ? 
clientIdEntity.toString() : ""; + return (user + " " + clientId).trim(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + KafkaQuotaEntity that = (KafkaQuotaEntity) obj; + return java.util.Objects.equals(userEntity, that.userEntity) && + java.util.Objects.equals(clientIdEntity, that.clientIdEntity); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(userEntity, clientIdEntity); + } + } + + public static class DefaultTags { + public static final String USER = "user"; + public static final String CLIENT_ID = "client-id"; + } + + /** + * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics + * for all clients. + * <p/> Review Comment: Please move this JavaDoc to the helper class above. ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,938 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int 
USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas +} + +public class ClientQuotaManager { + + private static final Logger log = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public interface BaseUserEntity extends ClientQuotaEntity.ConfigEntity { } + + public static class UserEntity implements BaseUserEntity { + private final String sanitizedUser; + + public UserEntity(String sanitizedUser) { + this.sanitizedUser = sanitizedUser; + } + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + public String getSanitizedUser() { + return sanitizedUser; + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + UserEntity that = (UserEntity) obj; + return sanitizedUser.equals(that.sanitizedUser); + } + + @Override + public int hashCode() { + return sanitizedUser.hashCode(); + } + } + + public static class ClientIdEntity implements ClientQuotaEntity.ConfigEntity { + private final String clientId; + + public 
ClientIdEntity(String clientId) { + this.clientId = clientId; + } + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + public String getClientId() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + ClientIdEntity that = (ClientIdEntity) obj; + return clientId.equals(that.clientId); + } + + @Override + public int hashCode() { + return clientId.hashCode(); + } + } + + public static class DefaultUserEntity implements BaseUserEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultClientIdEntity INSTANCE = new DefaultClientIdEntity(); + + private DefaultClientIdEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default client-id"; + } + } + + public static class KafkaQuotaEntity implements ClientQuotaEntity { + private final BaseUserEntity userEntity; + private final ClientQuotaEntity.ConfigEntity clientIdEntity; + + public KafkaQuotaEntity(BaseUserEntity userEntity, ClientQuotaEntity.ConfigEntity clientIdEntity) { + this.userEntity = userEntity; + 
this.clientIdEntity = clientIdEntity; + } + + @Override + public List<ConfigEntity> configEntities() { + List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>(); + if (userEntity != null) { + entities.add(userEntity); + } + if (clientIdEntity != null) { + entities.add(clientIdEntity); + } + return entities; + } + + public String sanitizedUser() { + if (userEntity instanceof UserEntity) { + return ((UserEntity) userEntity).getSanitizedUser(); + } else if (userEntity == DefaultUserEntity.INSTANCE) { + return DEFAULT_NAME; + } + return ""; + } + + public String clientId() { + return clientIdEntity != null ? clientIdEntity.name() : ""; + } + + @Override + public String toString() { + String user = userEntity != null ? userEntity.toString() : ""; + String clientId = clientIdEntity != null ? clientIdEntity.toString() : ""; + return (user + " " + clientId).trim(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + KafkaQuotaEntity that = (KafkaQuotaEntity) obj; + return java.util.Objects.equals(userEntity, that.userEntity) && + java.util.Objects.equals(clientIdEntity, that.clientIdEntity); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(userEntity, clientIdEntity); + } + } + + public static class DefaultTags { + public static final String USER = "user"; + public static final String CLIENT_ID = "client-id"; + } + + /** + * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics + * for all clients. + * <p/> + * Quotas can be set at <user, client-id>, user or client-id levels. For a given client connection, + * the most specific quota matching the connection will be applied. For example, if both a <user, client-id> + * and a user quota match a connection, the <user, client-id> quota will be used. Otherwise, user quota takes + * precedence over client-id quota. 
The order of precedence is: + * <ul> + * <li>/config/users/<user>/clients/<client-id> + * <li>/config/users/<user>/clients/<default> + * <li>/config/users/<user> + * <li>/config/users/<default>/clients/<client-id> + * <li>/config/users/<default>/clients/<default> + * <li>/config/users/<default> + * <li>/config/clients/<client-id> + * <li>/config/clients/<default> + * </ul> + * Quota limits including defaults may be updated dynamically. The implementation is optimized for the case + * where a single level of quotas is configured. + * + * @param config @ClientQuotaManagerConfig quota configs + * @param metrics @Metrics Metrics instance + * @param quotaType Quota type of this quota manager + * @param time @Time object to use + * @param threadNamePrefix The thread prefix to use + * @param clientQuotaCallbackPlugin An optional @ClientQuotaCallback and + * wrap it in a {@link org.apache.kafka.common.internals.Plugin} + */ + + private final ClientQuotaManagerConfig config; + protected final Metrics metrics; + private final QuotaType quotaType; + protected final Time time; + private final Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final SensorAccess sensorAccessor; + private final ClientQuotaCallback quotaCallback; + private final ClientQuotaType clientQuotaType; + + private volatile int quotaTypesEnabled; + + private final Sensor delayQueueSensor; + private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>(); + private final ThrottledChannelReaper throttledChannelReaper; + + public void processThrottledChannelReaperDoWork() { + throttledChannelReaper.doWork(); + } + + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix, + Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) { + this.config = config; + this.metrics = metrics; + this.quotaType = 
quotaType; + this.time = time; + this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin; + + this.sensorAccessor = new SensorAccess(lock, metrics); + this.clientQuotaType = QuotaType.toClientQuotaType(quotaType); + + this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ? + QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS; + + this.delayQueueSensor = metrics.sensor(quotaType.toString() + "-delayQueue"); + this.delayQueueSensor.add(metrics.metricName("queue-size", quotaType.toString(), + "Tracks the size of the delay queue"), new CumulativeSum()); + this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, threadNamePrefix); + + this.quotaCallback = clientQuotaCallbackPlugin + .map(Plugin::get) + .orElse(new DefaultQuotaCallback()); + + start(); // Use start method to keep spotbugs happy + } + + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix) { + this(config, metrics, quotaType, time, threadNamePrefix, Optional.empty()); + } + + protected Metrics metrics() { + return metrics; + } + protected Time time() { + return time; + } + + private void start() { + throttledChannelReaper.start(); + } + + /** + * Reaper thread that triggers channel unmute callbacks on all throttled channels + */ + public class ThrottledChannelReaper extends ShutdownableThread { + private final DelayQueue<ThrottledChannel> delayQueue; + + public ThrottledChannelReaper(DelayQueue<ThrottledChannel> delayQueue, String prefix) { + super(prefix + "ThrottledChannelReaper-" + quotaType, false); + this.delayQueue = delayQueue; + } + + @Override + public void doWork() { + ThrottledChannel throttledChannel; + try { + throttledChannel = delayQueue.poll(1, TimeUnit.SECONDS); + if (throttledChannel != null) { + // Decrement the size of the delay queue + delayQueueSensor.record(-1); + // Notify the socket server that throttling is done for this channel + throttledChannel.notifyThrottlingDone(); + 
} + } catch (InterruptedException e) { + // Ignore and continue + Thread.currentThread().interrupt(); + } + } + } + + /** + * Returns true if any quotas are enabled for this quota manager. This is used + * to determine if quota related metrics should be created. + * Note: If any quotas (static defaults, dynamic defaults or quota overrides) have + * been configured for this broker at any time for this quota type, quotasEnabled will + * return true until the next broker restart, even if all quotas are subsequently deleted. + */ + public boolean quotasEnabled() { + return quotaTypesEnabled != QuotaTypes.NO_QUOTAS; + } + + /** + * See recordAndGetThrottleTimeMs. + */ + public int maybeRecordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + // Record metrics only if quotas are enabled. + if (quotasEnabled()) { + return recordAndGetThrottleTimeMs(session, clientId, value, timeMs); + } else { + return 0; + } + } + + /** + * Records that a user/clientId accumulated or would like to accumulate the provided amount at the + * specified time, returns throttle time in milliseconds. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @param value The value to accumulate + * @param timeMs The time at which to accumulate the value + * @return The throttle time in milliseconds defines as the time to wait until the average + * rate gets back to the defined quota + */ + public int recordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + try { + clientSensors.quotaSensor().record(value, timeMs, true); + return 0; + } catch (QuotaViolationException e) { + int throttleTimeMs = (int) throttleTime(e, timeMs); + if (log.isDebugEnabled()) { + log.debug("Quota violated for sensor ({}). 
Delay time: ({})", + clientSensors.quotaSensor().name(), throttleTimeMs); + } + return throttleTimeMs; + } + } + + /** + * Records that a user/clientId changed some metric being throttled without checking for + * quota violation. The aggregate value will subsequently be used for throttling when the + * next request is processed. + */ + public void recordNoThrottle(Session session, String clientId, double value) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value, time.milliseconds(), false); + } + + /** + * "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value + * of the same quantity. + * + * For a throttled fetch, the broker should return an empty response and thus should not record the value. Ideally, + * we would like to compute the throttle time before actually recording the value, but the current Sensor code + * couples value recording and quota checking very tightly. As a workaround, we will unrecord the value for the fetch + * in case of throttling. Rate keeps the sum of values that fall in each time window, so this should bring the + * overall sum back to the previous value. + */ + public void unrecordQuotaSensor(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value * -1, timeMs, false); + } + + /** + * Returns maximum value that could be recorded without guaranteed throttling. + * Recording any larger value will always be throttled, even if no other values were recorded in the quota window. 
+ * This is used for deciding the maximum bytes that can be fetched at once + */ + public double getMaxValueInQuotaWindow(Session session, String clientId) { + if (quotasEnabled()) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + Double limit = quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags()); + if (limit != null) { + return limit * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds; + } else { + return Double.MAX_VALUE; + } + } else { + return Double.MAX_VALUE; + } + } + + /** + * Throttle a client by muting the associated channel for the given throttle time. + * + * @param request client request + * @param throttleTimeMs Duration in milliseconds for which the channel is to be muted. + * @param throttleCallback Callback for channel throttling + */ + public void throttle( + String clientId, + Session session, + ThrottleCallback throttleCallback, + int throttleTimeMs + ) { + if (throttleTimeMs > 0) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.throttleTimeSensor().record(throttleTimeMs); + ThrottledChannel throttledChannel = new ThrottledChannel(time, throttleTimeMs, throttleCallback); + delayQueue.add(throttledChannel); + delayQueueSensor.record(); + if (log.isDebugEnabled()) { + log.debug("Channel throttled for sensor ({}). Delay time: ({})", + clientSensors.quotaSensor().name(), throttleTimeMs); + } + } + } + + /** + * Returns the quota for the client with the specified (non-encoded) user principal and client-id. + * Note: this method is expensive, it is meant to be used by tests only + */ + public Quota quota(String user, String clientId) { + KafkaPrincipal userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user); + return quota(userPrincipal, clientId); + } + + /** + * Returns the quota for the client with the specified user principal and client-id. 
+ * Note: this method is expensive, it is meant to be used by tests only + */ + public Quota quota(KafkaPrincipal userPrincipal, String clientId) { + Map<String, String> metricTags = quotaCallback.quotaMetricTags(clientQuotaType, userPrincipal, clientId); + return Quota.upperBound(quotaLimit(metricTags)); + } + + private double quotaLimit(Map<String, String> metricTags) { + Double limit = quotaCallback.quotaLimit(clientQuotaType, metricTags); + return limit != null ? limit : Long.MAX_VALUE; + } + + /** + * This calculates the amount of time needed to bring the metric within quota + * assuming that no new metrics are recorded. + */ + protected long throttleTime(QuotaViolationException e, long timeMs) { + return QuotaUtils.throttleTime(e, timeMs); + } + + + /** + * This function either returns the sensors for a given client id or creates them if they don't exist + */ + public ClientSensors getOrCreateQuotaSensors(Session session, String clientId) { + Map<String, String> metricTags; + + // Use cached sanitized principal if using default callback + if (quotaCallback instanceof DefaultQuotaCallback defaultCallback) { + metricTags = defaultCallback.quotaMetricTags(session.sanitizedUser, clientId); + } else { + // For custom callbacks, preserve whatever order they return + metricTags = quotaCallback.quotaMetricTags(clientQuotaType, session.principal, clientId); + } + + // Create the sensors + ClientSensors sensors = new ClientSensors( + metricTags, + sensorAccessor.getOrCreate( + getQuotaSensorName(metricTags), + INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS, + sensor -> registerQuotaMetrics(metricTags, sensor) + ), + sensorAccessor.getOrCreate( + getThrottleTimeSensorName(metricTags), + INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS, + sensor -> sensor.add(throttleMetricName(metricTags), new Avg()) + ) + ); + + if (quotaCallback.quotaResetRequired(clientQuotaType)) { + updateQuotaMetricConfigs(); + } + + return sensors; + } + + protected void registerQuotaMetrics(Map<String, String> 
metricTags, Sensor sensor) { + sensor.add( + clientQuotaMetricName(metricTags), + new Rate(), + getQuotaMetricConfig(metricTags) + ); + } + + private String metricTagsToSensorSuffix(Map<String, String> metricTags) { + return String.join(":", metricTags.values()); + } + + private String getThrottleTimeSensorName(Map<String, String> metricTags) { + return quotaType.toString() + "ThrottleTime-" + metricTagsToSensorSuffix(metricTags); + } + + private String getQuotaSensorName(Map<String, String> metricTags) { + return quotaType.toString() + "-" + metricTagsToSensorSuffix(metricTags); + } + + protected MetricConfig getQuotaMetricConfig(Map<String, String> metricTags) { + return getQuotaMetricConfig(quotaLimit(metricTags)); + } + + private MetricConfig getQuotaMetricConfig(double quotaLimit) { + return new MetricConfig() + .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS) + .samples(config.numQuotaSamples) + .quota(new Quota(quotaLimit, true)); + } + + protected Sensor getOrCreateSensor(String sensorName, long expirationTimeSeconds, Consumer<Sensor> registerMetrics) { + return sensorAccessor.getOrCreate( + sensorName, + expirationTimeSeconds, + registerMetrics); + } + + /** + * Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults + * for any of these levels. + * + * @param userEntity user to override if quota applies to <user> or <user, client-id> + * @param clientEntity sanitized client entity to override if quota applies to <client-id> or <user, client-id> + * @param quota custom quota to apply or None if quota override is being removed + */ + public void updateQuota( + Optional<BaseUserEntity> userEntity, + Optional<ClientQuotaEntity.ConfigEntity> clientEntity, + Optional<Quota> quota + ) { + /* + * Acquire the write lock to apply changes in the quota objects. + * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists). 
+ * If the KafkaMetric hasn't been created, the most recent value will be used from the overriddenQuota map. + * The write lock prevents quota update and creation at the same time. It also guards against concurrent quota change + * notifications + */ + lock.writeLock().lock(); + try { + KafkaQuotaEntity quotaEntity = new KafkaQuotaEntity(userEntity.orElse(null), clientEntity.orElse(null)); + + // Update quota types enabled flags atomically + int currentQuotaTypes = quotaTypesEnabled; + + if (userEntity.isPresent()) { + if (clientEntity.isPresent()) { + currentQuotaTypes |= QuotaTypes.USER_CLIENT_ID_QUOTA_ENABLED; + } else { + currentQuotaTypes |= QuotaTypes.USER_QUOTA_ENABLED; + } + } else if (clientEntity.isPresent()) { + currentQuotaTypes |= QuotaTypes.CLIENT_ID_QUOTA_ENABLED; + } + + quotaTypesEnabled = currentQuotaTypes; + + // Apply quota changes + if (quota.isPresent()) { + quotaCallback.updateQuota(clientQuotaType, quotaEntity, quota.get().bound()); + } else { + quotaCallback.removeQuota(clientQuotaType, quotaEntity); + } + + // Determine which entities need metric config updates + Optional<KafkaQuotaEntity> updatedEntity; + if (userEntity.filter(entity -> entity == DefaultUserEntity.INSTANCE).isPresent() || + clientEntity.filter(entity -> entity == DefaultClientIdEntity.INSTANCE).isPresent()) { + // More than one entity may need updating, so updateQuotaMetricConfigs will go through all metrics + updatedEntity = Optional.empty(); + } else { + updatedEntity = Optional.of(quotaEntity); + } + + updateQuotaMetricConfigs(updatedEntity); + } finally { + lock.writeLock().unlock(); + } + } + + + /** + * Updates metrics configs. This is invoked when quota configs are updated when partitions leaders change + * and custom callbacks that implement partition-based quotas have updated quotas. + * + * param updatedQuotaEntity If set to one entity and quotas have only been enabled at one + * level, then an optimized update is performed with a single metric update. 
If None is provided, + * or if custom callbacks are used or if multi-level quotas have been enabled, all metric configs + * are checked and updated if required. Review Comment: Could you correct the indentation here? ########## core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala: ########## @@ -964,9 +967,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup val clientId = "test-client-1" servers.foreach { server => server.quotaManagers.produce.updateQuota( - None, - Some(ClientQuotaManager.ClientIdEntity(clientId)), - Some(Quota.upperBound(10000000)) + None.toJava, + Some(new ClientQuotaManager.ClientIdEntity(clientId): ClientQuotaEntity.ConfigEntity).toJava, + Some(Quota.upperBound(10000000)).toJava Review Comment: ```suggestion Optional.empty(), Optional.of(new ClientQuotaManager.ClientIdEntity(clientId): ClientQuotaEntity.ConfigEntity), Optional.of(Quota.upperBound(10000000)) ``` ########## server/src/main/java/org/apache/kafka/server/StrictControllerMutationQuota.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server; + +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Time; + +/** + * The StrictControllerMutationQuota defines a strict quota for a given user/clientId pair. The + * quota is strict meaning that 1) it does not accept any mutations once the quota is exhausted + * until it gets back to the defined rate; and 2) it does not throttle for any number of mutations + * if quota is not already exhausted. + * + * @param time @Time object to use + * @param quotaSensor @Sensor object with a defined quota for a given user/clientId pair + */ Review Comment: It’s better to move these `@param` tags to the constructor. Class-level JavaDoc should not include `@param` tags. ########## core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala: ########## @@ -154,9 +155,9 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag val quotaValue = newValue.map(new Quota(_, true)) try { manager.updateQuota( - userEntity = userEntity, - clientEntity = clientEntity, - quota = quotaValue + userEntity.toJava, // Convert Scala Option to Java Optional + clientEntity.toJava, // Convert Scala Option to Java Optional + quotaValue.toJava // Convert Scala Option to Java Optional Review Comment: Do we really need these comments?
########## core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala: ########## @@ -19,21 +19,27 @@ package kafka.server import org.apache.kafka.common.metrics.Quota import org.apache.kafka.server.config.ClientQuotaManagerConfig import org.apache.kafka.server.quota.QuotaType +import org.apache.kafka.server.ClientQuotaManager +import org.apache.kafka.server.quota.ClientQuotaEntity import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import java.util.Optional +import scala.jdk.OptionConverters.RichOption class ClientRequestQuotaManagerTest extends BaseClientQuotaManagerTest { private val config = new ClientQuotaManagerConfig() @Test def testRequestPercentageQuotaViolation(): Unit = { val clientRequestQuotaManager = new ClientRequestQuotaManager(config, metrics, time, "", Optional.empty()) + val userEntity: ClientQuotaManager.BaseUserEntity = new ClientQuotaManager.UserEntity("ANONYMOUS") + val clientEntity: ClientQuotaEntity.ConfigEntity = new ClientQuotaManager.ClientIdEntity("test-client") + clientRequestQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("ANONYMOUS")), - Some(ClientQuotaManager.ClientIdEntity("test-client")), - Some(Quota.upperBound(1)) + Some(userEntity).toJava, + Some(clientEntity).toJava, + Some(Quota.upperBound(1)).toJava Review Comment: ```suggestion Optional.of(userEntity), Optional.of(clientEntity), Optional.of(Quota.upperBound(1)) ``` ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,938 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import 
java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas +} + +public class ClientQuotaManager { + + private static final Logger log = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public interface BaseUserEntity extends ClientQuotaEntity.ConfigEntity { } + + public static class UserEntity implements BaseUserEntity { + private final String sanitizedUser; + + public UserEntity(String sanitizedUser) { + this.sanitizedUser = sanitizedUser; + } + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + public String getSanitizedUser() { + return sanitizedUser; + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + + @Override + public boolean 
equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + UserEntity that = (UserEntity) obj; + return sanitizedUser.equals(that.sanitizedUser); + } + + @Override + public int hashCode() { + return sanitizedUser.hashCode(); + } + } + + public static class ClientIdEntity implements ClientQuotaEntity.ConfigEntity { + private final String clientId; + + public ClientIdEntity(String clientId) { + this.clientId = clientId; + } + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + public String getClientId() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + ClientIdEntity that = (ClientIdEntity) obj; + return clientId.equals(that.clientId); + } + + @Override + public int hashCode() { + return clientId.hashCode(); + } + } + + public static class DefaultUserEntity implements BaseUserEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultClientIdEntity INSTANCE = new DefaultClientIdEntity(); + + private DefaultClientIdEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID; + } + + @Override + public String name() { + return 
DEFAULT_NAME; + } + + @Override + public String toString() { + return "default client-id"; + } + } + + public static class KafkaQuotaEntity implements ClientQuotaEntity { + private final BaseUserEntity userEntity; + private final ClientQuotaEntity.ConfigEntity clientIdEntity; + + public KafkaQuotaEntity(BaseUserEntity userEntity, ClientQuotaEntity.ConfigEntity clientIdEntity) { + this.userEntity = userEntity; + this.clientIdEntity = clientIdEntity; + } + + @Override + public List<ConfigEntity> configEntities() { + List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>(); + if (userEntity != null) { + entities.add(userEntity); + } + if (clientIdEntity != null) { + entities.add(clientIdEntity); + } + return entities; + } + + public String sanitizedUser() { + if (userEntity instanceof UserEntity) { + return ((UserEntity) userEntity).getSanitizedUser(); + } else if (userEntity == DefaultUserEntity.INSTANCE) { + return DEFAULT_NAME; + } + return ""; + } + + public String clientId() { + return clientIdEntity != null ? clientIdEntity.name() : ""; + } + + @Override + public String toString() { + String user = userEntity != null ? userEntity.toString() : ""; + String clientId = clientIdEntity != null ? clientIdEntity.toString() : ""; + return (user + " " + clientId).trim(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + KafkaQuotaEntity that = (KafkaQuotaEntity) obj; + return java.util.Objects.equals(userEntity, that.userEntity) && + java.util.Objects.equals(clientIdEntity, that.clientIdEntity); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(userEntity, clientIdEntity); + } + } + + public static class DefaultTags { + public static final String USER = "user"; + public static final String CLIENT_ID = "client-id"; + } + + /** + * Helper class that records per-client metrics. 
It is also responsible for maintaining Quota usage statistics + * for all clients. + * <p/> + * Quotas can be set at <user, client-id>, user or client-id levels. For a given client connection, + * the most specific quota matching the connection will be applied. For example, if both a <user, client-id> + * and a user quota match a connection, the <user, client-id> quota will be used. Otherwise, user quota takes + * precedence over client-id quota. The order of precedence is: + * <ul> + * <li>/config/users/<user>/clients/<client-id> + * <li>/config/users/<user>/clients/<default> + * <li>/config/users/<user> + * <li>/config/users/<default>/clients/<client-id> + * <li>/config/users/<default>/clients/<default> + * <li>/config/users/<default> + * <li>/config/clients/<client-id> + * <li>/config/clients/<default> + * </ul> + * Quota limits including defaults may be updated dynamically. The implementation is optimized for the case + * where a single level of quotas is configured. + * + * @param config @ClientQuotaManagerConfig quota configs + * @param metrics @Metrics Metrics instance + * @param quotaType Quota type of this quota manager + * @param time @Time object to use + * @param threadNamePrefix The thread prefix to use + * @param clientQuotaCallbackPlugin An optional @ClientQuotaCallback and + * wrap it in a {@link org.apache.kafka.common.internals.Plugin} + */ + + private final ClientQuotaManagerConfig config; + protected final Metrics metrics; + private final QuotaType quotaType; + protected final Time time; + private final Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final SensorAccess sensorAccessor; + private final ClientQuotaCallback quotaCallback; + private final ClientQuotaType clientQuotaType; + + private volatile int quotaTypesEnabled; + + private final Sensor delayQueueSensor; + private final DelayQueue<ThrottledChannel> delayQueue = new 
DelayQueue<>(); + private final ThrottledChannelReaper throttledChannelReaper; + + public void processThrottledChannelReaperDoWork() { + throttledChannelReaper.doWork(); + } + + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix, + Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) { + this.config = config; + this.metrics = metrics; + this.quotaType = quotaType; + this.time = time; + this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin; + + this.sensorAccessor = new SensorAccess(lock, metrics); + this.clientQuotaType = QuotaType.toClientQuotaType(quotaType); + + this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ? + QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS; + + this.delayQueueSensor = metrics.sensor(quotaType.toString() + "-delayQueue"); + this.delayQueueSensor.add(metrics.metricName("queue-size", quotaType.toString(), + "Tracks the size of the delay queue"), new CumulativeSum()); + this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, threadNamePrefix); + + this.quotaCallback = clientQuotaCallbackPlugin + .map(Plugin::get) + .orElse(new DefaultQuotaCallback()); + + start(); // Use start method to keep spotbugs happy + } + + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix) { + this(config, metrics, quotaType, time, threadNamePrefix, Optional.empty()); + } + + protected Metrics metrics() { + return metrics; + } + protected Time time() { + return time; + } + + private void start() { + throttledChannelReaper.start(); + } + + /** + * Reaper thread that triggers channel unmute callbacks on all throttled channels + */ + public class ThrottledChannelReaper extends ShutdownableThread { + private final DelayQueue<ThrottledChannel> delayQueue; + + public ThrottledChannelReaper(DelayQueue<ThrottledChannel> delayQueue, String prefix) { + 
super(prefix + "ThrottledChannelReaper-" + quotaType, false); + this.delayQueue = delayQueue; + } + + @Override + public void doWork() { + ThrottledChannel throttledChannel; + try { + throttledChannel = delayQueue.poll(1, TimeUnit.SECONDS); + if (throttledChannel != null) { + // Decrement the size of the delay queue + delayQueueSensor.record(-1); + // Notify the socket server that throttling is done for this channel + throttledChannel.notifyThrottlingDone(); + } + } catch (InterruptedException e) { + // Ignore and continue + Thread.currentThread().interrupt(); + } + } + } + + /** + * Returns true if any quotas are enabled for this quota manager. This is used + * to determine if quota related metrics should be created. + * Note: If any quotas (static defaults, dynamic defaults or quota overrides) have + * been configured for this broker at any time for this quota type, quotasEnabled will + * return true until the next broker restart, even if all quotas are subsequently deleted. + */ + public boolean quotasEnabled() { + return quotaTypesEnabled != QuotaTypes.NO_QUOTAS; + } + + /** + * See recordAndGetThrottleTimeMs. + */ + public int maybeRecordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + // Record metrics only if quotas are enabled. + if (quotasEnabled()) { + return recordAndGetThrottleTimeMs(session, clientId, value, timeMs); + } else { + return 0; + } + } + + /** + * Records that a user/clientId accumulated or would like to accumulate the provided amount at the + * specified time, returns throttle time in milliseconds. 
+ * + * @param session The session from which the user is extracted + * @param clientId The client id + * @param value The value to accumulate + * @param timeMs The time at which to accumulate the value + * @return The throttle time in milliseconds defines as the time to wait until the average + * rate gets back to the defined quota + */ + public int recordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + try { + clientSensors.quotaSensor().record(value, timeMs, true); + return 0; + } catch (QuotaViolationException e) { + int throttleTimeMs = (int) throttleTime(e, timeMs); + if (log.isDebugEnabled()) { + log.debug("Quota violated for sensor ({}). Delay time: ({})", + clientSensors.quotaSensor().name(), throttleTimeMs); + } + return throttleTimeMs; + } + } + + /** + * Records that a user/clientId changed some metric being throttled without checking for + * quota violation. The aggregate value will subsequently be used for throttling when the + * next request is processed. + */ + public void recordNoThrottle(Session session, String clientId, double value) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value, time.milliseconds(), false); + } + + /** + * "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value + * of the same quantity. + * + * For a throttled fetch, the broker should return an empty response and thus should not record the value. Ideally, + * we would like to compute the throttle time before actually recording the value, but the current Sensor code + * couples value recording and quota checking very tightly. As a workaround, we will unrecord the value for the fetch + * in case of throttling. Rate keeps the sum of values that fall in each time window, so this should bring the + * overall sum back to the previous value. 
+ */ + public void unrecordQuotaSensor(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value * -1, timeMs, false); + } + + /** + * Returns maximum value that could be recorded without guaranteed throttling. + * Recording any larger value will always be throttled, even if no other values were recorded in the quota window. + * This is used for deciding the maximum bytes that can be fetched at once + */ + public double getMaxValueInQuotaWindow(Session session, String clientId) { + if (quotasEnabled()) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + Double limit = quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags()); + if (limit != null) { + return limit * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds; + } else { + return Double.MAX_VALUE; + } + } else { + return Double.MAX_VALUE; + } + } + + /** + * Throttle a client by muting the associated channel for the given throttle time. + * + * @param request client request + * @param throttleTimeMs Duration in milliseconds for which the channel is to be muted. + * @param throttleCallback Callback for channel throttling Review Comment: Please fix the JavaDoc for `@param` tag -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org