mimaison commented on code in PR #19068:
URL: https://github.com/apache/kafka/pull/19068#discussion_r2018502560


##########
clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java:
##########
@@ -24,6 +24,10 @@
 
 /**
  * Quota callback interface for brokers and controllers that enables 
customization of client quota computation.
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the 
callback to register metrics. 
+ * The following tags are automatically added to all metrics registered: 
+ * <code>config</code> set to <code>clientQuotaCallback.class</code>, and 
<code>class</code> set to the 

Review Comment:
   We need to mention the "role" tag as well.



##########
core/src/main/java/kafka/server/QuotaFactory.java:
##########
@@ -113,19 +115,35 @@ public void shutdown() {
         }
     }
 
-    public static QuotaManagers instantiate(KafkaConfig cfg, Metrics metrics, 
Time time, String threadNamePrefix) {
-        ClientQuotaCallback clientQuotaCallback = cfg.getConfiguredInstance(
-            QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, 
ClientQuotaCallback.class);
+    public static QuotaManagers instantiate(
+        KafkaConfig cfg,
+        Metrics metrics,
+        Time time,
+        String threadNamePrefix,
+        String role
+    ) {
+        Plugin<ClientQuotaCallback> clientQuotaCallbackPlugin = 
createClientQuotaCallback(cfg, metrics, role);
 
         return new QuotaManagers(
-            new ClientQuotaManager(clientConfig(cfg), metrics, 
QuotaType.FETCH, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
-            new ClientQuotaManager(clientConfig(cfg), metrics, 
QuotaType.PRODUCE, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
-            new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, 
threadNamePrefix, Optional.ofNullable(clientQuotaCallback)),
-            new 
ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, 
time, threadNamePrefix, Option.apply(clientQuotaCallback)),
+            new ClientQuotaManager(clientConfig(cfg), metrics, 
QuotaType.FETCH, time, threadNamePrefix, 
Option.apply(clientQuotaCallbackPlugin)),
+            new ClientQuotaManager(clientConfig(cfg), metrics, 
QuotaType.PRODUCE, time, threadNamePrefix, 
Option.apply(clientQuotaCallbackPlugin)),
+            new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, 
threadNamePrefix, Optional.ofNullable(clientQuotaCallbackPlugin)),
+            new 
ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, 
time, threadNamePrefix, Option.apply(clientQuotaCallbackPlugin)),
             new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
QuotaType.LEADER_REPLICATION, time),
             new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
QuotaType.FOLLOWER_REPLICATION, time),
             new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), 
metrics, QuotaType.ALTER_LOG_DIRS_REPLICATION, time),
-            Optional.ofNullable(clientQuotaCallback)
+            clientQuotaCallbackPlugin == null ? Optional.empty() : 
Optional.ofNullable(clientQuotaCallbackPlugin.get())
+        );
+    }
+
+    private static Plugin<ClientQuotaCallback> 
createClientQuotaCallback(KafkaConfig cfg, Metrics metrics, String role) {

Review Comment:
   We should directly return `Optional` instead of returning `null` and then 
wrapping it in `Option`/`Optional` right after.
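
   A minimal sketch of the shape I have in mind. `CallbackPlugin` and the 
`String` parameter are hypothetical stand-ins for `Plugin<ClientQuotaCallback>` 
and the `KafkaConfig` lookup, not the real Kafka types:

```java
import java.util.Optional;

public class OptionalFactorySketch {

    // Hypothetical stand-in for Plugin<ClientQuotaCallback>; not the real Kafka class.
    public static final class CallbackPlugin {
        private final String className;

        public CallbackPlugin(String className) {
            this.className = className;
        }

        public String className() {
            return className;
        }
    }

    // Returns Optional.empty() when no callback class is configured,
    // so callers never have to null-check the result.
    public static Optional<CallbackPlugin> createClientQuotaCallback(String configuredClassName) {
        if (configuredClassName == null) {
            return Optional.empty();
        }
        return Optional.of(new CallbackPlugin(configuredClassName));
    }

    public static void main(String[] args) {
        Optional<CallbackPlugin> none = createClientQuotaCallback(null);
        Optional<CallbackPlugin> some = createClientQuotaCallback("com.example.MyCallback");

        // The unwrapped callback is obtained with map(), no ternary on null needed.
        Optional<String> unwrapped = some.map(CallbackPlugin::className);

        System.out.println(none.isPresent());
        System.out.println(unwrapped.orElse("<none>"));
    }
}
```

   With this, the last `QuotaManagers` argument can be derived with `map()` on 
the returned `Optional` rather than a `null` check, and the other call sites 
convert the same value as needed.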



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java:
##########
@@ -125,4 +150,26 @@ public void configure(Map<String, ?> configs) {
         }
 
     }
+
+    public static class MonitorCustomQuotaCallback extends CustomQuotaCallback 
implements Monitorable {

Review Comment:
   I think `MonitorableCustomQuotaCallback` would be a better name.



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java:
##########
@@ -125,4 +150,26 @@ public void configure(Map<String, ?> configs) {
         }
 
     }
+
+    public static class MonitorCustomQuotaCallback extends CustomQuotaCallback 
implements Monitorable {
+
+        public final AtomicInteger counter = new AtomicInteger();

Review Comment:
   This is effectively unused as it's never checked.



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java:
##########
@@ -19,64 +19,89 @@
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.TestUtils;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
-import org.apache.kafka.common.test.api.ClusterTestDefaults;
 import org.apache.kafka.common.test.api.Type;
-import org.apache.kafka.common.test.junit.ClusterTestExtensions;
+import org.apache.kafka.server.ProcessRole;
 import org.apache.kafka.server.config.QuotaConfig;
 
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.Test;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-@ClusterTestDefaults(controllers = 3, 
-    types = {Type.KRAFT},
-    serverProperties = {
-        @ClusterConfigProperty(id = 3000, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
-        @ClusterConfigProperty(id = 3001, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
-        @ClusterConfigProperty(id = 3002, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
-    }
-)
-@ExtendWith(ClusterTestExtensions.class)
-public class CustomQuotaCallbackTest {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
-    private final ClusterInstance cluster;
+public class CustomQuotaCallbackTest {
 
-    public CustomQuotaCallbackTest(ClusterInstance clusterInstance) {
-        this.cluster = clusterInstance;
-    }
+    @ClusterTest(
+        controllers = 3,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(id = 3000, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+            @ClusterConfigProperty(id = 3001, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+            @ClusterConfigProperty(id = 3002, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+        }
+    )
+    public void testCustomQuotaCallbackWithControllerServer(ClusterInstance 
cluster) throws InterruptedException {
 
-    @ClusterTest
-    public void testCustomQuotaCallbackWithControllerServer() throws 
InterruptedException {
-        
         try (Admin admin = cluster.admin(Map.of())) {
             admin.createTopics(List.of(new NewTopic("topic", 1, (short) 1)));
             TestUtils.waitForCondition(
-                () -> CustomQuotaCallback.COUNTERS.size() == 3 
-                        && 
CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> 
counter.get() > 0), 
+                () -> CustomQuotaCallback.COUNTERS.size() == 3
+                        && 
CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> 
counter.get() > 0),
                     "The CustomQuotaCallback not triggered in all controllers. 
"
             );
-            
+
             // Reset the counters, and we expect the callback to be triggered 
again in all controllers
             CustomQuotaCallback.COUNTERS.clear();
-            
+
             admin.deleteTopics(List.of("topic"));
             TestUtils.waitForCondition(
                 () -> CustomQuotaCallback.COUNTERS.size() == 3
-                        && 
CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> 
counter.get() > 0), 
+                        && 
CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> 
counter.get() > 0),
                     "The CustomQuotaCallback not triggered in all controllers. 
"
             );
-            
+
         }
     }
 
+    @Test
+    public void testCreateMonitorCustomQuotaCallback() {

Review Comment:
   This is not an integration test. This also only exercises 
`Plugin.wrapInstance()`. I think we want an integration test like the test 
above.



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -137,23 +138,33 @@ object ClientQuotaManager {
  * @param quotaType Quota type of this quota manager
  * @param time @Time object to use
  * @param threadNamePrefix The thread prefix to use
- * @param clientQuotaCallback An optional @ClientQuotaCallback
+ * @param clientQuotaCallbackPlugin An optional @ClientQuotaCallback and 
+ *                                  warp it in a {@link 
org.apache.kafka.common.internals.Plugin}
  */
 class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
                          private val quotaType: QuotaType,
                          private val time: Time,
                          private val threadNamePrefix: String,
-                         private val clientQuotaCallback: 
Option[ClientQuotaCallback] = None) extends Logging {
+                         private val clientQuotaCallbackPlugin: 
Option[Plugin[ClientQuotaCallback]] = None) extends Logging {
 
   private val lock = new ReentrantReadWriteLock()
   private val sensorAccessor = new SensorAccess(lock, metrics)
-  private val quotaCallback = clientQuotaCallback.getOrElse(new 
DefaultQuotaCallback)
+  private val quotaCallback = clientQuotaCallbackPlugin match {
+    case Some(plugin) => if (plugin.get() == null)

Review Comment:
   Can `plugin.get()` be `null`?



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java:
##########
@@ -19,64 +19,89 @@
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.TestUtils;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
-import org.apache.kafka.common.test.api.ClusterTestDefaults;
 import org.apache.kafka.common.test.api.Type;
-import org.apache.kafka.common.test.junit.ClusterTestExtensions;
+import org.apache.kafka.server.ProcessRole;
 import org.apache.kafka.server.config.QuotaConfig;
 
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.Test;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-@ClusterTestDefaults(controllers = 3, 
-    types = {Type.KRAFT},
-    serverProperties = {
-        @ClusterConfigProperty(id = 3000, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
-        @ClusterConfigProperty(id = 3001, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
-        @ClusterConfigProperty(id = 3002, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
-    }
-)
-@ExtendWith(ClusterTestExtensions.class)
-public class CustomQuotaCallbackTest {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
-    private final ClusterInstance cluster;
+public class CustomQuotaCallbackTest {
 
-    public CustomQuotaCallbackTest(ClusterInstance clusterInstance) {
-        this.cluster = clusterInstance;
-    }
+    @ClusterTest(
+        controllers = 3,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(id = 3000, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+            @ClusterConfigProperty(id = 3001, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+            @ClusterConfigProperty(id = 3002, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+        }
+    )
+    public void testCustomQuotaCallbackWithControllerServer(ClusterInstance 
cluster) throws InterruptedException {
 
-    @ClusterTest
-    public void testCustomQuotaCallbackWithControllerServer() throws 
InterruptedException {
-        
         try (Admin admin = cluster.admin(Map.of())) {
             admin.createTopics(List.of(new NewTopic("topic", 1, (short) 1)));
             TestUtils.waitForCondition(
-                () -> CustomQuotaCallback.COUNTERS.size() == 3 
-                        && 
CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> 
counter.get() > 0), 
+                () -> CustomQuotaCallback.COUNTERS.size() == 3
+                        && 
CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> 
counter.get() > 0),
                     "The CustomQuotaCallback not triggered in all controllers. 
"
             );
-            
+
             // Reset the counters, and we expect the callback to be triggered 
again in all controllers
             CustomQuotaCallback.COUNTERS.clear();
-            
+
             admin.deleteTopics(List.of("topic"));
             TestUtils.waitForCondition(
                 () -> CustomQuotaCallback.COUNTERS.size() == 3
-                        && 
CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> 
counter.get() > 0), 
+                        && 
CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> 
counter.get() > 0),
                     "The CustomQuotaCallback not triggered in all controllers. 
"
             );
-            
+
         }
     }
 
+    @Test
+    public void testCreateMonitorCustomQuotaCallback() {
+        Metrics metrics = new Metrics();
+        assertEquals(1, metrics.metrics().size());
+        Plugin<ClientQuotaCallback> clientQuotaCallbackPlugin = 
Plugin.wrapInstance(
+            new MonitorCustomQuotaCallback(),
+            metrics,
+            QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
+            Map.of("role", ProcessRole.BrokerRole.toString())
+        );
+        MonitorCustomQuotaCallback clientQuotaCallback = 
(MonitorCustomQuotaCallback) clientQuotaCallbackPlugin.get();
+        assertEquals(MonitorCustomQuotaCallback.class, 
clientQuotaCallback.getClass());

Review Comment:
   This is not necessary: `clientQuotaCallback` is already of type 
`MonitorCustomQuotaCallback`.


