mimaison commented on code in PR #19068: URL: https://github.com/apache/kafka/pull/19068#discussion_r2018502560
########## clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java: ########## @@ -24,6 +24,10 @@ /** * Quota callback interface for brokers and controllers that enables customization of client quota computation. + * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the callback to register metrics. + * The following tags are automatically added to all metrics registered: + * <code>config</code> set to <code>clientQuotaCallback.class</code>, and <code>class</code> set to the Review Comment: We need to mention the "role" tag as well. ########## core/src/main/java/kafka/server/QuotaFactory.java: ########## @@ -113,19 +115,35 @@ public void shutdown() { } } - public static QuotaManagers instantiate(KafkaConfig cfg, Metrics metrics, Time time, String threadNamePrefix) { - ClientQuotaCallback clientQuotaCallback = cfg.getConfiguredInstance( - QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, ClientQuotaCallback.class); + public static QuotaManagers instantiate( + KafkaConfig cfg, + Metrics metrics, + Time time, + String threadNamePrefix, + String role + ) { + Plugin<ClientQuotaCallback> clientQuotaCallbackPlugin = createClientQuotaCallback(cfg, metrics, role); return new QuotaManagers( - new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.FETCH, time, threadNamePrefix, Option.apply(clientQuotaCallback)), - new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.PRODUCE, time, threadNamePrefix, Option.apply(clientQuotaCallback)), - new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, threadNamePrefix, Optional.ofNullable(clientQuotaCallback)), - new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, time, threadNamePrefix, Option.apply(clientQuotaCallback)), + new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.FETCH, time, threadNamePrefix, Option.apply(clientQuotaCallbackPlugin)), + new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.PRODUCE, time, threadNamePrefix, Option.apply(clientQuotaCallbackPlugin)), + new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, threadNamePrefix, Optional.ofNullable(clientQuotaCallbackPlugin)), + new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, time, threadNamePrefix, Option.apply(clientQuotaCallbackPlugin)), new ReplicationQuotaManager(replicationConfig(cfg), metrics, QuotaType.LEADER_REPLICATION, time), new ReplicationQuotaManager(replicationConfig(cfg), metrics, QuotaType.FOLLOWER_REPLICATION, time), new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, QuotaType.ALTER_LOG_DIRS_REPLICATION, time), - Optional.ofNullable(clientQuotaCallback) + clientQuotaCallbackPlugin == null ? Optional.empty() : Optional.ofNullable(clientQuotaCallbackPlugin.get()) + ); + } + + private static Plugin<ClientQuotaCallback> createClientQuotaCallback(KafkaConfig cfg, Metrics metrics, String role) { Review Comment: We should directly return `Optional` instead of returning `null` to then wrapping it in `Option`/`Optional` right after. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java: ########## @@ -125,4 +150,26 @@ public void configure(Map<String, ?> configs) { } } + + public static class MonitorCustomQuotaCallback extends CustomQuotaCallback implements Monitorable { Review Comment: I think `MonitorableCustomQuotaCallback` would be a better name. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java: ########## @@ -125,4 +150,26 @@ public void configure(Map<String, ?> configs) { } } + + public static class MonitorCustomQuotaCallback extends CustomQuotaCallback implements Monitorable { + + public final AtomicInteger counter = new AtomicInteger(); Review Comment: This is effectively unused as it's never checked. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java: ########## @@ -19,64 +19,89 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Monitorable; +import org.apache.kafka.common.metrics.PluginMetrics; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.TestUtils; import org.apache.kafka.common.test.api.ClusterConfigProperty; import org.apache.kafka.common.test.api.ClusterTest; -import org.apache.kafka.common.test.api.ClusterTestDefaults; import org.apache.kafka.common.test.api.Type; -import org.apache.kafka.common.test.junit.ClusterTestExtensions; +import org.apache.kafka.server.ProcessRole; import org.apache.kafka.server.config.QuotaConfig; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -@ClusterTestDefaults(controllers = 3, - types = {Type.KRAFT}, - serverProperties = { - @ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), - @ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), - @ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), - } -) -@ExtendWith(ClusterTestExtensions.class) -public class CustomQuotaCallbackTest { +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; - private final ClusterInstance cluster; +public class CustomQuotaCallbackTest { - public CustomQuotaCallbackTest(ClusterInstance clusterInstance) { - this.cluster = clusterInstance; - } + @ClusterTest( + controllers = 3, + types = {Type.KRAFT}, + serverProperties = { + @ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), + @ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), + @ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), + } + ) + public void testCustomQuotaCallbackWithControllerServer(ClusterInstance cluster) throws InterruptedException { - @ClusterTest - public void testCustomQuotaCallbackWithControllerServer() throws InterruptedException { - try (Admin admin = cluster.admin(Map.of())) { admin.createTopics(List.of(new NewTopic("topic", 1, (short) 1))); TestUtils.waitForCondition( - () -> CustomQuotaCallback.COUNTERS.size() == 3 - && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), + () -> CustomQuotaCallback.COUNTERS.size() == 3 + && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), "The CustomQuotaCallback not triggered in all controllers. " ); - + // Reset the counters, and we expect the callback to be triggered again in all controllers CustomQuotaCallback.COUNTERS.clear(); - + admin.deleteTopics(List.of("topic")); TestUtils.waitForCondition( () -> CustomQuotaCallback.COUNTERS.size() == 3 - && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), + && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), "The CustomQuotaCallback not triggered in all controllers. " ); - + } } + @Test + public void testCreateMonitorCustomQuotaCallback() { Review Comment: This is not an integration test. This also only exercises `Plugin.wrapInstance()`. I think we want an integration test like the test above. ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -137,23 +138,33 @@ object ClientQuotaManager { * @param quotaType Quota type of this quota manager * @param time @Time object to use * @param threadNamePrefix The thread prefix to use - * @param clientQuotaCallback An optional @ClientQuotaCallback + * @param clientQuotaCallbackPlugin An optional @ClientQuotaCallback and + * warp it in a {@link org.apache.kafka.common.internals.Plugin} */ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, private val metrics: Metrics, private val quotaType: QuotaType, private val time: Time, private val threadNamePrefix: String, - private val clientQuotaCallback: Option[ClientQuotaCallback] = None) extends Logging { + private val clientQuotaCallbackPlugin: Option[Plugin[ClientQuotaCallback]] = None) extends Logging { private val lock = new ReentrantReadWriteLock() private val sensorAccessor = new SensorAccess(lock, metrics) - private val quotaCallback = clientQuotaCallback.getOrElse(new DefaultQuotaCallback) + private val quotaCallback = clientQuotaCallbackPlugin match { + case Some(plugin) => if (plugin.get() == null) Review Comment: Can `plugin.get()` be `null`? ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java: ########## @@ -19,64 +19,89 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Monitorable; +import org.apache.kafka.common.metrics.PluginMetrics; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.TestUtils; import org.apache.kafka.common.test.api.ClusterConfigProperty; import org.apache.kafka.common.test.api.ClusterTest; -import org.apache.kafka.common.test.api.ClusterTestDefaults; import org.apache.kafka.common.test.api.Type; -import org.apache.kafka.common.test.junit.ClusterTestExtensions; +import org.apache.kafka.server.ProcessRole; import org.apache.kafka.server.config.QuotaConfig; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -@ClusterTestDefaults(controllers = 3, - types = {Type.KRAFT}, - serverProperties = { - @ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), - @ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), - @ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), - } -) -@ExtendWith(ClusterTestExtensions.class) -public class CustomQuotaCallbackTest { +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; - private final ClusterInstance cluster; +public class CustomQuotaCallbackTest { - public CustomQuotaCallbackTest(ClusterInstance clusterInstance) { - this.cluster = clusterInstance; - } + @ClusterTest( + controllers = 3, + types = {Type.KRAFT}, + serverProperties = { + @ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), + @ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), + @ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"), + } + ) + public void testCustomQuotaCallbackWithControllerServer(ClusterInstance cluster) throws InterruptedException { - @ClusterTest - public void testCustomQuotaCallbackWithControllerServer() throws InterruptedException { - try (Admin admin = cluster.admin(Map.of())) { admin.createTopics(List.of(new NewTopic("topic", 1, (short) 1))); TestUtils.waitForCondition( - () -> CustomQuotaCallback.COUNTERS.size() == 3 - && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), + () -> CustomQuotaCallback.COUNTERS.size() == 3 + && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), "The CustomQuotaCallback not triggered in all controllers. " ); - + // Reset the counters, and we expect the callback to be triggered again in all controllers CustomQuotaCallback.COUNTERS.clear(); - + admin.deleteTopics(List.of("topic")); TestUtils.waitForCondition( () -> CustomQuotaCallback.COUNTERS.size() == 3 - && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), + && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), "The CustomQuotaCallback not triggered in all controllers. " ); - + } } + @Test + public void testCreateMonitorCustomQuotaCallback() { + Metrics metrics = new Metrics(); + assertEquals(1, metrics.metrics().size()); + Plugin<ClientQuotaCallback> clientQuotaCallbackPlugin = Plugin.wrapInstance( + new MonitorCustomQuotaCallback(), + metrics, + QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, + Map.of("role", ProcessRole.BrokerRole.toString()) + ); + MonitorCustomQuotaCallback clientQuotaCallback = (MonitorCustomQuotaCallback) clientQuotaCallbackPlugin.get(); + assertEquals(MonitorCustomQuotaCallback.class, clientQuotaCallback.getClass()); Review Comment: This is not necessary, `clientQuotaCallback` is already of type `MonitorCustomQuotaCallback` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org