mimaison commented on code in PR #19397:
URL: https://github.com/apache/kafka/pull/19397#discussion_r2376031213


##########
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java:
##########
@@ -29,6 +29,11 @@
  * <p>Kafka Connect discovers implementations of this interface using the Java 
{@link java.util.ServiceLoader} mechanism.
  * To support this, implementations of this interface should also contain a 
service provider configuration file in
  * {@code 
META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}.
+ * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable 
the config provider to register metrics.
+ * The following tags are automatically added to all metrics registered: 
<code>config</code> set to
+ * <code>config.providers</code>, <code>class</code> set to the ConfigProvider 
class name,
+ * and <code>provider</code> set to the provider name. Note that MirrorMaker 
will not reflect this behavior until
+ * KAFKA-19149 is implemented.

Review Comment:
   I'm not sure I understand this comment. The goal of KAFKA-19149 is to make 
MirrorSourceConnector and MirrorCheckpointConnector use the new API for their 
metrics (instead of instantiating their own Metrics); it's not related to 
ConfigProviders.



##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -620,21 +621,22 @@ private Map<String, ConfigProvider> 
instantiateConfigProviders(
             }
         }
         // Instantiate Config Providers
-        Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = 
new HashMap<>();
         for (Map.Entry<String, String> entry : providerMap.entrySet()) {
             try {
                 String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() 
+ CONFIG_PROVIDERS_PARAM;
                 Map<String, ?> configProperties = 
configProviderProperties(prefix, providerConfigProperties);
                 ConfigProvider provider = Utils.newInstance(entry.getValue(), 
ConfigProvider.class);
                 provider.configure(configProperties);
-                configProviderInstances.put(entry.getKey(), provider);
+                Plugin<ConfigProvider> providerPlugin = 
Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);

Review Comment:
   I'm not sure I understand your explanation. If we always pass a null Metrics 
instance, we're never going to call withPluginMetrics(), so ConfigProvider 
instances can't register any metrics.
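
A minimal sketch of the concern, using hypothetical stand-in types (`FakeMetrics`, `FakeMonitorable`, and a simplified `wrapInstance`) rather than the real Kafka classes. It assumes the wrapper only invokes the metrics callback when it is given a non-null metrics object, which is the behavior described above:

```java
import java.util.ArrayList;
import java.util.List;

public class NullMetricsSketch {
    // Hypothetical stand-in for a metrics registry; NOT the Kafka Metrics class.
    static class FakeMetrics {
        final List<String> registered = new ArrayList<>();
    }

    // Hypothetical stand-in for the Monitorable interface.
    interface FakeMonitorable {
        void withPluginMetrics(FakeMetrics metrics);
    }

    static class FakeProvider implements FakeMonitorable {
        boolean metricsCallbackInvoked = false;

        @Override
        public void withPluginMetrics(FakeMetrics metrics) {
            metricsCallbackInvoked = true;
            metrics.registered.add("provider-metric");
        }
    }

    // Simplified wrapper (assumption): the callback is skipped when metrics is null.
    static <T> T wrapInstance(T instance, FakeMetrics metrics) {
        if (instance instanceof FakeMonitorable && metrics != null) {
            ((FakeMonitorable) instance).withPluginMetrics(metrics);
        }
        return instance;
    }

    // Wraps a fresh provider and reports whether its metrics callback ran.
    static boolean callbackInvokedWith(FakeMetrics metrics) {
        FakeProvider provider = new FakeProvider();
        wrapInstance(provider, metrics);
        return provider.metricsCallbackInvoked;
    }

    public static void main(String[] args) {
        System.out.println("null metrics -> callback ran: " + callbackInvokedWith(null));
        System.out.println("real metrics -> callback ran: " + callbackInvokedWith(new FakeMetrics()));
    }
}
```

Under that assumption, a provider wrapped with `null` never receives the callback, so it has no opportunity to register anything.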



##########
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java:
##########
@@ -94,13 +95,13 @@ public ConfigTransformerResult transform(Map<String, 
String> configs) {
         Map<String, Long> ttls = new HashMap<>();
         for (Map.Entry<String, Map<String, Set<String>>> entry : 
keysByProvider.entrySet()) {
             String providerName = entry.getKey();
-            ConfigProvider provider = configProviders.get(providerName);
+            Plugin<ConfigProvider> providerPlugin = 
configProviderPlugins.get(providerName);
             Map<String, Set<String>> keysByPath = entry.getValue();
-            if (provider != null && keysByPath != null) {
+            if (providerPlugin != null && keysByPath != null) {

Review Comment:
   We probably want to check `providerPlugin.get()` is not null as well. 
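
Roughly what I have in mind, as a self-contained sketch. `Wrapper` is a hypothetical stand-in for the plugin wrapper and `resolve` is an illustrative helper, not code from this PR:

```java
import java.util.HashMap;
import java.util.Map;

public class NullSafeLookupSketch {
    // Hypothetical stand-in for a wrapper whose get() may return null.
    static class Wrapper<T> {
        private final T instance;

        Wrapper(T instance) {
            this.instance = instance;
        }

        T get() {
            return instance;
        }
    }

    // Uses the wrapped value only if both the wrapper and its instance are non-null.
    static String resolve(Map<String, Wrapper<String>> plugins, String name) {
        Wrapper<String> plugin = plugins.get(name);
        if (plugin != null && plugin.get() != null) {
            return plugin.get().toUpperCase();
        }
        return "skipped";
    }

    public static void main(String[] args) {
        Map<String, Wrapper<String>> plugins = new HashMap<>();
        plugins.put("present", new Wrapper<>("value"));
        plugins.put("empty", new Wrapper<>(null));
        System.out.println(resolve(plugins, "present"));
        System.out.println(resolve(plugins, "empty"));
        System.out.println(resolve(plugins, "missing"));
    }
}
```

The extra `plugin.get() != null` guard covers the case where the map holds a wrapper around a null instance, which the wrapper-only check would let through.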



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java:
##########
@@ -790,4 +806,13 @@ public void configure(Map<String, ?> configs) {
             super.configure(configs);
         }
     }
+
+    public static class CustomMonitorableConfigProvider extends 
MonitorableConfigProvider {

Review Comment:
   Why do we need this class? Can't we directly use `MonitorableConfigProvider`?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java:
##########
@@ -790,4 +806,13 @@ public void configure(Map<String, ?> configs) {
             super.configure(configs);
         }
     }
+
+    public static class CustomMonitorableConfigProvider extends 
MonitorableConfigProvider {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            assertTrue(configured);

Review Comment:
   This assertion is not enough! The test would pass if this code is never 
executed.
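
One possible shape for a stronger check, sketched with a hypothetical `RecordingProvider` (not a class from this PR): the callback records what it observed, and the test body asserts on those flags, so a callback that never runs makes the check fail instead of passing silently.

```java
public class CallbackAssertionSketch {
    // Hypothetical provider that records what happened instead of asserting inside the callback.
    static class RecordingProvider {
        boolean configured = false;
        boolean metricsCallbackInvoked = false;
        boolean configuredWhenCallbackRan = false;

        void configure() {
            configured = true;
        }

        void withPluginMetrics() {
            metricsCallbackInvoked = true;
            // Capture the ordering fact here; the test asserts on it later.
            configuredWhenCallbackRan = configured;
        }
    }

    // The check a test body would make: the callback ran, and it ran after configure().
    static boolean callbackRanAfterConfigure(RecordingProvider provider) {
        return provider.metricsCallbackInvoked && provider.configuredWhenCallbackRan;
    }

    public static void main(String[] args) {
        RecordingProvider neverCalled = new RecordingProvider();
        neverCalled.configure();
        System.out.println("callback never ran: " + callbackRanAfterConfigure(neverCalled));

        RecordingProvider calledInOrder = new RecordingProvider();
        calledInOrder.configure();
        calledInOrder.withPluginMetrics();
        System.out.println("callback ran after configure: " + callbackRanAfterConfigure(calledInOrder));
    }
}
```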



##########
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java:
##########
@@ -45,12 +46,12 @@ public class ConfigTransformerTest {
 
     @BeforeEach
     public void setup() {
-        configTransformer = new 
ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
+        configTransformer = new ConfigTransformer(Map.of("test", 
Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
     }
 
     @Test
     public void testReplaceVariable() {
-        ConfigTransformerResult result = 
configTransformer.transform(Collections.singletonMap(MY_KEY, 
"${test:testPath:testKey}"));
+        ConfigTransformerResult result = 
configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}"));

Review Comment:
   Not sure all these changes make sense in this PR. It's a bit noisy and it's 
not even enough to get rid of the `Collections` import.



##########
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java:
##########


Review Comment:
   I don't see any tests in clients that use this class. We should have clients 
tests that use this ConfigProvider and check that its metric is registered 
correctly.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java:
##########
@@ -383,16 +388,27 @@ public void 
newConfigProviderShouldConfigureWithPluginClassLoader() {
             createConfig();
         }
 
-        ConfigProvider plugin = plugins.newConfigProvider(
+        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
             config,
-            providerPrefix,
-            ClassLoaderUsage.PLUGINS
+            providerName,
+            ClassLoaderUsage.PLUGINS,
+            null
         );
 
-        assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect 
samples");
-        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) 
plugin).flatten();
+        assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot 
collect samples");
+        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) 
plugin.get()).flatten();
         assertTrue(samples.containsKey("configure"));
-        assertPluginClassLoaderAlwaysActive(plugin);
+        assertPluginClassLoaderAlwaysActive(plugin.get());
+    }
+
+    @Test
+    public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() {
+        String providerName = "monitorable";
+        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + 
providerName;
+        props.put(providerPrefix + ".class", 
CustomMonitorableConfigProvider.class.getName());
+        createConfig();
+        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, 
providerName, ClassLoaderUsage.PLUGINS, new Metrics());
+        assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());

Review Comment:
   We need to check the metrics from the provider have been created.
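
As a rough illustration of the kind of check I mean, here is a sketch that looks for a registered metric carrying the tags described in the `ConfigProvider` Javadoc (`config`, `class`, `provider`). The metrics are modeled as plain tag maps, and `expectedTags` and `hasMetricWithTags` are hypothetical helpers; a real test would query the `Metrics` instance passed to `newConfigProvider` instead.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MetricTagCheckSketch {
    // Builds the tag set the Javadoc says is added automatically.
    // The exact form of the class tag value is an assumption left to the caller.
    static Map<String, String> expectedTags(String providerClass, String providerName) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("config", "config.providers");
        tags.put("class", providerClass);
        tags.put("provider", providerName);
        return tags;
    }

    // Returns true if any registered metric carries all of the expected tags.
    static boolean hasMetricWithTags(List<Map<String, String>> registeredMetricTags,
                                     Map<String, String> expected) {
        for (Map<String, String> tags : registeredMetricTags) {
            if (tags.entrySet().containsAll(expected.entrySet())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, String> expected = expectedTags("CustomMonitorableConfigProvider", "monitorable");
        List<Map<String, String>> registered = List.of(expected);
        System.out.println(hasMetricWithTags(registered, expected));
        System.out.println(hasMetricWithTags(List.of(), expected));
    }
}
```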



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.