chia7712 commented on code in PR #19807: URL: https://github.com/apache/kafka/pull/19807#discussion_r2129670800
########## server/src/main/java/org/apache/kafka/server/PermissiveControllerMutationQuota.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; + +import java.util.Objects; + +/** + * The PermissiveControllerMutationQuota defines a permissive quota for a given user/clientId pair. + * The quota is permissive meaning that 1) it does accept any mutations even if the quota is + * exhausted; and 2) it does throttle as soon as the quota is exhausted. + */ +public class PermissiveControllerMutationQuota extends AbstractControllerMutationQuota { + private final Sensor quotaSensor; + + /** + * Creates a new PermissiveControllerMutationQuota with the specified time source and quota sensor. + * + * @param time the Time object used for time-based calculations and quota tracking + * @param quotaSensor the Sensor object that tracks quota usage for a specific user/clientId pair + * @throws IllegalArgumentException if time or quotaSensor is null + */ + public PermissiveControllerMutationQuota(Time time, Sensor quotaSensor) { + super(Objects.requireNonNull(time, "time cannot be null")); Review Comment: could you please move the null check to `AbstractControllerMutationQuota`? ########## server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.TokenBucket; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.QuotaType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Optional; + + +/** + * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used in the context + * of throttling controller's operations/mutations. + */ +public class ControllerMutationQuotaManager extends ClientQuotaManager { + + private static final Logger log = LoggerFactory.getLogger(ControllerMutationQuotaManager.class); + + /** + * @param config ClientQuotaManagerConfig quota configs + * @param metrics Metrics instance + * @param time Time object to use + * @param threadNamePrefix The thread prefix to use + * @param quotaCallback ClientQuotaCallback to use + */ + public ControllerMutationQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + Time time, + String threadNamePrefix, + Optional<Plugin<ClientQuotaCallback>> quotaCallback) { + super(config, metrics, QuotaType.CONTROLLER_MUTATION, time, threadNamePrefix, quotaCallback); + } + + @Override + protected MetricName clientQuotaMetricName(Map<String, String> quotaMetricTags) { + return metrics.metricName("tokens", QuotaType.CONTROLLER_MUTATION.toString(), + "Tracking remaining tokens in the token bucket per user/client-id", + quotaMetricTags); + } + + private MetricName clientRateMetricName(Map<String, String> quotaMetricTags) { + return metrics.metricName("mutation-rate", QuotaType.CONTROLLER_MUTATION.toString(), + "Tracking mutation-rate per user/client-id", + quotaMetricTags); + } + + @Override + protected void registerQuotaMetrics(Map<String, String> metricTags, Sensor sensor) { + sensor.add( + clientRateMetricName(metricTags), + new Rate() + ); + sensor.add( + clientQuotaMetricName(metricTags), + new TokenBucket(), + getQuotaMetricConfig(metricTags) + ); + } + + /** + * Records that a user/clientId accumulated or would like to accumulate the provided amount at the + * specified time, returns throttle time in milliseconds. The quota is strict, meaning that it + * does not accept any mutations once the quota is exhausted until it gets back to the defined rate. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @param value The value to accumulate + * @param timeMs The time at which to accumulate the value + * @return The throttle time in milliseconds defines as the time to wait until the average + * rate gets back to the defined quota + */ + @Override + public int recordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + Sensor quotaSensor = clientSensors.quotaSensor(); + + try { + synchronized (quotaSensor) { + quotaSensor.checkQuotas(timeMs); + quotaSensor.record(value, timeMs, false); + } + return 0; + } catch (QuotaViolationException e) { + int throttleTimeMs = (int) throttleTimeMs(e); + if (log.isDebugEnabled()) { + log.debug("Quota violated for sensor ({}). Delay time: ({})", + quotaSensor.name(), throttleTimeMs); + } + return throttleTimeMs; + } + } + + /** + * Returns a StrictControllerMutationQuota for the given user/clientId pair or + * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @return ControllerMutationQuota + */ + public ControllerMutationQuota newStrictQuotaFor(Session session, String clientId) { + if (quotasEnabled()) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + return new StrictControllerMutationQuota(time, clientSensors.quotaSensor()); + } else { + return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA; + } + } + + public ControllerMutationQuota newStrictQuotaFor(Session session, RequestHeader header) { + return newStrictQuotaFor(session, header.clientId()); + } + + /** + * Returns a PermissiveControllerMutationQuota for the given user/clientId pair or + * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @return ControllerMutationQuota + */ + public ControllerMutationQuota newPermissiveQuotaFor(Session session, String clientId) { + if (quotasEnabled()) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + return new PermissiveControllerMutationQuota(time, clientSensors.quotaSensor()); + } else { + return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA; + } + } + + /** + * Returns a ControllerMutationQuota based on `strictSinceVersion`. It returns a strict + * quota if the version is equal to or above of the `strictSinceVersion`, a permissive + * quota if the version is below, and an unbounded quota if the quota is disabled. + * When the quota is strictly enforced. Any operation above the quota is not allowed + * and rejected with a THROTTLING_QUOTA_EXCEEDED error. + * + * @param session The session from which the user is extracted + * @param header The request header to extract the clientId and apiVersion from + * @param strictSinceVersion The version since quota is strict + * @return ControllerMutationQuota instance + */ + public ControllerMutationQuota newQuotaFor(Session session, RequestHeader header, short strictSinceVersion) { + if (header.apiVersion() >= strictSinceVersion) { + return newStrictQuotaFor(session, header); + } else { + return newPermissiveQuotaFor(session, header.clientId()); + } + } + + /** + * This calculates the amount of time needed to bring the TokenBucket within quota + * assuming that no new metrics are recorded. + * Basically, if a value < 0 is observed, the time required to bring it to zero is + * -value/ refill rate (quota bound) * 1000. + */ + public static long throttleTimeMs(QuotaViolationException e) { + if (e.metric().measurable() instanceof TokenBucket) { Review Comment: ```java if (e.metric().measurable() instanceof TokenBucket) { return Math.round(-e.value() / e.bound() * 1000); } throw new IllegalArgumentException( "Metric " + e.metric().metricName() + " is not a TokenBucket metric, value " + e.metric().measurable()); ``` ########## server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.TokenBucket; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.QuotaType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Optional; + + +/** + * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used in the context + * of throttling controller's operations/mutations. + */ +public class ControllerMutationQuotaManager extends ClientQuotaManager { + + private static final Logger log = LoggerFactory.getLogger(ControllerMutationQuotaManager.class); + + /** + * @param config ClientQuotaManagerConfig quota configs + * @param metrics Metrics instance + * @param time Time object to use + * @param threadNamePrefix The thread prefix to use + * @param quotaCallback ClientQuotaCallback to use + */ + public ControllerMutationQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + Time time, + String threadNamePrefix, + Optional<Plugin<ClientQuotaCallback>> quotaCallback) { + super(config, metrics, QuotaType.CONTROLLER_MUTATION, time, threadNamePrefix, quotaCallback); + } + + @Override + protected MetricName clientQuotaMetricName(Map<String, String> quotaMetricTags) { + return metrics.metricName("tokens", QuotaType.CONTROLLER_MUTATION.toString(), + "Tracking remaining tokens in the token bucket per user/client-id", + quotaMetricTags); + } + + private MetricName clientRateMetricName(Map<String, String> quotaMetricTags) { + return metrics.metricName("mutation-rate", QuotaType.CONTROLLER_MUTATION.toString(), + "Tracking mutation-rate per user/client-id", + quotaMetricTags); + } + + @Override + protected void registerQuotaMetrics(Map<String, String> metricTags, Sensor sensor) { + sensor.add( + clientRateMetricName(metricTags), + new Rate() + ); + sensor.add( + clientQuotaMetricName(metricTags), + new TokenBucket(), + getQuotaMetricConfig(metricTags) + ); + } + + /** + * Records that a user/clientId accumulated or would like to accumulate the provided amount at the + * specified time, returns throttle time in milliseconds. The quota is strict, meaning that it + * does not accept any mutations once the quota is exhausted until it gets back to the defined rate. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @param value The value to accumulate + * @param timeMs The time at which to accumulate the value + * @return The throttle time in milliseconds defines as the time to wait until the average + * rate gets back to the defined quota + */ + @Override + public int recordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + Sensor quotaSensor = clientSensors.quotaSensor(); + + try { + synchronized (quotaSensor) { + quotaSensor.checkQuotas(timeMs); + quotaSensor.record(value, timeMs, false); + } + return 0; + } catch (QuotaViolationException e) { + int throttleTimeMs = (int) throttleTimeMs(e); + if (log.isDebugEnabled()) { + log.debug("Quota violated for sensor ({}). Delay time: ({})", + quotaSensor.name(), throttleTimeMs); + } + return throttleTimeMs; + } + } + + /** + * Returns a StrictControllerMutationQuota for the given user/clientId pair or + * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @return ControllerMutationQuota + */ + public ControllerMutationQuota newStrictQuotaFor(Session session, String clientId) { + if (quotasEnabled()) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + return new StrictControllerMutationQuota(time, clientSensors.quotaSensor()); + } else { + return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA; + } + } + + public ControllerMutationQuota newStrictQuotaFor(Session session, RequestHeader header) { + return newStrictQuotaFor(session, header.clientId()); + } + + /** + * Returns a PermissiveControllerMutationQuota for the given user/clientId pair or + * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @return ControllerMutationQuota + */ + public ControllerMutationQuota newPermissiveQuotaFor(Session session, String clientId) { + if (quotasEnabled()) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + return new PermissiveControllerMutationQuota(time, clientSensors.quotaSensor()); + } else { + return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA; + } + } + + /** + * Returns a ControllerMutationQuota based on `strictSinceVersion`. It returns a strict + * quota if the version is equal to or above of the `strictSinceVersion`, a permissive + * quota if the version is below, and an unbounded quota if the quota is disabled. + * When the quota is strictly enforced. Any operation above the quota is not allowed + * and rejected with a THROTTLING_QUOTA_EXCEEDED error. + * + * @param session The session from which the user is extracted + * @param header The request header to extract the clientId and apiVersion from + * @param strictSinceVersion The version since quota is strict + * @return ControllerMutationQuota instance + */ + public ControllerMutationQuota newQuotaFor(Session session, RequestHeader header, short strictSinceVersion) { + if (header.apiVersion() >= strictSinceVersion) { + return newStrictQuotaFor(session, header); + } else { + return newPermissiveQuotaFor(session, header.clientId()); + } + } + + /** + * This calculates the amount of time needed to bring the TokenBucket within quota + * assuming that no new metrics are recorded. + * Basically, if a value < 0 is observed, the time required to bring it to zero is + * -value/ refill rate (quota bound) * 1000. + */ + public static long throttleTimeMs(QuotaViolationException e) { Review Comment: it could be package-private ########## server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.TokenBucket; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.QuotaType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Optional; + + +/** + * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used in the context + * of throttling controller's operations/mutations. + */ +public class ControllerMutationQuotaManager extends ClientQuotaManager { + + private static final Logger log = LoggerFactory.getLogger(ControllerMutationQuotaManager.class); + + /** + * @param config ClientQuotaManagerConfig quota configs + * @param metrics Metrics instance + * @param time Time object to use + * @param threadNamePrefix The thread prefix to use + * @param quotaCallback ClientQuotaCallback to use + */ + public ControllerMutationQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + Time time, + String threadNamePrefix, + Optional<Plugin<ClientQuotaCallback>> quotaCallback) { + super(config, metrics, QuotaType.CONTROLLER_MUTATION, time, threadNamePrefix, quotaCallback); + } + + @Override + protected MetricName clientQuotaMetricName(Map<String, String> quotaMetricTags) { + return metrics.metricName("tokens", QuotaType.CONTROLLER_MUTATION.toString(), + "Tracking remaining tokens in the token bucket per user/client-id", + quotaMetricTags); + } + + private MetricName clientRateMetricName(Map<String, String> quotaMetricTags) { + return metrics.metricName("mutation-rate", QuotaType.CONTROLLER_MUTATION.toString(), + "Tracking mutation-rate per user/client-id", + quotaMetricTags); + } + + @Override + protected void registerQuotaMetrics(Map<String, String> metricTags, Sensor sensor) { + sensor.add( + clientRateMetricName(metricTags), + new Rate() + ); + sensor.add( + clientQuotaMetricName(metricTags), + new TokenBucket(), + getQuotaMetricConfig(metricTags) + ); + } + + /** + * Records that a user/clientId accumulated or would like to accumulate the provided amount at the + * specified time, returns throttle time in milliseconds. The quota is strict, meaning that it + * does not accept any mutations once the quota is exhausted until it gets back to the defined rate. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @param value The value to accumulate + * @param timeMs The time at which to accumulate the value + * @return The throttle time in milliseconds defines as the time to wait until the average + * rate gets back to the defined quota + */ + @Override + public int recordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + Sensor quotaSensor = clientSensors.quotaSensor(); + + try { + synchronized (quotaSensor) { + quotaSensor.checkQuotas(timeMs); + quotaSensor.record(value, timeMs, false); + } + return 0; + } catch (QuotaViolationException e) { + int throttleTimeMs = (int) throttleTimeMs(e); + if (log.isDebugEnabled()) { + log.debug("Quota violated for sensor ({}). Delay time: ({})", + quotaSensor.name(), throttleTimeMs); + } + return throttleTimeMs; + } + } + + /** + * Returns a StrictControllerMutationQuota for the given user/clientId pair or + * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @return ControllerMutationQuota + */ + public ControllerMutationQuota newStrictQuotaFor(Session session, String clientId) { + if (quotasEnabled()) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + return new StrictControllerMutationQuota(time, clientSensors.quotaSensor()); + } else { + return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA; + } + } + + public ControllerMutationQuota newStrictQuotaFor(Session session, RequestHeader header) { + return newStrictQuotaFor(session, header.clientId()); + } + + /** + * Returns a PermissiveControllerMutationQuota for the given user/clientId pair or + * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @return ControllerMutationQuota + */ + public ControllerMutationQuota newPermissiveQuotaFor(Session session, String clientId) { + if (quotasEnabled()) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + return new PermissiveControllerMutationQuota(time, clientSensors.quotaSensor()); + } else { + return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA; + } + } + + /** + * Returns a ControllerMutationQuota based on `strictSinceVersion`. It returns a strict + * quota if the version is equal to or above of the `strictSinceVersion`, a permissive + * quota if the version is below, and an unbounded quota if the quota is disabled. + * When the quota is strictly enforced. Any operation above the quota is not allowed + * and rejected with a THROTTLING_QUOTA_EXCEEDED error. + * + * @param session The session from which the user is extracted + * @param header The request header to extract the clientId and apiVersion from + * @param strictSinceVersion The version since quota is strict + * @return ControllerMutationQuota instance + */ + public ControllerMutationQuota newQuotaFor(Session session, RequestHeader header, short strictSinceVersion) { + if (header.apiVersion() >= strictSinceVersion) { Review Comment: ```java if (header.apiVersion() >= strictSinceVersion) return newStrictQuotaFor(session, header); return newPermissiveQuotaFor(session, header.clientId()); ``` ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,883 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +public class ClientQuotaManager { + + static final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas + } + + private static final Logger LOG = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public record UserEntity(String sanitizedUser) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + } + + // Convert to record - this is a simple data holder + public record ClientIdEntity(String clientId) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + } + + // Keep as a class-uses a singleton pattern which doesn't work well with records + public static class DefaultUserEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { Review Comment: ditto ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,883 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +public class ClientQuotaManager { + + static final class QuotaTypes { Review Comment: Do we really need a struct class? ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,883 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +public class ClientQuotaManager { + + static final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas + } + + private static final Logger LOG = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public record UserEntity(String sanitizedUser) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + } + + // Convert to record - this is a simple data holder + public record ClientIdEntity(String clientId) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + } + + // Keep as a class-uses a singleton pattern which doesn't work well with records + public static class DefaultUserEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultClientIdEntity INSTANCE = new DefaultClientIdEntity(); + + private DefaultClientIdEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default client-id"; + } + } + + public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity, + ClientQuotaEntity.ConfigEntity clientIdEntity) implements ClientQuotaEntity { + + @Override + public List<ConfigEntity> configEntities() { + List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>(); + if (userEntity != null) { + entities.add(userEntity); + } + if (clientIdEntity != null) { + entities.add(clientIdEntity); + } + return entities; + } + + public String sanitizedUser() { + if (userEntity instanceof UserEntity userRecord) { + return userRecord.sanitizedUser(); + } else if (userEntity == DefaultUserEntity.INSTANCE) { + return DEFAULT_NAME; + } + return ""; + } + + public String clientId() { + return clientIdEntity != null ? clientIdEntity.name() : ""; + } + + @Override + public String toString() { + String user = userEntity != null ? userEntity.toString() : ""; + String clientId = clientIdEntity != null ? clientIdEntity.toString() : ""; + return (user + " " + clientId).trim(); + } + } + + public static class DefaultTags { + public static final String USER = "user"; + public static final String CLIENT_ID = "client-id"; + } + + private final ClientQuotaManagerConfig config; + protected final Metrics metrics; + private final QuotaType quotaType; + protected final Time time; + private final Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final SensorAccess sensorAccessor; + private final ClientQuotaCallback quotaCallback; + private final ClientQuotaType clientQuotaType; + + private volatile int quotaTypesEnabled; + + private final Sensor delayQueueSensor; + private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>(); + private final ThrottledChannelReaper throttledChannelReaper; + + public void processThrottledChannelReaperDoWork() { + throttledChannelReaper.doWork(); + } + + /** + * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics + * for all clients. + * <p/> + * Quotas can be set at <user, client-id>, user or client-id levels. For a given client connection, + * the most specific quota matching the connection will be applied. For example, if both a <user, client-id> + * and a user quota match a connection, the <user, client-id> quota will be used. Otherwise, user quota takes + * precedence over client-id quota. The order of precedence is: + * <ul> + * <li>/config/users/<user>/clients/<client-id> + * <li>/config/users/<user>/clients/<default> + * <li>/config/users/<user> + * <li>/config/users/<default>/clients/<client-id> + * <li>/config/users/<default>/clients/<default> + * <li>/config/users/<default> + * <li>/config/clients/<client-id> + * <li>/config/clients/<default> + * </ul> + * Quota limits including defaults may be updated dynamically. The implementation is optimized for the case + * where a single level of quotas is configured. + * @param config the ClientQuotaManagerConfig containing quota configurations + * @param metrics the Metrics instance for recording quota-related metrics + * @param quotaType the quota type managed by this quota manager + * @param time the Time object used for time-based operations + * @param threadNamePrefix the thread name prefix used for internal threads + * @param clientQuotaCallbackPlugin optional Plugin containing a ClientQuotaCallback for custom quota logic + */ + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix, + Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) { + this.config = config; + this.metrics = metrics; + this.quotaType = quotaType; + this.time = time; + this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin; + + this.sensorAccessor = new SensorAccess(lock, metrics); + this.clientQuotaType = QuotaType.toClientQuotaType(quotaType); + + this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ? + QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS; + + this.delayQueueSensor = metrics.sensor(quotaType.toString() + "-delayQueue"); Review Comment: `toString` is redunant ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,883 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +public class ClientQuotaManager { + + static final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas + } + + private static final Logger LOG = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public record UserEntity(String sanitizedUser) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + } + + // Convert to record - this is a simple data holder + public record ClientIdEntity(String clientId) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + } + + // Keep as a class-uses a singleton pattern which doesn't work well with records + public static class DefaultUserEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultClientIdEntity INSTANCE = new DefaultClientIdEntity(); + + private DefaultClientIdEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default client-id"; + } + } + + public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity, + ClientQuotaEntity.ConfigEntity clientIdEntity) implements ClientQuotaEntity { + + @Override + public List<ConfigEntity> configEntities() { + List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>(); + if (userEntity != null) { + entities.add(userEntity); + } + if (clientIdEntity != null) { + entities.add(clientIdEntity); + } + return entities; + } + + public String sanitizedUser() { + if (userEntity instanceof UserEntity userRecord) { + return userRecord.sanitizedUser(); + } else if (userEntity == DefaultUserEntity.INSTANCE) { + return DEFAULT_NAME; + } + return ""; + } + + public String clientId() { + return clientIdEntity != null ? clientIdEntity.name() : ""; + } + + @Override + public String toString() { + String user = userEntity != null ? userEntity.toString() : ""; + String clientId = clientIdEntity != null ? clientIdEntity.toString() : ""; + return (user + " " + clientId).trim(); + } + } + + public static class DefaultTags { + public static final String USER = "user"; + public static final String CLIENT_ID = "client-id"; + } + + private final ClientQuotaManagerConfig config; + protected final Metrics metrics; + private final QuotaType quotaType; + protected final Time time; + private final Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final SensorAccess sensorAccessor; + private final ClientQuotaCallback quotaCallback; + private final ClientQuotaType clientQuotaType; + + private volatile int quotaTypesEnabled; + + private final Sensor delayQueueSensor; + private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>(); + private final ThrottledChannelReaper throttledChannelReaper; + + public void processThrottledChannelReaperDoWork() { + throttledChannelReaper.doWork(); + } + + /** + * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics + * for all clients. + * <p/> + * Quotas can be set at <user, client-id>, user or client-id levels. For a given client connection, + * the most specific quota matching the connection will be applied. For example, if both a <user, client-id> + * and a user quota match a connection, the <user, client-id> quota will be used. Otherwise, user quota takes + * precedence over client-id quota. The order of precedence is: + * <ul> + * <li>/config/users/<user>/clients/<client-id> + * <li>/config/users/<user>/clients/<default> + * <li>/config/users/<user> + * <li>/config/users/<default>/clients/<client-id> + * <li>/config/users/<default>/clients/<default> + * <li>/config/users/<default> + * <li>/config/clients/<client-id> + * <li>/config/clients/<default> + * </ul> + * Quota limits including defaults may be updated dynamically. The implementation is optimized for the case + * where a single level of quotas is configured. + * @param config the ClientQuotaManagerConfig containing quota configurations + * @param metrics the Metrics instance for recording quota-related metrics + * @param quotaType the quota type managed by this quota manager + * @param time the Time object used for time-based operations + * @param threadNamePrefix the thread name prefix used for internal threads + * @param clientQuotaCallbackPlugin optional Plugin containing a ClientQuotaCallback for custom quota logic + */ + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix, + Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) { + this.config = config; + this.metrics = metrics; + this.quotaType = quotaType; + this.time = time; + this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin; + + this.sensorAccessor = new SensorAccess(lock, metrics); + this.clientQuotaType = QuotaType.toClientQuotaType(quotaType); + + this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ? + QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS; + + this.delayQueueSensor = metrics.sensor(quotaType.toString() + "-delayQueue"); + this.delayQueueSensor.add(metrics.metricName("queue-size", quotaType.toString(), + "Tracks the size of the delay queue"), new CumulativeSum()); + this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, threadNamePrefix); + + this.quotaCallback = clientQuotaCallbackPlugin + .map(Plugin::get) + .orElse(new DefaultQuotaCallback()); + + start(); // Use the start method to keep spotbugs happy + } + + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix) { + this(config, metrics, quotaType, time, threadNamePrefix, Optional.empty()); + } + + protected Metrics metrics() { + return metrics; + } + protected Time time() { + return time; + } + + private void start() { + throttledChannelReaper.start(); + } + + /** + * Reaper thread that triggers channel unmute callbacks on all throttled channels + */ + public class ThrottledChannelReaper extends ShutdownableThread { + private final DelayQueue<ThrottledChannel> delayQueue; + + public ThrottledChannelReaper(DelayQueue<ThrottledChannel> delayQueue, String prefix) { + super(prefix + "ThrottledChannelReaper-" + quotaType, false); + this.delayQueue = delayQueue; + } + + @Override + public void doWork() { + ThrottledChannel throttledChannel; + try { + throttledChannel = delayQueue.poll(1, TimeUnit.SECONDS); + if (throttledChannel != null) { + // Decrement the size of the delay queue + delayQueueSensor.record(-1); + // Notify the socket server that throttling is done for this channel + throttledChannel.notifyThrottlingDone(); + } + } catch (InterruptedException e) { + // Ignore and continue + Thread.currentThread().interrupt(); + } + } + } + + /** + * Returns true if any quotas are enabled for this quota manager. This is used + * to determine if quota-related metrics should be created. + * Note: If any quotas (static defaults, dynamic defaults or quota overrides) have + * been configured for this broker at any time for this quota type, quotasEnabled will + * return true until the next broker restarts, even if all quotas are subsequently deleted. + */ + public boolean quotasEnabled() { + return quotaTypesEnabled != QuotaTypes.NO_QUOTAS; + } + + /** + * See recordAndGetThrottleTimeMs. + */ + public int maybeRecordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + // Record metrics only if quotas are enabled. + if (quotasEnabled()) { + return recordAndGetThrottleTimeMs(session, clientId, value, timeMs); + } else { + return 0; + } + } + + /** + * Records that a user/clientId accumulated or would like to accumulate the provided amount at the + * specified time, returns throttle time in milliseconds. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @param value The value to accumulate + * @param timeMs The time at which to accumulate the value + * @return The throttle time in milliseconds defines as the time to wait until the average + * rate gets back to the defined quota + */ + public int recordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + try { + clientSensors.quotaSensor().record(value, timeMs, true); + return 0; + } catch (QuotaViolationException e) { + int throttleTimeMs = (int) throttleTime(e, timeMs); + if (LOG.isDebugEnabled()) { + LOG.debug("Quota violated for sensor ({}). Delay time: ({})", + clientSensors.quotaSensor().name(), throttleTimeMs); + } + return throttleTimeMs; + } + } + + /** + * Records that a user/clientId changed some metric being throttled without checking for + * quota violation. The aggregate value will subsequently be used for throttling when the + * next request is processed. + */ + public void recordNoThrottle(Session session, String clientId, double value) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value, time.milliseconds(), false); + } + + /** + * "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value + * of the same quantity. + * For a throttled fetch, the broker should return an empty response and thus should not record the value. Ideally, + * we would like to compute the throttle time before actually recording the value, but the current Sensor code + * couples value recording and quota checking very tightly. As a workaround, we will unrecord the value for the fetch + * in case of throttling. Rate keeps the sum of values that fall in each time window, so this should bring the + * overall sum back to the previous value. + */ + public void unrecordQuotaSensor(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value * -1, timeMs, false); + } + + /** + * Returns the maximum value that could be recorded without guaranteed throttling. + * Recording any larger value will always be throttled, even if no other values were recorded in the quota window. + * This is used for deciding the maximum bytes that can be fetched at once + */ + public double getMaxValueInQuotaWindow(Session session, String clientId) { Review Comment: could you please remove the prefix `get` ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,883 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +public class ClientQuotaManager { + + static final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas + } + + private static final Logger LOG = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public record UserEntity(String sanitizedUser) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + } + + // Convert to record - this is a simple data holder + public record ClientIdEntity(String clientId) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + } + + // Keep as a class-uses a singleton pattern which doesn't work well with records + public static class DefaultUserEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultClientIdEntity INSTANCE = new DefaultClientIdEntity(); + + private DefaultClientIdEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default client-id"; + } + } + + public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity, + ClientQuotaEntity.ConfigEntity clientIdEntity) implements ClientQuotaEntity { + + @Override + public List<ConfigEntity> configEntities() { + List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>(); + if (userEntity != null) { + entities.add(userEntity); + } + if (clientIdEntity != null) { + entities.add(clientIdEntity); + } + return entities; + } + + public String sanitizedUser() { + if (userEntity instanceof UserEntity userRecord) { + return userRecord.sanitizedUser(); + } else if (userEntity == DefaultUserEntity.INSTANCE) { + return DEFAULT_NAME; + } + return ""; + } + + public String clientId() { + return clientIdEntity != null ? clientIdEntity.name() : ""; + } + + @Override + public String toString() { + String user = userEntity != null ? userEntity.toString() : ""; + String clientId = clientIdEntity != null ? clientIdEntity.toString() : ""; + return (user + " " + clientId).trim(); + } + } + + public static class DefaultTags { + public static final String USER = "user"; + public static final String CLIENT_ID = "client-id"; + } + + private final ClientQuotaManagerConfig config; + protected final Metrics metrics; + private final QuotaType quotaType; + protected final Time time; + private final Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final SensorAccess sensorAccessor; + private final ClientQuotaCallback quotaCallback; + private final ClientQuotaType clientQuotaType; + + private volatile int quotaTypesEnabled; + + private final Sensor delayQueueSensor; + private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>(); + private final ThrottledChannelReaper throttledChannelReaper; + + public void processThrottledChannelReaperDoWork() { + throttledChannelReaper.doWork(); + } + + /** + * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics + * for all clients. + * <p/> + * Quotas can be set at <user, client-id>, user or client-id levels. For a given client connection, + * the most specific quota matching the connection will be applied. For example, if both a <user, client-id> + * and a user quota match a connection, the <user, client-id> quota will be used. Otherwise, user quota takes + * precedence over client-id quota. The order of precedence is: + * <ul> + * <li>/config/users/<user>/clients/<client-id> + * <li>/config/users/<user>/clients/<default> + * <li>/config/users/<user> + * <li>/config/users/<default>/clients/<client-id> + * <li>/config/users/<default>/clients/<default> + * <li>/config/users/<default> + * <li>/config/clients/<client-id> + * <li>/config/clients/<default> + * </ul> + * Quota limits including defaults may be updated dynamically. The implementation is optimized for the case + * where a single level of quotas is configured. + * @param config the ClientQuotaManagerConfig containing quota configurations + * @param metrics the Metrics instance for recording quota-related metrics + * @param quotaType the quota type managed by this quota manager + * @param time the Time object used for time-based operations + * @param threadNamePrefix the thread name prefix used for internal threads + * @param clientQuotaCallbackPlugin optional Plugin containing a ClientQuotaCallback for custom quota logic + */ + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix, + Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) { + this.config = config; + this.metrics = metrics; + this.quotaType = quotaType; + this.time = time; + this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin; + + this.sensorAccessor = new SensorAccess(lock, metrics); + this.clientQuotaType = QuotaType.toClientQuotaType(quotaType); + + this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ? + QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS; + + this.delayQueueSensor = metrics.sensor(quotaType.toString() + "-delayQueue"); + this.delayQueueSensor.add(metrics.metricName("queue-size", quotaType.toString(), + "Tracks the size of the delay queue"), new CumulativeSum()); + this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, threadNamePrefix); + + this.quotaCallback = clientQuotaCallbackPlugin + .map(Plugin::get) + .orElse(new DefaultQuotaCallback()); + + start(); // Use the start method to keep spotbugs happy + } + + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix) { + this(config, metrics, quotaType, time, threadNamePrefix, Optional.empty()); + } + + protected Metrics metrics() { + return metrics; + } + protected Time time() { + return time; + } + + private void start() { + throttledChannelReaper.start(); + } + + /** + * Reaper thread that triggers channel unmute callbacks on all throttled channels + */ + public class ThrottledChannelReaper extends ShutdownableThread { + private final DelayQueue<ThrottledChannel> delayQueue; + + public ThrottledChannelReaper(DelayQueue<ThrottledChannel> delayQueue, String prefix) { + super(prefix + "ThrottledChannelReaper-" + quotaType, false); + this.delayQueue = delayQueue; + } + + @Override + public void doWork() { + ThrottledChannel throttledChannel; + try { + throttledChannel = delayQueue.poll(1, TimeUnit.SECONDS); + if (throttledChannel != null) { + // Decrement the size of the delay queue + delayQueueSensor.record(-1); + // Notify the socket server that throttling is done for this channel + throttledChannel.notifyThrottlingDone(); + } + } catch (InterruptedException e) { + // Ignore and continue + Thread.currentThread().interrupt(); + } + } + } + + /** + * Returns true if any quotas are enabled for this quota manager. This is used + * to determine if quota-related metrics should be created. + * Note: If any quotas (static defaults, dynamic defaults or quota overrides) have + * been configured for this broker at any time for this quota type, quotasEnabled will + * return true until the next broker restarts, even if all quotas are subsequently deleted. + */ + public boolean quotasEnabled() { + return quotaTypesEnabled != QuotaTypes.NO_QUOTAS; + } + + /** + * See recordAndGetThrottleTimeMs. + */ + public int maybeRecordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + // Record metrics only if quotas are enabled. + if (quotasEnabled()) { + return recordAndGetThrottleTimeMs(session, clientId, value, timeMs); + } else { + return 0; + } + } + + /** + * Records that a user/clientId accumulated or would like to accumulate the provided amount at the + * specified time, returns throttle time in milliseconds. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @param value The value to accumulate + * @param timeMs The time at which to accumulate the value + * @return The throttle time in milliseconds defines as the time to wait until the average + * rate gets back to the defined quota + */ + public int recordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + try { + clientSensors.quotaSensor().record(value, timeMs, true); + return 0; + } catch (QuotaViolationException e) { + int throttleTimeMs = (int) throttleTime(e, timeMs); + if (LOG.isDebugEnabled()) { + LOG.debug("Quota violated for sensor ({}). Delay time: ({})", + clientSensors.quotaSensor().name(), throttleTimeMs); + } + return throttleTimeMs; + } + } + + /** + * Records that a user/clientId changed some metric being throttled without checking for + * quota violation. The aggregate value will subsequently be used for throttling when the + * next request is processed. + */ + public void recordNoThrottle(Session session, String clientId, double value) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value, time.milliseconds(), false); + } + + /** + * "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value + * of the same quantity. + * For a throttled fetch, the broker should return an empty response and thus should not record the value. Ideally, + * we would like to compute the throttle time before actually recording the value, but the current Sensor code + * couples value recording and quota checking very tightly. As a workaround, we will unrecord the value for the fetch + * in case of throttling. Rate keeps the sum of values that fall in each time window, so this should bring the + * overall sum back to the previous value. + */ + public void unrecordQuotaSensor(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value * -1, timeMs, false); + } + + /** + * Returns the maximum value that could be recorded without guaranteed throttling. + * Recording any larger value will always be throttled, even if no other values were recorded in the quota window. + * This is used for deciding the maximum bytes that can be fetched at once + */ + public double getMaxValueInQuotaWindow(Session session, String clientId) { + if (quotasEnabled()) { Review Comment: ```java if (!quotasEnabled()) return Double.MAX_VALUE; var clientSensors = getOrCreateQuotaSensors(session, clientId); var limit = quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags()); if (limit != null) return limit * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds; return Double.MAX_VALUE; ``` ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,883 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +public class ClientQuotaManager { + + static final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas + } + + private static final Logger LOG = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public record UserEntity(String sanitizedUser) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + } + + // Convert to record - this is a simple data holder + public record ClientIdEntity(String clientId) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + } + + // Keep as a class-uses a singleton pattern which doesn't work well with records + public static class DefaultUserEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultClientIdEntity INSTANCE = new DefaultClientIdEntity(); + + private DefaultClientIdEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default client-id"; + } + } + + public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity, + ClientQuotaEntity.ConfigEntity clientIdEntity) implements ClientQuotaEntity { + + @Override + public List<ConfigEntity> configEntities() { + List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>(); + if (userEntity != null) { + entities.add(userEntity); + } + if (clientIdEntity != null) { + entities.add(clientIdEntity); + } + return entities; + } + + public String sanitizedUser() { + if (userEntity instanceof UserEntity userRecord) { + return userRecord.sanitizedUser(); + } else if (userEntity == DefaultUserEntity.INSTANCE) { + return DEFAULT_NAME; + } + return ""; + } + + public String clientId() { + return clientIdEntity != null ? clientIdEntity.name() : ""; + } + + @Override + public String toString() { + String user = userEntity != null ? userEntity.toString() : ""; + String clientId = clientIdEntity != null ? clientIdEntity.toString() : ""; + return (user + " " + clientId).trim(); + } + } + + public static class DefaultTags { Review Comment: ditto ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,883 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +public class ClientQuotaManager { + + static final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas + } + + private static final Logger LOG = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public record UserEntity(String sanitizedUser) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + } + + // Convert to record - this is a simple data holder + public record ClientIdEntity(String clientId) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + } + + // Keep as a class-uses a singleton pattern which doesn't work well with records + public static class DefaultUserEntity implements ClientQuotaEntity.ConfigEntity { Review Comment: ```java public static final ClientQuotaEntity.ConfigEntity DEFAULT_USER_ENTITY = new ClientQuotaEntity.ConfigEntity() { @Override public ClientQuotaEntity.ConfigEntityType entityType() { return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; } @Override public String name() { return DEFAULT_NAME; } @Override public String toString() { return "default user"; } }; ``` ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,942 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas +} + +public class ClientQuotaManager { + + private static final Logger log = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public interface BaseUserEntity extends ClientQuotaEntity.ConfigEntity { } + + public static class UserEntity implements BaseUserEntity { + private final String sanitizedUser; + + public UserEntity(String sanitizedUser) { + this.sanitizedUser = sanitizedUser; + } + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + public String getSanitizedUser() { + return sanitizedUser; + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + UserEntity that = (UserEntity) obj; + return sanitizedUser.equals(that.sanitizedUser); + } + + @Override + public int hashCode() { + return sanitizedUser.hashCode(); + } + } + + public static class ClientIdEntity implements ClientQuotaEntity.ConfigEntity { + private final String clientId; + + public ClientIdEntity(String clientId) { + this.clientId = clientId; + } + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + public String getClientId() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + ClientIdEntity that = (ClientIdEntity) obj; + return clientId.equals(that.clientId); + } + + @Override + public int hashCode() { + return clientId.hashCode(); + } + } + + public static class DefaultUserEntity implements BaseUserEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultClientIdEntity INSTANCE = new DefaultClientIdEntity(); + + private DefaultClientIdEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default client-id"; + } + } + + public static class KafkaQuotaEntity implements ClientQuotaEntity { + private final BaseUserEntity userEntity; + private final ClientQuotaEntity.ConfigEntity clientIdEntity; + + public KafkaQuotaEntity(BaseUserEntity userEntity, ClientQuotaEntity.ConfigEntity clientIdEntity) { + this.userEntity = userEntity; + this.clientIdEntity = clientIdEntity; + } + + @Override + public List<ConfigEntity> configEntities() { + List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>(); + if (userEntity != null) { + entities.add(userEntity); + } + if (clientIdEntity != null) { + entities.add(clientIdEntity); + } + return entities; + } + + public String sanitizedUser() { + if (userEntity instanceof UserEntity) { + return ((UserEntity) userEntity).getSanitizedUser(); + } else if (userEntity == DefaultUserEntity.INSTANCE) { + return DEFAULT_NAME; + } + return ""; + } + + public String clientId() { + return clientIdEntity != null ? clientIdEntity.name() : ""; + } + + @Override + public String toString() { + String user = userEntity != null ? userEntity.toString() : ""; + String clientId = clientIdEntity != null ? clientIdEntity.toString() : ""; + return (user + " " + clientId).trim(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + KafkaQuotaEntity that = (KafkaQuotaEntity) obj; + return java.util.Objects.equals(userEntity, that.userEntity) && + java.util.Objects.equals(clientIdEntity, that.clientIdEntity); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(userEntity, clientIdEntity); + } + } + + public static class DefaultTags { + public static final String USER = "user"; + public static final String CLIENT_ID = "client-id"; + } + + private final ClientQuotaManagerConfig config; + protected final Metrics metrics; + private final QuotaType quotaType; + protected final Time time; + private final Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin; Review Comment: I assume you will remove this variable, but you don't. Could you please share the reason with me? ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,883 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +public class ClientQuotaManager { + + static final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas + } + + private static final Logger LOG = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public record UserEntity(String sanitizedUser) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + } + + // Convert to record - this is a simple data holder + public record ClientIdEntity(String clientId) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + } + + // Keep as a class-uses a singleton pattern which doesn't work well with records + public static class DefaultUserEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultClientIdEntity INSTANCE = new DefaultClientIdEntity(); + + private DefaultClientIdEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default client-id"; + } + } + + public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity, + ClientQuotaEntity.ConfigEntity clientIdEntity) implements ClientQuotaEntity { + + @Override + public List<ConfigEntity> configEntities() { + List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>(); + if (userEntity != null) { + entities.add(userEntity); + } + if (clientIdEntity != null) { + entities.add(clientIdEntity); + } + return entities; + } + + public String sanitizedUser() { + if (userEntity instanceof UserEntity userRecord) { + return userRecord.sanitizedUser(); + } else if (userEntity == DefaultUserEntity.INSTANCE) { + return DEFAULT_NAME; + } + return ""; + } + + public String clientId() { + return clientIdEntity != null ? clientIdEntity.name() : ""; + } + + @Override + public String toString() { + String user = userEntity != null ? userEntity.toString() : ""; + String clientId = clientIdEntity != null ? clientIdEntity.toString() : ""; + return (user + " " + clientId).trim(); + } + } + + public static class DefaultTags { + public static final String USER = "user"; + public static final String CLIENT_ID = "client-id"; + } + + private final ClientQuotaManagerConfig config; + protected final Metrics metrics; + private final QuotaType quotaType; + protected final Time time; + private final Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final SensorAccess sensorAccessor; + private final ClientQuotaCallback quotaCallback; + private final ClientQuotaType clientQuotaType; + + private volatile int quotaTypesEnabled; + + private final Sensor delayQueueSensor; + private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>(); + private final ThrottledChannelReaper throttledChannelReaper; + + public void processThrottledChannelReaperDoWork() { + throttledChannelReaper.doWork(); + } + + /** + * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics + * for all clients. + * <p/> + * Quotas can be set at <user, client-id>, user or client-id levels. For a given client connection, + * the most specific quota matching the connection will be applied. For example, if both a <user, client-id> + * and a user quota match a connection, the <user, client-id> quota will be used. Otherwise, user quota takes + * precedence over client-id quota. The order of precedence is: + * <ul> + * <li>/config/users/<user>/clients/<client-id> + * <li>/config/users/<user>/clients/<default> + * <li>/config/users/<user> + * <li>/config/users/<default>/clients/<client-id> + * <li>/config/users/<default>/clients/<default> + * <li>/config/users/<default> + * <li>/config/clients/<client-id> + * <li>/config/clients/<default> + * </ul> + * Quota limits including defaults may be updated dynamically. The implementation is optimized for the case + * where a single level of quotas is configured. + * @param config the ClientQuotaManagerConfig containing quota configurations + * @param metrics the Metrics instance for recording quota-related metrics + * @param quotaType the quota type managed by this quota manager + * @param time the Time object used for time-based operations + * @param threadNamePrefix the thread name prefix used for internal threads + * @param clientQuotaCallbackPlugin optional Plugin containing a ClientQuotaCallback for custom quota logic + */ + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix, + Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) { + this.config = config; + this.metrics = metrics; + this.quotaType = quotaType; + this.time = time; + this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin; + + this.sensorAccessor = new SensorAccess(lock, metrics); + this.clientQuotaType = QuotaType.toClientQuotaType(quotaType); + + this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ? + QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS; + + this.delayQueueSensor = metrics.sensor(quotaType.toString() + "-delayQueue"); + this.delayQueueSensor.add(metrics.metricName("queue-size", quotaType.toString(), + "Tracks the size of the delay queue"), new CumulativeSum()); + this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, threadNamePrefix); + + this.quotaCallback = clientQuotaCallbackPlugin + .map(Plugin::get) + .orElse(new DefaultQuotaCallback()); + + start(); // Use the start method to keep spotbugs happy + } + + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix) { + this(config, metrics, quotaType, time, threadNamePrefix, Optional.empty()); + } + + protected Metrics metrics() { + return metrics; + } + protected Time time() { + return time; + } + + private void start() { + throttledChannelReaper.start(); Review Comment: this method can be inlined to constructor ########## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ########## @@ -0,0 +1,883 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +public class ClientQuotaManager { + + static final class QuotaTypes { + static final int NO_QUOTAS = 0; + static final int CLIENT_ID_QUOTA_ENABLED = 1; + static final int USER_QUOTA_ENABLED = 2; + static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; + static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas + } + + private static final Logger LOG = LoggerFactory.getLogger(ClientQuotaManager.class); + + // Purge sensors after 1 hour of inactivity + private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + private static final String DEFAULT_NAME = "<default>"; + + public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); + public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); + public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = + new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + + public record UserEntity(String sanitizedUser) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.USER; + } + + @Override + public String name() { + return Sanitizer.desanitize(sanitizedUser); + } + + @Override + public String toString() { + return "user " + sanitizedUser; + } + } + + // Convert to record - this is a simple data holder + public record ClientIdEntity(String clientId) implements ClientQuotaEntity.ConfigEntity { + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.CLIENT_ID; + } + + @Override + public String name() { + return clientId; + } + + @Override + public String toString() { + return "client-id " + clientId; + } + } + + // Keep as a class-uses a singleton pattern which doesn't work well with records + public static class DefaultUserEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultUserEntity INSTANCE = new DefaultUserEntity(); + + private DefaultUserEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default user"; + } + } + + public static class DefaultClientIdEntity implements ClientQuotaEntity.ConfigEntity { + public static final DefaultClientIdEntity INSTANCE = new DefaultClientIdEntity(); + + private DefaultClientIdEntity() {} + + @Override + public ClientQuotaEntity.ConfigEntityType entityType() { + return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID; + } + + @Override + public String name() { + return DEFAULT_NAME; + } + + @Override + public String toString() { + return "default client-id"; + } + } + + public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity, + ClientQuotaEntity.ConfigEntity clientIdEntity) implements ClientQuotaEntity { + + @Override + public List<ConfigEntity> configEntities() { + List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>(); + if (userEntity != null) { + entities.add(userEntity); + } + if (clientIdEntity != null) { + entities.add(clientIdEntity); + } + return entities; + } + + public String sanitizedUser() { + if (userEntity instanceof UserEntity userRecord) { + return userRecord.sanitizedUser(); + } else if (userEntity == DefaultUserEntity.INSTANCE) { + return DEFAULT_NAME; + } + return ""; + } + + public String clientId() { + return clientIdEntity != null ? clientIdEntity.name() : ""; + } + + @Override + public String toString() { + String user = userEntity != null ? userEntity.toString() : ""; + String clientId = clientIdEntity != null ? clientIdEntity.toString() : ""; + return (user + " " + clientId).trim(); + } + } + + public static class DefaultTags { + public static final String USER = "user"; + public static final String CLIENT_ID = "client-id"; + } + + private final ClientQuotaManagerConfig config; + protected final Metrics metrics; + private final QuotaType quotaType; + protected final Time time; + private final Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final SensorAccess sensorAccessor; + private final ClientQuotaCallback quotaCallback; + private final ClientQuotaType clientQuotaType; + + private volatile int quotaTypesEnabled; + + private final Sensor delayQueueSensor; + private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>(); + private final ThrottledChannelReaper throttledChannelReaper; + + public void processThrottledChannelReaperDoWork() { + throttledChannelReaper.doWork(); + } + + /** + * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics + * for all clients. + * <p/> + * Quotas can be set at <user, client-id>, user or client-id levels. For a given client connection, + * the most specific quota matching the connection will be applied. For example, if both a <user, client-id> + * and a user quota match a connection, the <user, client-id> quota will be used. Otherwise, user quota takes + * precedence over client-id quota. The order of precedence is: + * <ul> + * <li>/config/users/<user>/clients/<client-id> + * <li>/config/users/<user>/clients/<default> + * <li>/config/users/<user> + * <li>/config/users/<default>/clients/<client-id> + * <li>/config/users/<default>/clients/<default> + * <li>/config/users/<default> + * <li>/config/clients/<client-id> + * <li>/config/clients/<default> + * </ul> + * Quota limits including defaults may be updated dynamically. The implementation is optimized for the case + * where a single level of quotas is configured. + * @param config the ClientQuotaManagerConfig containing quota configurations + * @param metrics the Metrics instance for recording quota-related metrics + * @param quotaType the quota type managed by this quota manager + * @param time the Time object used for time-based operations + * @param threadNamePrefix the thread name prefix used for internal threads + * @param clientQuotaCallbackPlugin optional Plugin containing a ClientQuotaCallback for custom quota logic + */ + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix, + Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) { + this.config = config; + this.metrics = metrics; + this.quotaType = quotaType; + this.time = time; + this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin; + + this.sensorAccessor = new SensorAccess(lock, metrics); + this.clientQuotaType = QuotaType.toClientQuotaType(quotaType); + + this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ? + QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS; + + this.delayQueueSensor = metrics.sensor(quotaType.toString() + "-delayQueue"); + this.delayQueueSensor.add(metrics.metricName("queue-size", quotaType.toString(), + "Tracks the size of the delay queue"), new CumulativeSum()); + this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, threadNamePrefix); + + this.quotaCallback = clientQuotaCallbackPlugin + .map(Plugin::get) + .orElse(new DefaultQuotaCallback()); + + start(); // Use the start method to keep spotbugs happy + } + + public ClientQuotaManager(ClientQuotaManagerConfig config, + Metrics metrics, + QuotaType quotaType, + Time time, + String threadNamePrefix) { + this(config, metrics, quotaType, time, threadNamePrefix, Optional.empty()); + } + + protected Metrics metrics() { + return metrics; + } + protected Time time() { + return time; + } + + private void start() { + throttledChannelReaper.start(); + } + + /** + * Reaper thread that triggers channel unmute callbacks on all throttled channels + */ + public class ThrottledChannelReaper extends ShutdownableThread { + private final DelayQueue<ThrottledChannel> delayQueue; + + public ThrottledChannelReaper(DelayQueue<ThrottledChannel> delayQueue, String prefix) { + super(prefix + "ThrottledChannelReaper-" + quotaType, false); + this.delayQueue = delayQueue; + } + + @Override + public void doWork() { + ThrottledChannel throttledChannel; + try { + throttledChannel = delayQueue.poll(1, TimeUnit.SECONDS); + if (throttledChannel != null) { + // Decrement the size of the delay queue + delayQueueSensor.record(-1); + // Notify the socket server that throttling is done for this channel + throttledChannel.notifyThrottlingDone(); + } + } catch (InterruptedException e) { + // Ignore and continue + Thread.currentThread().interrupt(); + } + } + } + + /** + * Returns true if any quotas are enabled for this quota manager. This is used + * to determine if quota-related metrics should be created. + * Note: If any quotas (static defaults, dynamic defaults or quota overrides) have + * been configured for this broker at any time for this quota type, quotasEnabled will + * return true until the next broker restarts, even if all quotas are subsequently deleted. + */ + public boolean quotasEnabled() { + return quotaTypesEnabled != QuotaTypes.NO_QUOTAS; + } + + /** + * See recordAndGetThrottleTimeMs. + */ + public int maybeRecordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + // Record metrics only if quotas are enabled. + if (quotasEnabled()) { + return recordAndGetThrottleTimeMs(session, clientId, value, timeMs); + } else { + return 0; + } + } + + /** + * Records that a user/clientId accumulated or would like to accumulate the provided amount at the + * specified time, returns throttle time in milliseconds. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @param value The value to accumulate + * @param timeMs The time at which to accumulate the value + * @return The throttle time in milliseconds defines as the time to wait until the average + * rate gets back to the defined quota + */ + public int recordAndGetThrottleTimeMs(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + try { + clientSensors.quotaSensor().record(value, timeMs, true); + return 0; + } catch (QuotaViolationException e) { + int throttleTimeMs = (int) throttleTime(e, timeMs); + if (LOG.isDebugEnabled()) { + LOG.debug("Quota violated for sensor ({}). Delay time: ({})", + clientSensors.quotaSensor().name(), throttleTimeMs); + } + return throttleTimeMs; + } + } + + /** + * Records that a user/clientId changed some metric being throttled without checking for + * quota violation. The aggregate value will subsequently be used for throttling when the + * next request is processed. + */ + public void recordNoThrottle(Session session, String clientId, double value) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value, time.milliseconds(), false); + } + + /** + * "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value + * of the same quantity. + * For a throttled fetch, the broker should return an empty response and thus should not record the value. Ideally, + * we would like to compute the throttle time before actually recording the value, but the current Sensor code + * couples value recording and quota checking very tightly. As a workaround, we will unrecord the value for the fetch + * in case of throttling. Rate keeps the sum of values that fall in each time window, so this should bring the + * overall sum back to the previous value. + */ + public void unrecordQuotaSensor(Session session, String clientId, double value, long timeMs) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + clientSensors.quotaSensor().record(value * -1, timeMs, false); + } + + /** + * Returns the maximum value that could be recorded without guaranteed throttling. + * Recording any larger value will always be throttled, even if no other values were recorded in the quota window. + * This is used for deciding the maximum bytes that can be fetched at once + */ + public double getMaxValueInQuotaWindow(Session session, String clientId) { + if (quotasEnabled()) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); + Double limit = quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags()); + if (limit != null) { + return limit * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds; + } else { + return Double.MAX_VALUE; + } + } else { + return Double.MAX_VALUE; + } + } + + /** + * Throttle a client by muting the associated channel for the given throttle time. + * @param clientId request client id + * @param session request session + * @param throttleTimeMs Duration in milliseconds for which the channel is to be muted. + * @param throttleCallback Callback for channel throttling + */ + public void throttle( + String clientId, + Session session, + ThrottleCallback throttleCallback, + int throttleTimeMs + ) { + if (throttleTimeMs > 0) { + ClientSensors clientSensors = getOrCreateQuotaSensors(session, clientId); Review Comment: it would be cool if we can leverage `var` to cleanup the type declaration. ```java var clientSensors = getOrCreateQuotaSensors(session, clientId); clientSensors.throttleTimeSensor().record(throttleTimeMs); var throttledChannel = new ThrottledChannel(time, throttleTimeMs, throttleCallback); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org