chia7712 commented on code in PR #19807:
URL: https://github.com/apache/kafka/pull/19807#discussion_r2129670800


##########
server/src/main/java/org/apache/kafka/server/PermissiveControllerMutationQuota.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Objects;
+
+/**
+ * The PermissiveControllerMutationQuota defines a permissive quota for a 
given user/clientId pair.
+ * The quota is permissive meaning that 1) it does accept any mutations even 
if the quota is
+ * exhausted; and 2) it does throttle as soon as the quota is exhausted.
+ */
+public class PermissiveControllerMutationQuota extends 
AbstractControllerMutationQuota {
+    private final Sensor quotaSensor;
+
+    /**
+     * Creates a new PermissiveControllerMutationQuota with the specified time 
source and quota sensor.
+     *
+     * @param time the Time object used for time-based calculations and quota 
tracking
+     * @param quotaSensor the Sensor object that tracks quota usage for a 
specific user/clientId pair
+     * @throws IllegalArgumentException if time or quotaSensor is null
+     */
+    public PermissiveControllerMutationQuota(Time time, Sensor quotaSensor) {
+        super(Objects.requireNonNull(time, "time cannot be null"));

Review Comment:
   could you please move the null check to `AbstractControllerMutationQuota`?



##########
server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.QuotaType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used 
in the context
+ * of throttling controller's operations/mutations.
+ */
+public class ControllerMutationQuotaManager extends ClientQuotaManager {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ControllerMutationQuotaManager.class);
+
+    /**
+     * @param config ClientQuotaManagerConfig quota configs
+     * @param metrics Metrics instance
+     * @param time Time object to use
+     * @param threadNamePrefix The thread prefix to use
+     * @param quotaCallback ClientQuotaCallback to use
+     */
+    public ControllerMutationQuotaManager(ClientQuotaManagerConfig config,
+                                          Metrics metrics,
+                                          Time time,
+                                          String threadNamePrefix,
+                                          
Optional<Plugin<ClientQuotaCallback>> quotaCallback) {
+        super(config, metrics, QuotaType.CONTROLLER_MUTATION, time, 
threadNamePrefix, quotaCallback);
+    }
+
+    @Override
+    protected MetricName clientQuotaMetricName(Map<String, String> 
quotaMetricTags) {
+        return metrics.metricName("tokens", 
QuotaType.CONTROLLER_MUTATION.toString(),
+                "Tracking remaining tokens in the token bucket per 
user/client-id",
+                quotaMetricTags);
+    }
+
+    private MetricName clientRateMetricName(Map<String, String> 
quotaMetricTags) {
+        return metrics.metricName("mutation-rate", 
QuotaType.CONTROLLER_MUTATION.toString(),
+                "Tracking mutation-rate per user/client-id",
+                quotaMetricTags);
+    }
+
+    @Override
+    protected void registerQuotaMetrics(Map<String, String> metricTags, Sensor 
sensor) {
+        sensor.add(
+                clientRateMetricName(metricTags),
+                new Rate()
+        );
+        sensor.add(
+                clientQuotaMetricName(metricTags),
+                new TokenBucket(),
+                getQuotaMetricConfig(metricTags)
+        );
+    }
+
+    /**
+     * Records that a user/clientId accumulated or would like to accumulate 
the provided amount at the
+     * specified time, returns throttle time in milliseconds. The quota is 
strict, meaning that it
+     * does not accept any mutations once the quota is exhausted until it gets 
back to the defined rate.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @param value The value to accumulate
+     * @param timeMs The time at which to accumulate the value
+     * @return The throttle time in milliseconds defines as the time to wait 
until the average
+     *         rate gets back to the defined quota
+     */
+    @Override
+    public int recordAndGetThrottleTimeMs(Session session, String clientId, 
double value, long timeMs) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        Sensor quotaSensor = clientSensors.quotaSensor();
+
+        try {
+            synchronized (quotaSensor) {
+                quotaSensor.checkQuotas(timeMs);
+                quotaSensor.record(value, timeMs, false);
+            }
+            return 0;
+        } catch (QuotaViolationException e) {
+            int throttleTimeMs = (int) throttleTimeMs(e);
+            if (log.isDebugEnabled()) {
+                log.debug("Quota violated for sensor ({}). Delay time: ({})",
+                        quotaSensor.name(), throttleTimeMs);
+            }
+            return throttleTimeMs;
+        }
+    }
+
+    /**
+     * Returns a StrictControllerMutationQuota for the given user/clientId 
pair or
+     * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @return ControllerMutationQuota
+     */
+    public ControllerMutationQuota newStrictQuotaFor(Session session, String 
clientId) {
+        if (quotasEnabled()) {
+            ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+            return new StrictControllerMutationQuota(time, 
clientSensors.quotaSensor());
+        } else {
+            return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA;
+        }
+    }
+
+    public ControllerMutationQuota newStrictQuotaFor(Session session, 
RequestHeader header) {
+        return newStrictQuotaFor(session, header.clientId());
+    }
+
+    /**
+     * Returns a PermissiveControllerMutationQuota for the given user/clientId 
pair or
+     * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @return ControllerMutationQuota
+     */
+    public ControllerMutationQuota newPermissiveQuotaFor(Session session, 
String clientId) {
+        if (quotasEnabled()) {
+            ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+            return new PermissiveControllerMutationQuota(time, 
clientSensors.quotaSensor());
+        } else {
+            return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA;
+        }
+    }
+
+    /**
+     * Returns a ControllerMutationQuota based on `strictSinceVersion`. It 
returns a strict
+     * quota if the version is equal to or above of the `strictSinceVersion`, 
a permissive
+     * quota if the version is below, and an unbounded quota if the quota is 
disabled.
+     * When the quota is strictly enforced. Any operation above the quota is 
not allowed
+     * and rejected with a THROTTLING_QUOTA_EXCEEDED error.
+     *
+     * @param session The session from which the user is extracted
+     * @param header The request header to extract the clientId and apiVersion 
from
+     * @param strictSinceVersion The version since quota is strict
+     * @return ControllerMutationQuota instance
+     */
+    public ControllerMutationQuota newQuotaFor(Session session, RequestHeader 
header, short strictSinceVersion) {
+        if (header.apiVersion() >= strictSinceVersion) {
+            return newStrictQuotaFor(session, header);
+        } else {
+            return newPermissiveQuotaFor(session, header.clientId());
+        }
+    }
+
+    /**
+     * This calculates the amount of time needed to bring the TokenBucket 
within quota
+     * assuming that no new metrics are recorded.
+     * Basically, if a value < 0 is observed, the time required to bring it to 
zero is
+     * -value/ refill rate (quota bound) * 1000.
+     */
+    public static long throttleTimeMs(QuotaViolationException e) {
+        if (e.metric().measurable() instanceof TokenBucket) {

Review Comment:
   ```java
           if (e.metric().measurable() instanceof TokenBucket) {
               return Math.round(-e.value() / e.bound() * 1000);
           }
           throw new IllegalArgumentException(
                   "Metric " + e.metric().metricName() + " is not a TokenBucket 
metric, value " + e.metric().measurable());
   ```



##########
server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.QuotaType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used 
in the context
+ * of throttling controller's operations/mutations.
+ */
+public class ControllerMutationQuotaManager extends ClientQuotaManager {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ControllerMutationQuotaManager.class);
+
+    /**
+     * @param config ClientQuotaManagerConfig quota configs
+     * @param metrics Metrics instance
+     * @param time Time object to use
+     * @param threadNamePrefix The thread prefix to use
+     * @param quotaCallback ClientQuotaCallback to use
+     */
+    public ControllerMutationQuotaManager(ClientQuotaManagerConfig config,
+                                          Metrics metrics,
+                                          Time time,
+                                          String threadNamePrefix,
+                                          
Optional<Plugin<ClientQuotaCallback>> quotaCallback) {
+        super(config, metrics, QuotaType.CONTROLLER_MUTATION, time, 
threadNamePrefix, quotaCallback);
+    }
+
+    @Override
+    protected MetricName clientQuotaMetricName(Map<String, String> 
quotaMetricTags) {
+        return metrics.metricName("tokens", 
QuotaType.CONTROLLER_MUTATION.toString(),
+                "Tracking remaining tokens in the token bucket per 
user/client-id",
+                quotaMetricTags);
+    }
+
+    private MetricName clientRateMetricName(Map<String, String> 
quotaMetricTags) {
+        return metrics.metricName("mutation-rate", 
QuotaType.CONTROLLER_MUTATION.toString(),
+                "Tracking mutation-rate per user/client-id",
+                quotaMetricTags);
+    }
+
+    @Override
+    protected void registerQuotaMetrics(Map<String, String> metricTags, Sensor 
sensor) {
+        sensor.add(
+                clientRateMetricName(metricTags),
+                new Rate()
+        );
+        sensor.add(
+                clientQuotaMetricName(metricTags),
+                new TokenBucket(),
+                getQuotaMetricConfig(metricTags)
+        );
+    }
+
+    /**
+     * Records that a user/clientId accumulated or would like to accumulate 
the provided amount at the
+     * specified time, returns throttle time in milliseconds. The quota is 
strict, meaning that it
+     * does not accept any mutations once the quota is exhausted until it gets 
back to the defined rate.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @param value The value to accumulate
+     * @param timeMs The time at which to accumulate the value
+     * @return The throttle time in milliseconds defines as the time to wait 
until the average
+     *         rate gets back to the defined quota
+     */
+    @Override
+    public int recordAndGetThrottleTimeMs(Session session, String clientId, 
double value, long timeMs) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        Sensor quotaSensor = clientSensors.quotaSensor();
+
+        try {
+            synchronized (quotaSensor) {
+                quotaSensor.checkQuotas(timeMs);
+                quotaSensor.record(value, timeMs, false);
+            }
+            return 0;
+        } catch (QuotaViolationException e) {
+            int throttleTimeMs = (int) throttleTimeMs(e);
+            if (log.isDebugEnabled()) {
+                log.debug("Quota violated for sensor ({}). Delay time: ({})",
+                        quotaSensor.name(), throttleTimeMs);
+            }
+            return throttleTimeMs;
+        }
+    }
+
+    /**
+     * Returns a StrictControllerMutationQuota for the given user/clientId 
pair or
+     * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @return ControllerMutationQuota
+     */
+    public ControllerMutationQuota newStrictQuotaFor(Session session, String 
clientId) {
+        if (quotasEnabled()) {
+            ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+            return new StrictControllerMutationQuota(time, 
clientSensors.quotaSensor());
+        } else {
+            return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA;
+        }
+    }
+
+    public ControllerMutationQuota newStrictQuotaFor(Session session, 
RequestHeader header) {
+        return newStrictQuotaFor(session, header.clientId());
+    }
+
+    /**
+     * Returns a PermissiveControllerMutationQuota for the given user/clientId 
pair or
+     * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @return ControllerMutationQuota
+     */
+    public ControllerMutationQuota newPermissiveQuotaFor(Session session, 
String clientId) {
+        if (quotasEnabled()) {
+            ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+            return new PermissiveControllerMutationQuota(time, 
clientSensors.quotaSensor());
+        } else {
+            return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA;
+        }
+    }
+
+    /**
+     * Returns a ControllerMutationQuota based on `strictSinceVersion`. It 
returns a strict
+     * quota if the version is equal to or above of the `strictSinceVersion`, 
a permissive
+     * quota if the version is below, and an unbounded quota if the quota is 
disabled.
+     * When the quota is strictly enforced. Any operation above the quota is 
not allowed
+     * and rejected with a THROTTLING_QUOTA_EXCEEDED error.
+     *
+     * @param session The session from which the user is extracted
+     * @param header The request header to extract the clientId and apiVersion 
from
+     * @param strictSinceVersion The version since quota is strict
+     * @return ControllerMutationQuota instance
+     */
+    public ControllerMutationQuota newQuotaFor(Session session, RequestHeader 
header, short strictSinceVersion) {
+        if (header.apiVersion() >= strictSinceVersion) {
+            return newStrictQuotaFor(session, header);
+        } else {
+            return newPermissiveQuotaFor(session, header.clientId());
+        }
+    }
+
+    /**
+     * This calculates the amount of time needed to bring the TokenBucket 
within quota
+     * assuming that no new metrics are recorded.
+     * Basically, if a value < 0 is observed, the time required to bring it to 
zero is
+     * -value/ refill rate (quota bound) * 1000.
+     */
+    public static long throttleTimeMs(QuotaViolationException e) {

Review Comment:
   it could be package-private



##########
server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.QuotaType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used 
in the context
+ * of throttling controller's operations/mutations.
+ */
+public class ControllerMutationQuotaManager extends ClientQuotaManager {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ControllerMutationQuotaManager.class);
+
+    /**
+     * @param config ClientQuotaManagerConfig quota configs
+     * @param metrics Metrics instance
+     * @param time Time object to use
+     * @param threadNamePrefix The thread prefix to use
+     * @param quotaCallback ClientQuotaCallback to use
+     */
+    public ControllerMutationQuotaManager(ClientQuotaManagerConfig config,
+                                          Metrics metrics,
+                                          Time time,
+                                          String threadNamePrefix,
+                                          
Optional<Plugin<ClientQuotaCallback>> quotaCallback) {
+        super(config, metrics, QuotaType.CONTROLLER_MUTATION, time, 
threadNamePrefix, quotaCallback);
+    }
+
+    @Override
+    protected MetricName clientQuotaMetricName(Map<String, String> 
quotaMetricTags) {
+        return metrics.metricName("tokens", 
QuotaType.CONTROLLER_MUTATION.toString(),
+                "Tracking remaining tokens in the token bucket per 
user/client-id",
+                quotaMetricTags);
+    }
+
+    private MetricName clientRateMetricName(Map<String, String> 
quotaMetricTags) {
+        return metrics.metricName("mutation-rate", 
QuotaType.CONTROLLER_MUTATION.toString(),
+                "Tracking mutation-rate per user/client-id",
+                quotaMetricTags);
+    }
+
+    @Override
+    protected void registerQuotaMetrics(Map<String, String> metricTags, Sensor 
sensor) {
+        sensor.add(
+                clientRateMetricName(metricTags),
+                new Rate()
+        );
+        sensor.add(
+                clientQuotaMetricName(metricTags),
+                new TokenBucket(),
+                getQuotaMetricConfig(metricTags)
+        );
+    }
+
+    /**
+     * Records that a user/clientId accumulated or would like to accumulate 
the provided amount at the
+     * specified time, returns throttle time in milliseconds. The quota is 
strict, meaning that it
+     * does not accept any mutations once the quota is exhausted until it gets 
back to the defined rate.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @param value The value to accumulate
+     * @param timeMs The time at which to accumulate the value
+     * @return The throttle time in milliseconds defines as the time to wait 
until the average
+     *         rate gets back to the defined quota
+     */
+    @Override
+    public int recordAndGetThrottleTimeMs(Session session, String clientId, 
double value, long timeMs) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        Sensor quotaSensor = clientSensors.quotaSensor();
+
+        try {
+            synchronized (quotaSensor) {
+                quotaSensor.checkQuotas(timeMs);
+                quotaSensor.record(value, timeMs, false);
+            }
+            return 0;
+        } catch (QuotaViolationException e) {
+            int throttleTimeMs = (int) throttleTimeMs(e);
+            if (log.isDebugEnabled()) {
+                log.debug("Quota violated for sensor ({}). Delay time: ({})",
+                        quotaSensor.name(), throttleTimeMs);
+            }
+            return throttleTimeMs;
+        }
+    }
+
+    /**
+     * Returns a StrictControllerMutationQuota for the given user/clientId 
pair or
+     * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @return ControllerMutationQuota
+     */
+    public ControllerMutationQuota newStrictQuotaFor(Session session, String 
clientId) {
+        if (quotasEnabled()) {
+            ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+            return new StrictControllerMutationQuota(time, 
clientSensors.quotaSensor());
+        } else {
+            return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA;
+        }
+    }
+
+    public ControllerMutationQuota newStrictQuotaFor(Session session, 
RequestHeader header) {
+        return newStrictQuotaFor(session, header.clientId());
+    }
+
+    /**
+     * Returns a PermissiveControllerMutationQuota for the given user/clientId 
pair or
+     * a UNBOUNDED_CONTROLLER_MUTATION_QUOTA if the quota is disabled.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @return ControllerMutationQuota
+     */
+    public ControllerMutationQuota newPermissiveQuotaFor(Session session, 
String clientId) {
+        if (quotasEnabled()) {
+            ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+            return new PermissiveControllerMutationQuota(time, 
clientSensors.quotaSensor());
+        } else {
+            return ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA;
+        }
+    }
+
+    /**
+     * Returns a ControllerMutationQuota based on `strictSinceVersion`. It 
returns a strict
+     * quota if the version is equal to or above of the `strictSinceVersion`, 
a permissive
+     * quota if the version is below, and an unbounded quota if the quota is 
disabled.
+     * When the quota is strictly enforced. Any operation above the quota is 
not allowed
+     * and rejected with a THROTTLING_QUOTA_EXCEEDED error.
+     *
+     * @param session The session from which the user is extracted
+     * @param header The request header to extract the clientId and apiVersion 
from
+     * @param strictSinceVersion The version since quota is strict
+     * @return ControllerMutationQuota instance
+     */
+    public ControllerMutationQuota newQuotaFor(Session session, RequestHeader 
header, short strictSinceVersion) {
+        if (header.apiVersion() >= strictSinceVersion) {

Review Comment:
   ```java
           if (header.apiVersion() >= strictSinceVersion) return 
newStrictQuotaFor(session, header);
           return newPermissiveQuotaFor(session, header.clientId());
   ```



##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+public class ClientQuotaManager {
+
+    static final class QuotaTypes {
+        static final int NO_QUOTAS = 0;
+        static final int CLIENT_ID_QUOTA_ENABLED = 1;
+        static final int USER_QUOTA_ENABLED = 2;
+        static final int USER_CLIENT_ID_QUOTA_ENABLED = 4;
+        static final int CUSTOM_QUOTAS = 8; // No metric update optimizations 
are used with custom quotas
+    }
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientQuotaManager.class);
+
+    // Purge sensors after 1 hour of inactivity
+    private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+    private static final String DEFAULT_NAME = "<default>";
+
+    public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE);
+    public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null);
+    public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, 
DefaultClientIdEntity.INSTANCE);
+
+    public record UserEntity(String sanitizedUser) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.USER;
+        }
+
+        @Override
+        public String name() {
+            return Sanitizer.desanitize(sanitizedUser);
+        }
+
+        @Override
+        public String toString() {
+            return "user " + sanitizedUser;
+        }
+    }
+
+    // Convert to record - this is a simple data holder
+    public record ClientIdEntity(String clientId) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return clientId;
+        }
+
+        @Override
+        public String toString() {
+            return "client-id " + clientId;
+        }
+    }
+
+    // Keep as a class-uses a singleton pattern which doesn't work well with 
records
+    public static class DefaultUserEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultUserEntity INSTANCE = new 
DefaultUserEntity();
+
+        private DefaultUserEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default user";
+        }
+    }
+
+    public static class DefaultClientIdEntity implements 
ClientQuotaEntity.ConfigEntity {

Review Comment:
   ditto



##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+public class ClientQuotaManager {
+
+    static final class QuotaTypes {

Review Comment:
   Do we really need a struct class?



##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+public class ClientQuotaManager {
+
+    static final class QuotaTypes {
+        static final int NO_QUOTAS = 0;
+        static final int CLIENT_ID_QUOTA_ENABLED = 1;
+        static final int USER_QUOTA_ENABLED = 2;
+        static final int USER_CLIENT_ID_QUOTA_ENABLED = 4;
+        static final int CUSTOM_QUOTAS = 8; // No metric update optimizations 
are used with custom quotas
+    }
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientQuotaManager.class);
+
+    // Purge sensors after 1 hour of inactivity
+    private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+    private static final String DEFAULT_NAME = "<default>";
+
+    public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE);
+    public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null);
+    public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, 
DefaultClientIdEntity.INSTANCE);
+
+    public record UserEntity(String sanitizedUser) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.USER;
+        }
+
+        @Override
+        public String name() {
+            return Sanitizer.desanitize(sanitizedUser);
+        }
+
+        @Override
+        public String toString() {
+            return "user " + sanitizedUser;
+        }
+    }
+
+    // Convert to record - this is a simple data holder
+    public record ClientIdEntity(String clientId) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return clientId;
+        }
+
+        @Override
+        public String toString() {
+            return "client-id " + clientId;
+        }
+    }
+
+    // Keep as a class-uses a singleton pattern which doesn't work well with 
records
+    public static class DefaultUserEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultUserEntity INSTANCE = new 
DefaultUserEntity();
+
+        private DefaultUserEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default user";
+        }
+    }
+
+    public static class DefaultClientIdEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultClientIdEntity INSTANCE = new 
DefaultClientIdEntity();
+
+        private DefaultClientIdEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default client-id";
+        }
+    }
+
+    public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity,
+                                          ClientQuotaEntity.ConfigEntity 
clientIdEntity) implements ClientQuotaEntity {
+
+        @Override
+        public List<ConfigEntity> configEntities() {
+            List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>();
+            if (userEntity != null) {
+                entities.add(userEntity);
+            }
+            if (clientIdEntity != null) {
+                entities.add(clientIdEntity);
+            }
+            return entities;
+        }
+
+        public String sanitizedUser() {
+            if (userEntity instanceof UserEntity userRecord) {
+                return userRecord.sanitizedUser();
+            } else if (userEntity == DefaultUserEntity.INSTANCE) {
+                return DEFAULT_NAME;
+            }
+            return "";
+        }
+
+        public String clientId() {
+            return clientIdEntity != null ? clientIdEntity.name() : "";
+        }
+
+        @Override
+        public String toString() {
+            String user = userEntity != null ? userEntity.toString() : "";
+            String clientId = clientIdEntity != null ? 
clientIdEntity.toString() : "";
+            return (user + " " + clientId).trim();
+        }
+    }
+
+    public static class DefaultTags {
+        public static final String USER = "user";
+        public static final String CLIENT_ID = "client-id";
+    }
+
+    private final ClientQuotaManagerConfig config;
+    protected final Metrics metrics;
+    private final QuotaType quotaType;
+    protected final Time time;
+    private final Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final SensorAccess sensorAccessor;
+    private final ClientQuotaCallback quotaCallback;
+    private final ClientQuotaType clientQuotaType;
+
+    private volatile int quotaTypesEnabled;
+
+    private final Sensor delayQueueSensor;
+    private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>();
+    private final ThrottledChannelReaper throttledChannelReaper;
+
+    public void processThrottledChannelReaperDoWork() {
+        throttledChannelReaper.doWork();
+    }
+
+    /**
+     * Helper class that records per-client metrics. It is also responsible 
for maintaining Quota usage statistics
+     * for all clients.
+     * <p/>
+     * Quotas can be set at <user, client-id>, user or client-id levels. For a 
given client connection,
+     * the most specific quota matching the connection will be applied. For 
example, if both a <user, client-id>
+     * and a user quota match a connection, the <user, client-id> quota will 
be used. Otherwise, user quota takes
+     * precedence over client-id quota. The order of precedence is:
+     * <ul>
+     *   <li>/config/users/<user>/clients/<client-id>
+     *   <li>/config/users/<user>/clients/<default>
+     *   <li>/config/users/<user>
+     *   <li>/config/users/<default>/clients/<client-id>
+     *   <li>/config/users/<default>/clients/<default>
+     *   <li>/config/users/<default>
+     *   <li>/config/clients/<client-id>
+     *   <li>/config/clients/<default>
+     * </ul>
+     * Quota limits including defaults may be updated dynamically. The 
implementation is optimized for the case
+     * where a single level of quotas is configured.
+     * @param config the ClientQuotaManagerConfig containing quota 
configurations
+     * @param metrics the Metrics instance for recording quota-related metrics
+     * @param quotaType the quota type managed by this quota manager
+     * @param time the Time object used for time-based operations
+     * @param threadNamePrefix the thread name prefix used for internal threads
+     * @param clientQuotaCallbackPlugin optional Plugin containing a 
ClientQuotaCallback for custom quota logic
+     */
+    public ClientQuotaManager(ClientQuotaManagerConfig config,
+                              Metrics metrics,
+                              QuotaType quotaType,
+                              Time time,
+                              String threadNamePrefix,
+                              Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin) {
+        this.config = config;
+        this.metrics = metrics;
+        this.quotaType = quotaType;
+        this.time = time;
+        this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin;
+
+        this.sensorAccessor = new SensorAccess(lock, metrics);
+        this.clientQuotaType = QuotaType.toClientQuotaType(quotaType);
+
+        this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ?
+                QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS;
+
+        this.delayQueueSensor = metrics.sensor(quotaType.toString() + 
"-delayQueue");

Review Comment:
   `toString` is redunant



##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+public class ClientQuotaManager {
+
+    static final class QuotaTypes {
+        static final int NO_QUOTAS = 0;
+        static final int CLIENT_ID_QUOTA_ENABLED = 1;
+        static final int USER_QUOTA_ENABLED = 2;
+        static final int USER_CLIENT_ID_QUOTA_ENABLED = 4;
+        static final int CUSTOM_QUOTAS = 8; // No metric update optimizations 
are used with custom quotas
+    }
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientQuotaManager.class);
+
+    // Purge sensors after 1 hour of inactivity
+    private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+    private static final String DEFAULT_NAME = "<default>";
+
+    public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE);
+    public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null);
+    public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, 
DefaultClientIdEntity.INSTANCE);
+
+    public record UserEntity(String sanitizedUser) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.USER;
+        }
+
+        @Override
+        public String name() {
+            return Sanitizer.desanitize(sanitizedUser);
+        }
+
+        @Override
+        public String toString() {
+            return "user " + sanitizedUser;
+        }
+    }
+
+    // Convert to record - this is a simple data holder
+    public record ClientIdEntity(String clientId) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return clientId;
+        }
+
+        @Override
+        public String toString() {
+            return "client-id " + clientId;
+        }
+    }
+
+    // Keep as a class-uses a singleton pattern which doesn't work well with 
records
+    public static class DefaultUserEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultUserEntity INSTANCE = new 
DefaultUserEntity();
+
+        private DefaultUserEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default user";
+        }
+    }
+
+    public static class DefaultClientIdEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultClientIdEntity INSTANCE = new 
DefaultClientIdEntity();
+
+        private DefaultClientIdEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default client-id";
+        }
+    }
+
+    public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity,
+                                          ClientQuotaEntity.ConfigEntity 
clientIdEntity) implements ClientQuotaEntity {
+
+        @Override
+        public List<ConfigEntity> configEntities() {
+            List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>();
+            if (userEntity != null) {
+                entities.add(userEntity);
+            }
+            if (clientIdEntity != null) {
+                entities.add(clientIdEntity);
+            }
+            return entities;
+        }
+
+        public String sanitizedUser() {
+            if (userEntity instanceof UserEntity userRecord) {
+                return userRecord.sanitizedUser();
+            } else if (userEntity == DefaultUserEntity.INSTANCE) {
+                return DEFAULT_NAME;
+            }
+            return "";
+        }
+
+        public String clientId() {
+            return clientIdEntity != null ? clientIdEntity.name() : "";
+        }
+
+        @Override
+        public String toString() {
+            String user = userEntity != null ? userEntity.toString() : "";
+            String clientId = clientIdEntity != null ? 
clientIdEntity.toString() : "";
+            return (user + " " + clientId).trim();
+        }
+    }
+
+    public static class DefaultTags {
+        public static final String USER = "user";
+        public static final String CLIENT_ID = "client-id";
+    }
+
+    private final ClientQuotaManagerConfig config;
+    protected final Metrics metrics;
+    private final QuotaType quotaType;
+    protected final Time time;
+    private final Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final SensorAccess sensorAccessor;
+    private final ClientQuotaCallback quotaCallback;
+    private final ClientQuotaType clientQuotaType;
+
+    private volatile int quotaTypesEnabled;
+
+    private final Sensor delayQueueSensor;
+    private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>();
+    private final ThrottledChannelReaper throttledChannelReaper;
+
+    public void processThrottledChannelReaperDoWork() {
+        throttledChannelReaper.doWork();
+    }
+
+    /**
+     * Helper class that records per-client metrics. It is also responsible 
for maintaining Quota usage statistics
+     * for all clients.
+     * <p/>
+     * Quotas can be set at <user, client-id>, user or client-id levels. For a 
given client connection,
+     * the most specific quota matching the connection will be applied. For 
example, if both a <user, client-id>
+     * and a user quota match a connection, the <user, client-id> quota will 
be used. Otherwise, user quota takes
+     * precedence over client-id quota. The order of precedence is:
+     * <ul>
+     *   <li>/config/users/<user>/clients/<client-id>
+     *   <li>/config/users/<user>/clients/<default>
+     *   <li>/config/users/<user>
+     *   <li>/config/users/<default>/clients/<client-id>
+     *   <li>/config/users/<default>/clients/<default>
+     *   <li>/config/users/<default>
+     *   <li>/config/clients/<client-id>
+     *   <li>/config/clients/<default>
+     * </ul>
+     * Quota limits including defaults may be updated dynamically. The 
implementation is optimized for the case
+     * where a single level of quotas is configured.
+     * @param config the ClientQuotaManagerConfig containing quota 
configurations
+     * @param metrics the Metrics instance for recording quota-related metrics
+     * @param quotaType the quota type managed by this quota manager
+     * @param time the Time object used for time-based operations
+     * @param threadNamePrefix the thread name prefix used for internal threads
+     * @param clientQuotaCallbackPlugin optional Plugin containing a 
ClientQuotaCallback for custom quota logic
+     */
+    public ClientQuotaManager(ClientQuotaManagerConfig config,
+                              Metrics metrics,
+                              QuotaType quotaType,
+                              Time time,
+                              String threadNamePrefix,
+                              Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin) {
+        this.config = config;
+        this.metrics = metrics;
+        this.quotaType = quotaType;
+        this.time = time;
+        this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin;
+
+        this.sensorAccessor = new SensorAccess(lock, metrics);
+        this.clientQuotaType = QuotaType.toClientQuotaType(quotaType);
+
+        this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ?
+                QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS;
+
+        this.delayQueueSensor = metrics.sensor(quotaType.toString() + 
"-delayQueue");
+        this.delayQueueSensor.add(metrics.metricName("queue-size", 
quotaType.toString(),
+                "Tracks the size of the delay queue"), new CumulativeSum());
+        this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, 
threadNamePrefix);
+
+        this.quotaCallback = clientQuotaCallbackPlugin
+                .map(Plugin::get)
+                .orElse(new DefaultQuotaCallback());
+
+        start(); // Use the start method to keep spotbugs happy
+    }
+
+    public ClientQuotaManager(ClientQuotaManagerConfig config,
+                              Metrics metrics,
+                              QuotaType quotaType,
+                              Time time,
+                              String threadNamePrefix) {
+        this(config, metrics, quotaType, time, threadNamePrefix, 
Optional.empty());
+    }
+
+    protected Metrics metrics() {
+        return metrics;
+    }
+    protected Time time() {
+        return time;
+    }
+
+    private void start() {
+        throttledChannelReaper.start();
+    }
+
+    /**
+     * Reaper thread that triggers channel unmute callbacks on all throttled 
channels
+     */
+    public class ThrottledChannelReaper extends ShutdownableThread {
+        private final DelayQueue<ThrottledChannel> delayQueue;
+
+        public ThrottledChannelReaper(DelayQueue<ThrottledChannel> delayQueue, 
String prefix) {
+            super(prefix + "ThrottledChannelReaper-" + quotaType, false);
+            this.delayQueue = delayQueue;
+        }
+
+        @Override
+        public void doWork() {
+            ThrottledChannel throttledChannel;
+            try {
+                throttledChannel = delayQueue.poll(1, TimeUnit.SECONDS);
+                if (throttledChannel != null) {
+                    // Decrement the size of the delay queue
+                    delayQueueSensor.record(-1);
+                    // Notify the socket server that throttling is done for 
this channel
+                    throttledChannel.notifyThrottlingDone();
+                }
+            } catch (InterruptedException e) {
+                // Ignore and continue
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Returns true if any quotas are enabled for this quota manager. This is 
used
+     * to determine if quota-related metrics should be created.
+     * Note: If any quotas (static defaults, dynamic defaults or quota 
overrides) have
+     * been configured for this broker at any time for this quota type, 
quotasEnabled will
+     * return true until the next broker restarts, even if all quotas are 
subsequently deleted.
+     */
+    public boolean quotasEnabled() {
+        return quotaTypesEnabled != QuotaTypes.NO_QUOTAS;
+    }
+
+    /**
+     * See recordAndGetThrottleTimeMs.
+     */
+    public int maybeRecordAndGetThrottleTimeMs(Session session, String 
clientId, double value, long timeMs) {
+        // Record metrics only if quotas are enabled.
+        if (quotasEnabled()) {
+            return recordAndGetThrottleTimeMs(session, clientId, value, 
timeMs);
+        } else {
+            return 0;
+        }
+    }
+
+    /**
+     * Records that a user/clientId accumulated or would like to accumulate 
the provided amount at the
+     * specified time, returns throttle time in milliseconds.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @param value The value to accumulate
+     * @param timeMs The time at which to accumulate the value
+     * @return The throttle time in milliseconds defines as the time to wait 
until the average
+     *         rate gets back to the defined quota
+     */
+    public int recordAndGetThrottleTimeMs(Session session, String clientId, 
double value, long timeMs) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        try {
+            clientSensors.quotaSensor().record(value, timeMs, true);
+            return 0;
+        } catch (QuotaViolationException e) {
+            int throttleTimeMs = (int) throttleTime(e, timeMs);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Quota violated for sensor ({}). Delay time: ({})",
+                        clientSensors.quotaSensor().name(), throttleTimeMs);
+            }
+            return throttleTimeMs;
+        }
+    }
+
+    /**
+     * Records that a user/clientId changed some metric being throttled 
without checking for
+     * quota violation. The aggregate value will subsequently be used for 
throttling when the
+     * next request is processed.
+     */
+    public void recordNoThrottle(Session session, String clientId, double 
value) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        clientSensors.quotaSensor().record(value, time.milliseconds(), false);
+    }
+
+    /**
+     * "Unrecord" the given value that has already been recorded for the given 
user/client by recording a negative value
+     * of the same quantity.
+     * For a throttled fetch, the broker should return an empty response and 
thus should not record the value. Ideally,
+     * we would like to compute the throttle time before actually recording 
the value, but the current Sensor code
+     * couples value recording and quota checking very tightly. As a 
workaround, we will unrecord the value for the fetch
+     * in case of throttling. Rate keeps the sum of values that fall in each 
time window, so this should bring the
+     * overall sum back to the previous value.
+     */
+    public void unrecordQuotaSensor(Session session, String clientId, double 
value, long timeMs) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        clientSensors.quotaSensor().record(value * -1, timeMs, false);
+    }
+
+    /**
+     * Returns the maximum value that could be recorded without guaranteed 
throttling.
+     * Recording any larger value will always be throttled, even if no other 
values were recorded in the quota window.
+     * This is used for deciding the maximum bytes that can be fetched at once
+     */
+    public double getMaxValueInQuotaWindow(Session session, String clientId) {

Review Comment:
   could you please remove the prefix `get`



##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+public class ClientQuotaManager {
+
+    static final class QuotaTypes {
+        static final int NO_QUOTAS = 0;
+        static final int CLIENT_ID_QUOTA_ENABLED = 1;
+        static final int USER_QUOTA_ENABLED = 2;
+        static final int USER_CLIENT_ID_QUOTA_ENABLED = 4;
+        static final int CUSTOM_QUOTAS = 8; // No metric update optimizations 
are used with custom quotas
+    }
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientQuotaManager.class);
+
+    // Purge sensors after 1 hour of inactivity
+    private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+    private static final String DEFAULT_NAME = "<default>";
+
+    public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE);
+    public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null);
+    public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, 
DefaultClientIdEntity.INSTANCE);
+
+    public record UserEntity(String sanitizedUser) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.USER;
+        }
+
+        @Override
+        public String name() {
+            return Sanitizer.desanitize(sanitizedUser);
+        }
+
+        @Override
+        public String toString() {
+            return "user " + sanitizedUser;
+        }
+    }
+
+    // Convert to record - this is a simple data holder
+    public record ClientIdEntity(String clientId) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return clientId;
+        }
+
+        @Override
+        public String toString() {
+            return "client-id " + clientId;
+        }
+    }
+
+    // Keep as a class-uses a singleton pattern which doesn't work well with 
records
+    public static class DefaultUserEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultUserEntity INSTANCE = new 
DefaultUserEntity();
+
+        private DefaultUserEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default user";
+        }
+    }
+
+    public static class DefaultClientIdEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultClientIdEntity INSTANCE = new 
DefaultClientIdEntity();
+
+        private DefaultClientIdEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default client-id";
+        }
+    }
+
+    public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity,
+                                          ClientQuotaEntity.ConfigEntity 
clientIdEntity) implements ClientQuotaEntity {
+
+        @Override
+        public List<ConfigEntity> configEntities() {
+            List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>();
+            if (userEntity != null) {
+                entities.add(userEntity);
+            }
+            if (clientIdEntity != null) {
+                entities.add(clientIdEntity);
+            }
+            return entities;
+        }
+
+        public String sanitizedUser() {
+            if (userEntity instanceof UserEntity userRecord) {
+                return userRecord.sanitizedUser();
+            } else if (userEntity == DefaultUserEntity.INSTANCE) {
+                return DEFAULT_NAME;
+            }
+            return "";
+        }
+
+        public String clientId() {
+            return clientIdEntity != null ? clientIdEntity.name() : "";
+        }
+
+        @Override
+        public String toString() {
+            String user = userEntity != null ? userEntity.toString() : "";
+            String clientId = clientIdEntity != null ? 
clientIdEntity.toString() : "";
+            return (user + " " + clientId).trim();
+        }
+    }
+
+    public static class DefaultTags {
+        public static final String USER = "user";
+        public static final String CLIENT_ID = "client-id";
+    }
+
+    private final ClientQuotaManagerConfig config;
+    protected final Metrics metrics;
+    private final QuotaType quotaType;
+    protected final Time time;
+    private final Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final SensorAccess sensorAccessor;
+    private final ClientQuotaCallback quotaCallback;
+    private final ClientQuotaType clientQuotaType;
+
+    private volatile int quotaTypesEnabled;
+
+    private final Sensor delayQueueSensor;
+    private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>();
+    private final ThrottledChannelReaper throttledChannelReaper;
+
+    public void processThrottledChannelReaperDoWork() {
+        throttledChannelReaper.doWork();
+    }
+
+    /**
+     * Helper class that records per-client metrics. It is also responsible 
for maintaining Quota usage statistics
+     * for all clients.
+     * <p/>
+     * Quotas can be set at <user, client-id>, user or client-id levels. For a 
given client connection,
+     * the most specific quota matching the connection will be applied. For 
example, if both a <user, client-id>
+     * and a user quota match a connection, the <user, client-id> quota will 
be used. Otherwise, user quota takes
+     * precedence over client-id quota. The order of precedence is:
+     * <ul>
+     *   <li>/config/users/<user>/clients/<client-id>
+     *   <li>/config/users/<user>/clients/<default>
+     *   <li>/config/users/<user>
+     *   <li>/config/users/<default>/clients/<client-id>
+     *   <li>/config/users/<default>/clients/<default>
+     *   <li>/config/users/<default>
+     *   <li>/config/clients/<client-id>
+     *   <li>/config/clients/<default>
+     * </ul>
+     * Quota limits including defaults may be updated dynamically. The 
implementation is optimized for the case
+     * where a single level of quotas is configured.
+     * @param config the ClientQuotaManagerConfig containing quota 
configurations
+     * @param metrics the Metrics instance for recording quota-related metrics
+     * @param quotaType the quota type managed by this quota manager
+     * @param time the Time object used for time-based operations
+     * @param threadNamePrefix the thread name prefix used for internal threads
+     * @param clientQuotaCallbackPlugin optional Plugin containing a 
ClientQuotaCallback for custom quota logic
+     */
+    public ClientQuotaManager(ClientQuotaManagerConfig config,
+                              Metrics metrics,
+                              QuotaType quotaType,
+                              Time time,
+                              String threadNamePrefix,
+                              Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin) {
+        this.config = config;
+        this.metrics = metrics;
+        this.quotaType = quotaType;
+        this.time = time;
+        this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin;
+
+        this.sensorAccessor = new SensorAccess(lock, metrics);
+        this.clientQuotaType = QuotaType.toClientQuotaType(quotaType);
+
+        this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ?
+                QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS;
+
+        this.delayQueueSensor = metrics.sensor(quotaType.toString() + 
"-delayQueue");
+        this.delayQueueSensor.add(metrics.metricName("queue-size", 
quotaType.toString(),
+                "Tracks the size of the delay queue"), new CumulativeSum());
+        this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, 
threadNamePrefix);
+
+        this.quotaCallback = clientQuotaCallbackPlugin
+                .map(Plugin::get)
+                .orElse(new DefaultQuotaCallback());
+
+        start(); // Use the start method to keep spotbugs happy
+    }
+
+    public ClientQuotaManager(ClientQuotaManagerConfig config,
+                              Metrics metrics,
+                              QuotaType quotaType,
+                              Time time,
+                              String threadNamePrefix) {
+        this(config, metrics, quotaType, time, threadNamePrefix, 
Optional.empty());
+    }
+
+    protected Metrics metrics() {
+        return metrics;
+    }
+    protected Time time() {
+        return time;
+    }
+
+    private void start() {
+        throttledChannelReaper.start();
+    }
+
+    /**
+     * Reaper thread that triggers channel unmute callbacks on all throttled 
channels
+     */
+    public class ThrottledChannelReaper extends ShutdownableThread {
+        private final DelayQueue<ThrottledChannel> delayQueue;
+
+        public ThrottledChannelReaper(DelayQueue<ThrottledChannel> delayQueue, 
String prefix) {
+            super(prefix + "ThrottledChannelReaper-" + quotaType, false);
+            this.delayQueue = delayQueue;
+        }
+
+        @Override
+        public void doWork() {
+            ThrottledChannel throttledChannel;
+            try {
+                throttledChannel = delayQueue.poll(1, TimeUnit.SECONDS);
+                if (throttledChannel != null) {
+                    // Decrement the size of the delay queue
+                    delayQueueSensor.record(-1);
+                    // Notify the socket server that throttling is done for 
this channel
+                    throttledChannel.notifyThrottlingDone();
+                }
+            } catch (InterruptedException e) {
+                // Ignore and continue
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Returns true if any quotas are enabled for this quota manager. This is 
used
+     * to determine if quota-related metrics should be created.
+     * Note: If any quotas (static defaults, dynamic defaults or quota 
overrides) have
+     * been configured for this broker at any time for this quota type, 
quotasEnabled will
+     * return true until the next broker restarts, even if all quotas are 
subsequently deleted.
+     */
+    public boolean quotasEnabled() {
+        return quotaTypesEnabled != QuotaTypes.NO_QUOTAS;
+    }
+
+    /**
+     * See recordAndGetThrottleTimeMs.
+     */
+    public int maybeRecordAndGetThrottleTimeMs(Session session, String 
clientId, double value, long timeMs) {
+        // Record metrics only if quotas are enabled.
+        if (quotasEnabled()) {
+            return recordAndGetThrottleTimeMs(session, clientId, value, 
timeMs);
+        } else {
+            return 0;
+        }
+    }
+
+    /**
+     * Records that a user/clientId accumulated or would like to accumulate 
the provided amount at the
+     * specified time, returns throttle time in milliseconds.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @param value The value to accumulate
+     * @param timeMs The time at which to accumulate the value
+     * @return The throttle time in milliseconds defines as the time to wait 
until the average
+     *         rate gets back to the defined quota
+     */
+    public int recordAndGetThrottleTimeMs(Session session, String clientId, 
double value, long timeMs) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        try {
+            clientSensors.quotaSensor().record(value, timeMs, true);
+            return 0;
+        } catch (QuotaViolationException e) {
+            int throttleTimeMs = (int) throttleTime(e, timeMs);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Quota violated for sensor ({}). Delay time: ({})",
+                        clientSensors.quotaSensor().name(), throttleTimeMs);
+            }
+            return throttleTimeMs;
+        }
+    }
+
+    /**
+     * Records that a user/clientId changed some metric being throttled 
without checking for
+     * quota violation. The aggregate value will subsequently be used for 
throttling when the
+     * next request is processed.
+     */
+    public void recordNoThrottle(Session session, String clientId, double 
value) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        clientSensors.quotaSensor().record(value, time.milliseconds(), false);
+    }
+
+    /**
+     * "Unrecord" the given value that has already been recorded for the given 
user/client by recording a negative value
+     * of the same quantity.
+     * For a throttled fetch, the broker should return an empty response and 
thus should not record the value. Ideally,
+     * we would like to compute the throttle time before actually recording 
the value, but the current Sensor code
+     * couples value recording and quota checking very tightly. As a 
workaround, we will unrecord the value for the fetch
+     * in case of throttling. Rate keeps the sum of values that fall in each 
time window, so this should bring the
+     * overall sum back to the previous value.
+     */
+    public void unrecordQuotaSensor(Session session, String clientId, double 
value, long timeMs) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        clientSensors.quotaSensor().record(value * -1, timeMs, false);
+    }
+
+    /**
+     * Returns the maximum value that could be recorded without guaranteed 
throttling.
+     * Recording any larger value will always be throttled, even if no other 
values were recorded in the quota window.
+     * This is used for deciding the maximum bytes that can be fetched at once
+     */
+    public double getMaxValueInQuotaWindow(Session session, String clientId) {
+        if (quotasEnabled()) {

Review Comment:
   ```java
           if (!quotasEnabled()) return Double.MAX_VALUE;
           var clientSensors = getOrCreateQuotaSensors(session, clientId);
           var limit = quotaCallback.quotaLimit(clientQuotaType, 
clientSensors.metricTags());
           if (limit != null) return limit * (config.numQuotaSamples - 1) * 
config.quotaWindowSizeSeconds;
           return Double.MAX_VALUE;
   ```



##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+public class ClientQuotaManager {
+
+    static final class QuotaTypes {
+        static final int NO_QUOTAS = 0;
+        static final int CLIENT_ID_QUOTA_ENABLED = 1;
+        static final int USER_QUOTA_ENABLED = 2;
+        static final int USER_CLIENT_ID_QUOTA_ENABLED = 4;
+        static final int CUSTOM_QUOTAS = 8; // No metric update optimizations 
are used with custom quotas
+    }
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientQuotaManager.class);
+
+    // Purge sensors after 1 hour of inactivity
+    private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+    private static final String DEFAULT_NAME = "<default>";
+
+    public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE);
+    public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null);
+    public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, 
DefaultClientIdEntity.INSTANCE);
+
+    public record UserEntity(String sanitizedUser) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.USER;
+        }
+
+        @Override
+        public String name() {
+            return Sanitizer.desanitize(sanitizedUser);
+        }
+
+        @Override
+        public String toString() {
+            return "user " + sanitizedUser;
+        }
+    }
+
+    // Convert to record - this is a simple data holder
+    public record ClientIdEntity(String clientId) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return clientId;
+        }
+
+        @Override
+        public String toString() {
+            return "client-id " + clientId;
+        }
+    }
+
+    // Keep as a class-uses a singleton pattern which doesn't work well with 
records
+    public static class DefaultUserEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultUserEntity INSTANCE = new 
DefaultUserEntity();
+
+        private DefaultUserEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default user";
+        }
+    }
+
+    public static class DefaultClientIdEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultClientIdEntity INSTANCE = new 
DefaultClientIdEntity();
+
+        private DefaultClientIdEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default client-id";
+        }
+    }
+
+    public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity,
+                                          ClientQuotaEntity.ConfigEntity 
clientIdEntity) implements ClientQuotaEntity {
+
+        @Override
+        public List<ConfigEntity> configEntities() {
+            List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>();
+            if (userEntity != null) {
+                entities.add(userEntity);
+            }
+            if (clientIdEntity != null) {
+                entities.add(clientIdEntity);
+            }
+            return entities;
+        }
+
+        public String sanitizedUser() {
+            if (userEntity instanceof UserEntity userRecord) {
+                return userRecord.sanitizedUser();
+            } else if (userEntity == DefaultUserEntity.INSTANCE) {
+                return DEFAULT_NAME;
+            }
+            return "";
+        }
+
+        public String clientId() {
+            return clientIdEntity != null ? clientIdEntity.name() : "";
+        }
+
+        @Override
+        public String toString() {
+            String user = userEntity != null ? userEntity.toString() : "";
+            String clientId = clientIdEntity != null ? 
clientIdEntity.toString() : "";
+            return (user + " " + clientId).trim();
+        }
+    }
+
+    public static class DefaultTags {

Review Comment:
   ditto



##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+public class ClientQuotaManager {
+
+    static final class QuotaTypes {
+        static final int NO_QUOTAS = 0;
+        static final int CLIENT_ID_QUOTA_ENABLED = 1;
+        static final int USER_QUOTA_ENABLED = 2;
+        static final int USER_CLIENT_ID_QUOTA_ENABLED = 4;
+        static final int CUSTOM_QUOTAS = 8; // No metric update optimizations 
are used with custom quotas
+    }
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientQuotaManager.class);
+
+    // Purge sensors after 1 hour of inactivity
+    private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+    private static final String DEFAULT_NAME = "<default>";
+
+    public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE);
+    public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null);
+    public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, 
DefaultClientIdEntity.INSTANCE);
+
+    public record UserEntity(String sanitizedUser) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.USER;
+        }
+
+        @Override
+        public String name() {
+            return Sanitizer.desanitize(sanitizedUser);
+        }
+
+        @Override
+        public String toString() {
+            return "user " + sanitizedUser;
+        }
+    }
+
+    // Convert to record - this is a simple data holder
+    public record ClientIdEntity(String clientId) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return clientId;
+        }
+
+        @Override
+        public String toString() {
+            return "client-id " + clientId;
+        }
+    }
+
+    // Keep as a class-uses a singleton pattern which doesn't work well with 
records
+    public static class DefaultUserEntity implements 
ClientQuotaEntity.ConfigEntity {

Review Comment:
   ```java
       public static final ClientQuotaEntity.ConfigEntity DEFAULT_USER_ENTITY = 
new ClientQuotaEntity.ConfigEntity() {
           @Override
           public ClientQuotaEntity.ConfigEntityType entityType() {
               return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER;
           }
   
           @Override
           public String name() {
               return DEFAULT_NAME;
           }
   
           @Override
           public String toString() {
               return "default user";
           }
       };
   ```



##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,942 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+final class QuotaTypes {
+    static final int NO_QUOTAS = 0;
+    static final int CLIENT_ID_QUOTA_ENABLED = 1;
+    static final int USER_QUOTA_ENABLED = 2;
+    static final int USER_CLIENT_ID_QUOTA_ENABLED = 4;
+    static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are 
used with custom quotas
+}
+
+public class ClientQuotaManager {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ClientQuotaManager.class);
+
+    // Purge sensors after 1 hour of inactivity
+    public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+    private static final String DEFAULT_NAME = "<default>";
+
+    public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE);
+    public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null);
+    public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, 
DefaultClientIdEntity.INSTANCE);
+
+    public interface BaseUserEntity extends ClientQuotaEntity.ConfigEntity { }
+
+    public static class UserEntity implements BaseUserEntity {
+        private final String sanitizedUser;
+
+        public UserEntity(String sanitizedUser) {
+            this.sanitizedUser = sanitizedUser;
+        }
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.USER;
+        }
+
+        @Override
+        public String name() {
+            return Sanitizer.desanitize(sanitizedUser);
+        }
+
+        public String getSanitizedUser() {
+            return sanitizedUser;
+        }
+
+        @Override
+        public String toString() {
+            return "user " + sanitizedUser;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null || getClass() != obj.getClass()) return false;
+            UserEntity that = (UserEntity) obj;
+            return sanitizedUser.equals(that.sanitizedUser);
+        }
+
+        @Override
+        public int hashCode() {
+            return sanitizedUser.hashCode();
+        }
+    }
+
+    public static class ClientIdEntity implements 
ClientQuotaEntity.ConfigEntity {
+        private final String clientId;
+
+        public ClientIdEntity(String clientId) {
+            this.clientId = clientId;
+        }
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return clientId;
+        }
+
+        public String getClientId() {
+            return clientId;
+        }
+
+        @Override
+        public String toString() {
+            return "client-id " + clientId;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null || getClass() != obj.getClass()) return false;
+            ClientIdEntity that = (ClientIdEntity) obj;
+            return clientId.equals(that.clientId);
+        }
+
+        @Override
+        public int hashCode() {
+            return clientId.hashCode();
+        }
+    }
+
+    public static class DefaultUserEntity implements BaseUserEntity {
+        public static final DefaultUserEntity INSTANCE = new 
DefaultUserEntity();
+
+        private DefaultUserEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default user";
+        }
+    }
+
+    public static class DefaultClientIdEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultClientIdEntity INSTANCE = new 
DefaultClientIdEntity();
+
+        private DefaultClientIdEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default client-id";
+        }
+    }
+
+    public static class KafkaQuotaEntity implements ClientQuotaEntity {
+        private final BaseUserEntity userEntity;
+        private final ClientQuotaEntity.ConfigEntity clientIdEntity;
+
+        public KafkaQuotaEntity(BaseUserEntity userEntity, 
ClientQuotaEntity.ConfigEntity clientIdEntity) {
+            this.userEntity = userEntity;
+            this.clientIdEntity = clientIdEntity;
+        }
+
+        @Override
+        public List<ConfigEntity> configEntities() {
+            List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>();
+            if (userEntity != null) {
+                entities.add(userEntity);
+            }
+            if (clientIdEntity != null) {
+                entities.add(clientIdEntity);
+            }
+            return entities;
+        }
+
+        public String sanitizedUser() {
+            if (userEntity instanceof UserEntity) {
+                return ((UserEntity) userEntity).getSanitizedUser();
+            } else if (userEntity == DefaultUserEntity.INSTANCE) {
+                return DEFAULT_NAME;
+            }
+            return "";
+        }
+
+        public String clientId() {
+            return clientIdEntity != null ? clientIdEntity.name() : "";
+        }
+
+        @Override
+        public String toString() {
+            String user = userEntity != null ? userEntity.toString() : "";
+            String clientId = clientIdEntity != null ? 
clientIdEntity.toString() : "";
+            return (user + " " + clientId).trim();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null || getClass() != obj.getClass()) return false;
+            KafkaQuotaEntity that = (KafkaQuotaEntity) obj;
+            return java.util.Objects.equals(userEntity, that.userEntity) &&
+                    java.util.Objects.equals(clientIdEntity, 
that.clientIdEntity);
+        }
+
+        @Override
+        public int hashCode() {
+            return java.util.Objects.hash(userEntity, clientIdEntity);
+        }
+    }
+
+    public static class DefaultTags {
+        public static final String USER = "user";
+        public static final String CLIENT_ID = "client-id";
+    }
+
+    private final ClientQuotaManagerConfig config;
+    protected final Metrics metrics;
+    private final QuotaType quotaType;
+    protected final Time time;
+    private final Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin;

Review Comment:
   I assume you will remove this variable, but you don't. Could you please 
share the reason with me?



##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+public class ClientQuotaManager {
+
+    static final class QuotaTypes {
+        static final int NO_QUOTAS = 0;
+        static final int CLIENT_ID_QUOTA_ENABLED = 1;
+        static final int USER_QUOTA_ENABLED = 2;
+        static final int USER_CLIENT_ID_QUOTA_ENABLED = 4;
+        static final int CUSTOM_QUOTAS = 8; // No metric update optimizations 
are used with custom quotas
+    }
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientQuotaManager.class);
+
+    // Purge sensors after 1 hour of inactivity
+    private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+    private static final String DEFAULT_NAME = "<default>";
+
+    public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE);
+    public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null);
+    public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, 
DefaultClientIdEntity.INSTANCE);
+
+    public record UserEntity(String sanitizedUser) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.USER;
+        }
+
+        @Override
+        public String name() {
+            return Sanitizer.desanitize(sanitizedUser);
+        }
+
+        @Override
+        public String toString() {
+            return "user " + sanitizedUser;
+        }
+    }
+
+    // Convert to record - this is a simple data holder
+    public record ClientIdEntity(String clientId) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return clientId;
+        }
+
+        @Override
+        public String toString() {
+            return "client-id " + clientId;
+        }
+    }
+
+    // Keep as a class-uses a singleton pattern which doesn't work well with 
records
+    public static class DefaultUserEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultUserEntity INSTANCE = new 
DefaultUserEntity();
+
+        private DefaultUserEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default user";
+        }
+    }
+
+    public static class DefaultClientIdEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultClientIdEntity INSTANCE = new 
DefaultClientIdEntity();
+
+        private DefaultClientIdEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default client-id";
+        }
+    }
+
+    public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity,
+                                          ClientQuotaEntity.ConfigEntity 
clientIdEntity) implements ClientQuotaEntity {
+
+        @Override
+        public List<ConfigEntity> configEntities() {
+            List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>();
+            if (userEntity != null) {
+                entities.add(userEntity);
+            }
+            if (clientIdEntity != null) {
+                entities.add(clientIdEntity);
+            }
+            return entities;
+        }
+
+        public String sanitizedUser() {
+            if (userEntity instanceof UserEntity userRecord) {
+                return userRecord.sanitizedUser();
+            } else if (userEntity == DefaultUserEntity.INSTANCE) {
+                return DEFAULT_NAME;
+            }
+            return "";
+        }
+
+        public String clientId() {
+            return clientIdEntity != null ? clientIdEntity.name() : "";
+        }
+
+        @Override
+        public String toString() {
+            String user = userEntity != null ? userEntity.toString() : "";
+            String clientId = clientIdEntity != null ? 
clientIdEntity.toString() : "";
+            return (user + " " + clientId).trim();
+        }
+    }
+
+    public static class DefaultTags {
+        public static final String USER = "user";
+        public static final String CLIENT_ID = "client-id";
+    }
+
+    private final ClientQuotaManagerConfig config;
+    protected final Metrics metrics;
+    private final QuotaType quotaType;
+    protected final Time time;
+    private final Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final SensorAccess sensorAccessor;
+    private final ClientQuotaCallback quotaCallback;
+    private final ClientQuotaType clientQuotaType;
+
+    private volatile int quotaTypesEnabled;
+
+    private final Sensor delayQueueSensor;
+    private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>();
+    private final ThrottledChannelReaper throttledChannelReaper;
+
+    public void processThrottledChannelReaperDoWork() {
+        throttledChannelReaper.doWork();
+    }
+
+    /**
+     * Helper class that records per-client metrics. It is also responsible 
for maintaining Quota usage statistics
+     * for all clients.
+     * <p/>
+     * Quotas can be set at <user, client-id>, user or client-id levels. For a 
given client connection,
+     * the most specific quota matching the connection will be applied. For 
example, if both a <user, client-id>
+     * and a user quota match a connection, the <user, client-id> quota will 
be used. Otherwise, user quota takes
+     * precedence over client-id quota. The order of precedence is:
+     * <ul>
+     *   <li>/config/users/<user>/clients/<client-id>
+     *   <li>/config/users/<user>/clients/<default>
+     *   <li>/config/users/<user>
+     *   <li>/config/users/<default>/clients/<client-id>
+     *   <li>/config/users/<default>/clients/<default>
+     *   <li>/config/users/<default>
+     *   <li>/config/clients/<client-id>
+     *   <li>/config/clients/<default>
+     * </ul>
+     * Quota limits including defaults may be updated dynamically. The 
implementation is optimized for the case
+     * where a single level of quotas is configured.
+     * @param config the ClientQuotaManagerConfig containing quota 
configurations
+     * @param metrics the Metrics instance for recording quota-related metrics
+     * @param quotaType the quota type managed by this quota manager
+     * @param time the Time object used for time-based operations
+     * @param threadNamePrefix the thread name prefix used for internal threads
+     * @param clientQuotaCallbackPlugin optional Plugin containing a 
ClientQuotaCallback for custom quota logic
+     */
+    public ClientQuotaManager(ClientQuotaManagerConfig config,
+                              Metrics metrics,
+                              QuotaType quotaType,
+                              Time time,
+                              String threadNamePrefix,
+                              Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin) {
+        this.config = config;
+        this.metrics = metrics;
+        this.quotaType = quotaType;
+        this.time = time;
+        this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin;
+
+        this.sensorAccessor = new SensorAccess(lock, metrics);
+        this.clientQuotaType = QuotaType.toClientQuotaType(quotaType);
+
+        this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ?
+                QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS;
+
+        this.delayQueueSensor = metrics.sensor(quotaType.toString() + 
"-delayQueue");
+        this.delayQueueSensor.add(metrics.metricName("queue-size", 
quotaType.toString(),
+                "Tracks the size of the delay queue"), new CumulativeSum());
+        this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, 
threadNamePrefix);
+
+        this.quotaCallback = clientQuotaCallbackPlugin
+                .map(Plugin::get)
+                .orElse(new DefaultQuotaCallback());
+
+        start(); // Use the start method to keep spotbugs happy
+    }
+
+    public ClientQuotaManager(ClientQuotaManagerConfig config,
+                              Metrics metrics,
+                              QuotaType quotaType,
+                              Time time,
+                              String threadNamePrefix) {
+        this(config, metrics, quotaType, time, threadNamePrefix, 
Optional.empty());
+    }
+
+    protected Metrics metrics() {
+        return metrics;
+    }
+    protected Time time() {
+        return time;
+    }
+
+    private void start() {
+        throttledChannelReaper.start();

Review Comment:
   this method can be inlined to constructor



##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+public class ClientQuotaManager {
+
+    static final class QuotaTypes {
+        static final int NO_QUOTAS = 0;
+        static final int CLIENT_ID_QUOTA_ENABLED = 1;
+        static final int USER_QUOTA_ENABLED = 2;
+        static final int USER_CLIENT_ID_QUOTA_ENABLED = 4;
+        static final int CUSTOM_QUOTAS = 8; // No metric update optimizations 
are used with custom quotas
+    }
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientQuotaManager.class);
+
+    // Purge sensors after 1 hour of inactivity
+    private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+    private static final String DEFAULT_NAME = "<default>";
+
+    public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE);
+    public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null);
+    public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, 
DefaultClientIdEntity.INSTANCE);
+
+    public record UserEntity(String sanitizedUser) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.USER;
+        }
+
+        @Override
+        public String name() {
+            return Sanitizer.desanitize(sanitizedUser);
+        }
+
+        @Override
+        public String toString() {
+            return "user " + sanitizedUser;
+        }
+    }
+
+    // Convert to record - this is a simple data holder
+    public record ClientIdEntity(String clientId) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return clientId;
+        }
+
+        @Override
+        public String toString() {
+            return "client-id " + clientId;
+        }
+    }
+
+    // Keep as a class-uses a singleton pattern which doesn't work well with 
records
+    public static class DefaultUserEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultUserEntity INSTANCE = new 
DefaultUserEntity();
+
+        private DefaultUserEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default user";
+        }
+    }
+
+    public static class DefaultClientIdEntity implements 
ClientQuotaEntity.ConfigEntity {
+        public static final DefaultClientIdEntity INSTANCE = new 
DefaultClientIdEntity();
+
+        private DefaultClientIdEntity() {}
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default client-id";
+        }
+    }
+
+    public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity,
+                                          ClientQuotaEntity.ConfigEntity 
clientIdEntity) implements ClientQuotaEntity {
+
+        @Override
+        public List<ConfigEntity> configEntities() {
+            List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>();
+            if (userEntity != null) {
+                entities.add(userEntity);
+            }
+            if (clientIdEntity != null) {
+                entities.add(clientIdEntity);
+            }
+            return entities;
+        }
+
+        public String sanitizedUser() {
+            if (userEntity instanceof UserEntity userRecord) {
+                return userRecord.sanitizedUser();
+            } else if (userEntity == DefaultUserEntity.INSTANCE) {
+                return DEFAULT_NAME;
+            }
+            return "";
+        }
+
+        public String clientId() {
+            return clientIdEntity != null ? clientIdEntity.name() : "";
+        }
+
+        @Override
+        public String toString() {
+            String user = userEntity != null ? userEntity.toString() : "";
+            String clientId = clientIdEntity != null ? 
clientIdEntity.toString() : "";
+            return (user + " " + clientId).trim();
+        }
+    }
+
+    public static class DefaultTags {
+        public static final String USER = "user";
+        public static final String CLIENT_ID = "client-id";
+    }
+
+    private final ClientQuotaManagerConfig config;
+    protected final Metrics metrics;
+    private final QuotaType quotaType;
+    protected final Time time;
+    private final Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final SensorAccess sensorAccessor;
+    private final ClientQuotaCallback quotaCallback;
+    private final ClientQuotaType clientQuotaType;
+
+    private volatile int quotaTypesEnabled;
+
+    private final Sensor delayQueueSensor;
+    private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>();
+    private final ThrottledChannelReaper throttledChannelReaper;
+
+    public void processThrottledChannelReaperDoWork() {
+        throttledChannelReaper.doWork();
+    }
+
+    /**
+     * Helper class that records per-client metrics. It is also responsible 
for maintaining Quota usage statistics
+     * for all clients.
+     * <p/>
+     * Quotas can be set at <user, client-id>, user or client-id levels. For a 
given client connection,
+     * the most specific quota matching the connection will be applied. For 
example, if both a <user, client-id>
+     * and a user quota match a connection, the <user, client-id> quota will 
be used. Otherwise, user quota takes
+     * precedence over client-id quota. The order of precedence is:
+     * <ul>
+     *   <li>/config/users/<user>/clients/<client-id>
+     *   <li>/config/users/<user>/clients/<default>
+     *   <li>/config/users/<user>
+     *   <li>/config/users/<default>/clients/<client-id>
+     *   <li>/config/users/<default>/clients/<default>
+     *   <li>/config/users/<default>
+     *   <li>/config/clients/<client-id>
+     *   <li>/config/clients/<default>
+     * </ul>
+     * Quota limits including defaults may be updated dynamically. The 
implementation is optimized for the case
+     * where a single level of quotas is configured.
+     * @param config the ClientQuotaManagerConfig containing quota 
configurations
+     * @param metrics the Metrics instance for recording quota-related metrics
+     * @param quotaType the quota type managed by this quota manager
+     * @param time the Time object used for time-based operations
+     * @param threadNamePrefix the thread name prefix used for internal threads
+     * @param clientQuotaCallbackPlugin optional Plugin containing a 
ClientQuotaCallback for custom quota logic
+     */
+    public ClientQuotaManager(ClientQuotaManagerConfig config,
+                              Metrics metrics,
+                              QuotaType quotaType,
+                              Time time,
+                              String threadNamePrefix,
+                              Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin) {
+        this.config = config;
+        this.metrics = metrics;
+        this.quotaType = quotaType;
+        this.time = time;
+        this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin;
+
+        this.sensorAccessor = new SensorAccess(lock, metrics);
+        this.clientQuotaType = QuotaType.toClientQuotaType(quotaType);
+
+        this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ?
+                QuotaTypes.CUSTOM_QUOTAS : QuotaTypes.NO_QUOTAS;
+
+        this.delayQueueSensor = metrics.sensor(quotaType.toString() + 
"-delayQueue");
+        this.delayQueueSensor.add(metrics.metricName("queue-size", 
quotaType.toString(),
+                "Tracks the size of the delay queue"), new CumulativeSum());
+        this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, 
threadNamePrefix);
+
+        this.quotaCallback = clientQuotaCallbackPlugin
+                .map(Plugin::get)
+                .orElse(new DefaultQuotaCallback());
+
+        start(); // Use the start method to keep spotbugs happy
+    }
+
+    public ClientQuotaManager(ClientQuotaManagerConfig config,
+                              Metrics metrics,
+                              QuotaType quotaType,
+                              Time time,
+                              String threadNamePrefix) {
+        this(config, metrics, quotaType, time, threadNamePrefix, 
Optional.empty());
+    }
+
+    protected Metrics metrics() {
+        return metrics;
+    }
+    protected Time time() {
+        return time;
+    }
+
+    private void start() {
+        throttledChannelReaper.start();
+    }
+
+    /**
+     * Reaper thread that triggers channel unmute callbacks on all throttled 
channels
+     */
+    public class ThrottledChannelReaper extends ShutdownableThread {
+        private final DelayQueue<ThrottledChannel> delayQueue;
+
+        public ThrottledChannelReaper(DelayQueue<ThrottledChannel> delayQueue, 
String prefix) {
+            super(prefix + "ThrottledChannelReaper-" + quotaType, false);
+            this.delayQueue = delayQueue;
+        }
+
+        @Override
+        public void doWork() {
+            ThrottledChannel throttledChannel;
+            try {
+                throttledChannel = delayQueue.poll(1, TimeUnit.SECONDS);
+                if (throttledChannel != null) {
+                    // Decrement the size of the delay queue
+                    delayQueueSensor.record(-1);
+                    // Notify the socket server that throttling is done for 
this channel
+                    throttledChannel.notifyThrottlingDone();
+                }
+            } catch (InterruptedException e) {
+                // Ignore and continue
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Returns true if any quotas are enabled for this quota manager. This is 
used
+     * to determine if quota-related metrics should be created.
+     * Note: If any quotas (static defaults, dynamic defaults or quota 
overrides) have
+     * been configured for this broker at any time for this quota type, 
quotasEnabled will
+     * return true until the next broker restarts, even if all quotas are 
subsequently deleted.
+     */
+    public boolean quotasEnabled() {
+        return quotaTypesEnabled != QuotaTypes.NO_QUOTAS;
+    }
+
+    /**
+     * See recordAndGetThrottleTimeMs.
+     */
+    public int maybeRecordAndGetThrottleTimeMs(Session session, String 
clientId, double value, long timeMs) {
+        // Record metrics only if quotas are enabled.
+        if (quotasEnabled()) {
+            return recordAndGetThrottleTimeMs(session, clientId, value, 
timeMs);
+        } else {
+            return 0;
+        }
+    }
+
+    /**
+     * Records that a user/clientId accumulated or would like to accumulate 
the provided amount at the
+     * specified time, returns throttle time in milliseconds.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @param value The value to accumulate
+     * @param timeMs The time at which to accumulate the value
+     * @return The throttle time in milliseconds defines as the time to wait 
until the average
+     *         rate gets back to the defined quota
+     */
+    public int recordAndGetThrottleTimeMs(Session session, String clientId, 
double value, long timeMs) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        try {
+            clientSensors.quotaSensor().record(value, timeMs, true);
+            return 0;
+        } catch (QuotaViolationException e) {
+            int throttleTimeMs = (int) throttleTime(e, timeMs);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Quota violated for sensor ({}). Delay time: ({})",
+                        clientSensors.quotaSensor().name(), throttleTimeMs);
+            }
+            return throttleTimeMs;
+        }
+    }
+
+    /**
+     * Records that a user/clientId changed some metric being throttled 
without checking for
+     * quota violation. The aggregate value will subsequently be used for 
throttling when the
+     * next request is processed.
+     */
+    public void recordNoThrottle(Session session, String clientId, double 
value) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        clientSensors.quotaSensor().record(value, time.milliseconds(), false);
+    }
+
+    /**
+     * "Unrecord" the given value that has already been recorded for the given 
user/client by recording a negative value
+     * of the same quantity.
+     * For a throttled fetch, the broker should return an empty response and 
thus should not record the value. Ideally,
+     * we would like to compute the throttle time before actually recording 
the value, but the current Sensor code
+     * couples value recording and quota checking very tightly. As a 
workaround, we will unrecord the value for the fetch
+     * in case of throttling. Rate keeps the sum of values that fall in each 
time window, so this should bring the
+     * overall sum back to the previous value.
+     */
+    public void unrecordQuotaSensor(Session session, String clientId, double 
value, long timeMs) {
+        ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+        clientSensors.quotaSensor().record(value * -1, timeMs, false);
+    }
+
+    /**
+     * Returns the maximum value that could be recorded without guaranteed 
throttling.
+     * Recording any larger value will always be throttled, even if no other 
values were recorded in the quota window.
+     * This is used for deciding the maximum bytes that can be fetched at once
+     */
+    public double getMaxValueInQuotaWindow(Session session, String clientId) {
+        if (quotasEnabled()) {
+            ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);
+            Double limit = quotaCallback.quotaLimit(clientQuotaType, 
clientSensors.metricTags());
+            if (limit != null) {
+                return limit * (config.numQuotaSamples - 1) * 
config.quotaWindowSizeSeconds;
+            } else {
+                return Double.MAX_VALUE;
+            }
+        } else {
+            return Double.MAX_VALUE;
+        }
+    }
+
+    /**
+     * Throttle a client by muting the associated channel for the given 
throttle time.
+     * @param clientId request client id
+     * @param session request session
+     * @param throttleTimeMs Duration in milliseconds for which the channel is 
to be muted.
+     * @param throttleCallback Callback for channel throttling
+     */
+    public void throttle(
+            String clientId,
+            Session session,
+            ThrottleCallback throttleCallback,
+            int throttleTimeMs
+    ) {
+        if (throttleTimeMs > 0) {
+            ClientSensors clientSensors = getOrCreateQuotaSensors(session, 
clientId);

Review Comment:
   it would be cool if we can leverage `var` to cleanup the type declaration.
   ```java
               var clientSensors = getOrCreateQuotaSensors(session, clientId);
               clientSensors.throttleTimeSensor().record(throttleTimeMs);
               var throttledChannel = new ThrottledChannel(time, 
throttleTimeMs, throttleCallback);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to