Yunyung commented on code in PR #19397:
URL: https://github.com/apache/kafka/pull/19397#discussion_r2461325589


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -620,21 +621,22 @@ private Map<String, ConfigProvider> instantiateConfigProviders(
             }
         }
         // Instantiate Config Providers
-        Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
         for (Map.Entry<String, String> entry : providerMap.entrySet()) {
             try {
                 String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
                 Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
                 ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
                 provider.configure(configProperties);
-                configProviderInstances.put(entry.getKey(), provider);
+                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);

Review Comment:
   > If we always pass a null Metrics instance, we're never going to call withPluginMetrics(), so ConfigProvider instances can't register any metrics.
   
   Yes, that's expected, since AbstractConfig doesn't have a Metrics instance.
   
   So why do we wrap the provider in a `Plugin` here at all?
   Because the provider becomes a member of `ConfigTransformer` and is used by `ConfigTransformer#transform`, which now works with `Plugin<ConfigProvider>` instances.
   
   https://github.com/apache/kafka/pull/19397/files#diff-e1b7dda05dfd0104e51c40c3f3dada978a51e0d734c535a34ca2a0e213edb214R557
   ```
   ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
   ```
   So even though we don't expect it to register any metrics, we still wrap the `provider` in a `Plugin` here.
   
   This is what I was trying to explain above.
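
To illustrate the point about the null `Metrics` instance, here is a minimal sketch with stand-in types. `PluginSketch`, `SketchProvider`, and the `List<String>` registry are hypothetical names invented for this example and are not Kafka's classes; the assumed behavior is only what the quoted comment describes, namely that the metrics callback is skipped when no metrics object is supplied.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a "can register metrics" interface; NOT Kafka's Monitorable.
interface Monitorable {
    void withPluginMetrics(List<String> metrics);
}

// Hypothetical wrapper that mimics the behavior discussed in the review.
final class PluginSketch<T> {
    private final T instance;

    private PluginSketch(T instance) {
        this.instance = instance;
    }

    // Assumption: the callback fires only when a registry is supplied
    // and the wrapped instance opts in by implementing Monitorable.
    static <T> PluginSketch<T> wrapInstance(T instance, List<String> metricsRegistry) {
        if (metricsRegistry != null && instance instanceof Monitorable) {
            ((Monitorable) instance).withPluginMetrics(metricsRegistry);
        }
        return new PluginSketch<>(instance);
    }

    T get() {
        return instance;
    }
}

// Hypothetical provider that records whether the callback was invoked.
final class SketchProvider implements Monitorable {
    boolean metricsCallbackInvoked = false;

    @Override
    public void withPluginMetrics(List<String> metrics) {
        metricsCallbackInvoked = true;
        metrics.add("sketch-metric");
    }
}
```

With a null registry the provider is still wrapped and usable through `get()`, it just never gets the chance to register metrics. That mirrors why the wrapper is kept here purely so the consumer can work with one type.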



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java:
##########
@@ -790,4 +806,13 @@ public void configure(Map<String, ?> configs) {
             super.configure(configs);
         }
     }
+
+    public static class CustomMonitorableConfigProvider extends MonitorableConfigProvider {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            assertTrue(configured);

Review Comment:
   Great point! Fixed it.



##########
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java:
##########


Review Comment:
   I tested it in `WorkerTest.java`. Is that what you mean?



##########
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java:
##########
@@ -94,13 +95,13 @@ public ConfigTransformerResult transform(Map<String, String> configs) {
         Map<String, Long> ttls = new HashMap<>();
         for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
             String providerName = entry.getKey();
-            ConfigProvider provider = configProviders.get(providerName);
+            Plugin<ConfigProvider> providerPlugin = configProviderPlugins.get(providerName);
             Map<String, Set<String>> keysByPath = entry.getValue();
-            if (provider != null && keysByPath != null) {
+            if (providerPlugin != null && keysByPath != null) {

Review Comment:
   Done.



##########
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java:
##########
@@ -29,6 +29,11 @@
  * <p>Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
  * To support this, implementations of this interface should also contain a service provider configuration file in
  * {@code META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}.
+ * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
+ * The following tags are automatically added to all metrics registered: <code>config</code> set to
+ * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
+ * and <code>provider</code> set to the provider name. Note that MirrorMaker will not reflect this behavior until
+ * KAFKA-19149 is implemented.

Review Comment:
   My understanding:
   MirrorMaker doesn't have its own `Metrics` instance, while `MirrorSourceConnector` and `MirrorCheckpointConnector` do. [KAFKA-19149](https://issues.apache.org/jira/browse/KAFKA-19149) aims to change this.
   The `ConfigProviderPlugin` needs a `Metrics` instance from MirrorMaker. Once [KAFKA-19149](https://issues.apache.org/jira/browse/KAFKA-19149) is implemented, that instance can be passed through properly.
   
   Feel free to correct me if I misunderstood anything.
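
For reference, the three tags described in the Javadoc above could be assembled roughly as follows. This is only a sketch: `ProviderTagsSketch` and `tagsFor` are hypothetical names, not Kafka APIs, and using the simple class name for the `class` tag is an assumption, since the Javadoc only says "class name".

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ProviderTagsSketch {
    // Hypothetical helper, not a Kafka API: builds the three tags named in the Javadoc.
    static Map<String, String> tagsFor(String providerName, Class<?> providerClass) {
        Map<String, String> tags = new LinkedHashMap<>();
        // "config" is set to the name of the config that declares the providers.
        tags.put("config", "config.providers");
        // Assumption: simple class name; the Javadoc does not say whether it is fully qualified.
        tags.put("class", providerClass.getSimpleName());
        // "provider" is the provider's configured name.
        tags.put("provider", providerName);
        return tags;
    }
}
```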



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java:
##########
@@ -383,16 +388,27 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() {
             createConfig();
         }
 
-        ConfigProvider plugin = plugins.newConfigProvider(
+        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
             config,
-            providerPrefix,
-            ClassLoaderUsage.PLUGINS
+            providerName,
+            ClassLoaderUsage.PLUGINS,
+            null
         );
 
-        assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
-        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
+        assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples");
+        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten();
         assertTrue(samples.containsKey("configure"));
-        assertPluginClassLoaderAlwaysActive(plugin);
+        assertPluginClassLoaderAlwaysActive(plugin.get());
+    }
+
+    @Test
+    public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() {
+        String providerName = "monitorable";
+        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
+        props.put(providerPrefix + ".class", CustomMonitorableConfigProvider.class.getName());
+        createConfig();
+        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
+        assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());

Review Comment:
   Done.
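
The ordering that the test name refers to (`configure` first, then `withPluginMetrics`) can be sketched with a stand-in type. `OrderCheckingProviderSketch` is a hypothetical class for illustration only; it does not implement Kafka's `ConfigProvider` or `Monitorable`, and it throws instead of using a JUnit assertion so that it stays self-contained.

```java
import java.util.Map;

// Hypothetical stand-in that fails fast if the metrics callback arrives too early.
final class OrderCheckingProviderSketch {
    private boolean configured = false;
    private boolean metricsReceived = false;

    // Marks the provider as configured; the configs themselves are ignored in this sketch.
    void configure(Map<String, ?> configs) {
        configured = true;
    }

    // Rejects the callback unless configure() has already run.
    void withPluginMetrics(Object metrics) {
        if (!configured) {
            throw new IllegalStateException("withPluginMetrics called before configure");
        }
        metricsReceived = true;
    }

    boolean metricsReceived() {
        return metricsReceived;
    }
}
```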



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java:
##########
@@ -790,4 +806,13 @@ public void configure(Map<String, ?> configs) {
             super.configure(configs);
         }
     }
+
+    public static class CustomMonitorableConfigProvider extends MonitorableConfigProvider {

Review Comment:
   Sure. Let's just use `MonitorableConfigProvider` instead.



##########
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java:
##########
@@ -45,12 +46,12 @@ public class ConfigTransformerTest {
 
     @BeforeEach
     public void setup() {
-        configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
+        configTransformer = new ConfigTransformer(Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
     }
 
     @Test
     public void testReplaceVariable() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}"));

Review Comment:
   I've undone the change, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
