YutaLin commented on code in PR #19807: URL: https://github.com/apache/kafka/pull/19807#discussion_r2115174151
########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,942 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import 
org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas +} + +public class ClientQuotaManager { + + private static final Logger log = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public interface BaseUserEntity extends ClientQuotaEntity.ConfigEntity { } + + public static class UserEntity implements BaseUserEntity 
{ + private final String sanitizedUser; + + public UserEntity(String sanitizedUser) { + this.sanitizedUser = sanitizedUser; + } + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + public String getSanitizedUser() { + return sanitizedUser; + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + UserEntity that = (UserEntity) obj; + return sanitizedUser.equals(that.sanitizedUser); + } + + @Override + public int hashCode() { + return sanitizedUser.hashCode(); + } + } + + public static class ClientIdEntity implements ClientQuotaEntity.ConfigEntity { + private final String clientId; + + public ClientIdEntity(String clientId) { + this.clientId = clientId; + } + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + public String getClientId() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + ClientIdEntity that = (ClientIdEntity) obj; + return clientId.equals(that.clientId); + } + + @Override + public int hashCode() { + return clientId.hashCode(); + } + } + + public static class DefaultUserEntity implements BaseUserEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public 
String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultClientIdEntity INSTANCE = new DefaultClientIdEntity(); + + private DefaultClientIdEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default client-id"; + } + } + + public static class KafkaQuotaEntity implements ClientQuotaEntity { + private final BaseUserEntity userEntity; + private final ClientQuotaEntity.ConfigEntity clientIdEntity; + + public KafkaQuotaEntity(BaseUserEntity userEntity, ClientQuotaEntity.ConfigEntity clientIdEntity) { + this.userEntity = userEntity; + this.clientIdEntity = clientIdEntity; + } + + @Override + public List<ConfigEntity> configEntities() { + List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>(); + if (userEntity != null) { + entities.add(userEntity); + } + if (clientIdEntity != null) { + entities.add(clientIdEntity); + } + return entities; + } + + public String sanitizedUser() { + if (userEntity instanceof UserEntity) { + return ((UserEntity) userEntity).getSanitizedUser(); + } else if (userEntity == DefaultUserEntity.INSTANCE) { + return DEFAULT_NAME; + } + return ""; + } + + public String clientId() { + return clientIdEntity != null ? clientIdEntity.name() : ""; + } + + @Override + public String toString() { + String user = userEntity != null ? userEntity.toString() : ""; + String clientId = clientIdEntity != null ? 
clientIdEntity.toString() : ""; + return (user + " " + clientId).trim(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + KafkaQuotaEntity that = (KafkaQuotaEntity) obj; + return java.util.Objects.equals(userEntity, that.userEntity) && + java.util.Objects.equals(clientIdEntity, that.clientIdEntity); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(userEntity, clientIdEntity); + } + } + + public static class DefaultTags { + public static final String USER = "user"; + public static final String CLIENT_ID = "client-id"; + } + + private final ClientQuotaManagerConfig config; + protected final Metrics metrics; + private final QuotaType quotaType; + protected final Time time; + private final Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final SensorAccess sensorAccessor; + private final ClientQuotaCallback quotaCallback; + private final ClientQuotaType clientQuotaType; + + private volatile int quotaTypesEnabled; + + private final Sensor delayQueueSensor; + private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>(); + private final ThrottledChannelReaper throttledChannelReaper; + + public void processThrottledChannelReaperDoWork() { + throttledChannelReaper.doWork(); + } + + /** + * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics + * for all clients. + * <p/> + * Quotas can be set at <user, client-id>, user or client-id levels. For a given client connection, + * the most specific quota matching the connection will be applied. For example, if both a <user, client-id> + * and a user quota match a connection, the <user, client-id> quota will be used. Otherwise, user quota takes + * precedence over client-id quota. 
The order of precedence is: + * <ul> + * <li>/config/users/<user>/clients/<client-id> + * <li>/config/users/<user>/clients/<default> + * <li>/config/users/<user> + * <li>/config/users/<default>/clients/<client-id> + * <li>/config/users/<default>/clients/<default> + * <li>/config/users/<default> + * <li>/config/clients/<client-id> + * <li>/config/clients/<default> + * </ul> + * Quota limits including defaults may be updated dynamically. The implementation is optimized for the case + * where a single level of quotas is configured. + * @param config the ClientQuotaManagerConfig containing quota configurations + * @param metrics the Metrics instance for recording quota-related metrics + * @param quotaType the quota type managed by this quota manager + * @param time the Time object used for time-based operations + * @param threadNamePrefix the thread name prefix used for internal threads + * @param clientQuotaCallbackPlugin optional Plugin containing a ClientQuotaCallback for custom quota logic + */ + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix, + Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) { + this.config = config; + this.metrics = metrics; + this.quotaType = quotaType; + this.time = time; + this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin; + + this.sensorAccessor = new SensorAccess(lock, metrics); + this.clientQuotaType = QuotaType.toClientQuotaType(quotaType); + + this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ? 
+ QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS; + + this.delayQueueSensor = metrics.sensor(quotaType.toString() + "-delayQueue"); + this.delayQueueSensor.add(metrics.metricName("queue-size", quotaType.toString(), + "Tracks the size of the delay queue"), new CumulativeSum()); + this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, threadNamePrefix); + + this.quotaCallback = clientQuotaCallbackPlugin + .map(Plugin::get) + .orElse(new DefaultQuotaCallback()); + + start(); // Use start method to keep spotbugs happy + } + + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix) { + this(config, metrics, quotaType, time, threadNamePrefix, Optional.empty()); + } + + protected Metrics metrics() { + return metrics; + } + protected Time time() { + return time; + } + + private void start() { + throttledChannelReaper.start(); + } + + /** + * Reaper thread that triggers channel unmute callbacks on all throttled channels + */ + public class ThrottledChannelReaper extends ShutdownableThread { + private final DelayQueue<ThrottledChannel> delayQueue; + + public ThrottledChannelReaper(DelayQueue<ThrottledChannel> delayQueue, String prefix) { + super(prefix + "ThrottledChannelReaper-" + quotaType, false); + this.delayQueue = delayQueue; + } + + @Override + public void doWork() { + ThrottledChannel throttledChannel; + try { + throttledChannel = delayQueue.poll(1, TimeUnit.SECONDS); + if (throttledChannel != null) { + // Decrement the size of the delay queue + delayQueueSensor.record(-1); + // Notify the socket server that throttling is done for this channel + throttledChannel.notifyThrottlingDone(); + } + } catch (InterruptedException e) { + // Ignore and continue + Thread.currentThread().interrupt(); + } + } + } + + /** + * Returns true if any quotas are enabled for this quota manager. This is used + * to determine if quota related metrics should be created. 
+ * Note: If any quotas (static defaults, dynamic defaults or quota overrides) have + * been configured for this broker at any time for this quota type, quotasEnabled will + * return true until the next broker restart, even if all quotas are subsequently deleted. + */ + public boolean quotasEnabled() { + return quotaTypesEnabled != QuotaTypes.NO_QUOTAS; + } + + /** + * See recordAndGetThrottleTimeMs. + */ + public int maybeRecordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + // Record metrics only if quotas are enabled. + if (quotasEnabled()) { + return recordAndGetThrottleTimeMs(session, clientId, value, timeMs); + } else { + return 0; + } + } + + /** + * Records that a user/clientId accumulated or would like to accumulate the provided amount at the + * specified time, returns throttle time in milliseconds. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @param value The value to accumulate + * @param timeMs The time at which to accumulate the value + * @return The throttle time in milliseconds defines as the time to wait until the average + * rate gets back to the defined quota + */ + public int recordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + try { + clientSensors.quotaSensor().record(value, timeMs, true); + return 0; + } catch (QuotaViolationException e) { + int throttleTimeMs = (int) throttleTime(e, timeMs); + if (log.isDebugEnabled()) { + log.debug("Quota violated for sensor ({}). Delay time: ({})", + clientSensors.quotaSensor().name(), throttleTimeMs); + } + return throttleTimeMs; + } + } + + /** + * Records that a user/clientId changed some metric being throttled without checking for + * quota violation. The aggregate value will subsequently be used for throttling when the + * next request is processed. 
+ */ + public void recordNoThrottle(Session session, String clientId, double value) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value, time.milliseconds(), false); + } + + /** + * "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value + * of the same quantity. + * For a throttled fetch, the broker should return an empty response and thus should not record the value. Ideally, + * we would like to compute the throttle time before actually recording the value, but the current Sensor code + * couples value recording and quota checking very tightly. As a workaround, we will unrecord the value for the fetch + * in case of throttling. Rate keeps the sum of values that fall in each time window, so this should bring the + * overall sum back to the previous value. + */ + public void unrecordQuotaSensor(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value * -1, timeMs, false); + } + + /** + * Returns maximum value that could be recorded without guaranteed throttling. + * Recording any larger value will always be throttled, even if no other values were recorded in the quota window. + * This is used for deciding the maximum bytes that can be fetched at once + */ + public double getMaxValueInQuotaWindow(Session session, String clientId) { + if (quotasEnabled()) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + Double limit = quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags()); + if (limit != null) { + return limit * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds; + } else { + return Double.MAX_VALUE; + } + } else { + return Double.MAX_VALUE; + } + } + + /** + * Throttle a client by muting the associated channel for the given throttle time. 
+ * @param clientId request client id + * @param session request session + * @param throttleTimeMs Duration in milliseconds for which the channel is to be muted. + * @param throttleCallback Callback for channel throttling + */ + public void throttle( + String clientId, + Session session, + ThrottleCallback throttleCallback, + int throttleTimeMs + ) { + if (throttleTimeMs > 0) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.throttleTimeSensor().record(throttleTimeMs); + ThrottledChannel throttledChannel = new ThrottledChannel(time, throttleTimeMs, throttleCallback); + delayQueue.add(throttledChannel); + delayQueueSensor.record(); + if (log.isDebugEnabled()) { + log.debug("Channel throttled for sensor ({}). Delay time: ({})", + clientSensors.quotaSensor().name(), throttleTimeMs); + } + } + } + + /** + * Returns the quota for the client with the specified (non-encoded) user principal and client-id. + * Note: this method is expensive, it is meant to be used by tests only + */ + public Quota quota(String user, String clientId) { + KafkaPrincipal userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user); + return quota(userPrincipal, clientId); + } + + /** + * Returns the quota for the client with the specified user principal and client-id. + * Note: this method is expensive, it is meant to be used by tests only + */ + public Quota quota(KafkaPrincipal userPrincipal, String clientId) { + Map<String, String> metricTags = quotaCallback.quotaMetricTags(clientQuotaType, userPrincipal, clientId); + return Quota.upperBound(quotaLimit(metricTags)); + } + + private double quotaLimit(Map<String, String> metricTags) { + Double limit = quotaCallback.quotaLimit(clientQuotaType, metricTags); + return limit != null ? limit : Long.MAX_VALUE; + } + + /** + * This calculates the amount of time needed to bring the metric within quota + * assuming that no new metrics are recorded. 
+ */ + protected long throttleTime(QuotaViolationException e, long timeMs) { + return QuotaUtils.throttleTime(e, timeMs); + } + + + /** + * This function either returns the sensors for a given client id or creates them if they don't exist + */ + public ClientSensors getOrCreateQuotaSensors(Session session, String clientId) { + Map<String, String> metricTags = Map.copyOf( + quotaCallback instanceof DefaultQuotaCallback defaultCallback + ? defaultCallback.quotaMetricTags(session.sanitizedUser, clientId) + : quotaCallback.quotaMetricTags(clientQuotaType, session.principal, clientId)); + ClientSensors sensors = new ClientSensors( + metricTags, + sensorAccessor.getOrCreate( + getQuotaSensorName(metricTags), + INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS, + sensor -> registerQuotaMetrics(metricTags, sensor) // quotaLimit() called here only for new sensors + ), + sensorAccessor.getOrCreate( + getThrottleTimeSensorName(metricTags), + INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS, + sensor -> sensor.add(throttleMetricName(metricTags), new Avg()) + ) + ); + + if (quotaCallback.quotaResetRequired(clientQuotaType)) { + updateQuotaMetricConfigs(); + } + + return sensors; + } + + protected void registerQuotaMetrics(Map<String, String> metricTags, Sensor sensor) { + sensor.add( + clientQuotaMetricName(metricTags), + new Rate(), + getQuotaMetricConfig(metricTags) + ); + } + + private String metricTagsToSensorSuffix(Map<String, String> metricTags) { + if (metricTags.isEmpty()) { + return ""; + } + + // Handle GroupedUserQuotaCallback case + if (metricTags.containsKey("group")) { Review Comment: @chia7712 Yes, I added all the possible tests when I encountered failing tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org