mimaison commented on code in PR #19397:
URL: https://github.com/apache/kafka/pull/19397#discussion_r2376031213
##########
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java:
##########
@@ -29,6 +29,11 @@
* <p>Kafka Connect discovers implementations of this interface using the Java
{@link java.util.ServiceLoader} mechanism.
* To support this, implementations of this interface should also contain a
service provider configuration file in
* {@code
META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}.
+ * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable
the config provider to register metrics.
+ * The following tags are automatically added to all metrics registered:
<code>config</code> set to
+ * <code>config.providers</code>, <code>class</code> set to the ConfigProvider
class name,
+ * and <code>provider</code> set to the provider name. Note that MirrorMaker
will not reflect this behavior until
+ * KAFKA-19149 is implemented.
Review Comment:
Not sure I understand this comment. The goal of KAFKA-19149 is to make
MirrorSourceConnector and MirrorCheckpointConnector use the new API for their
metrics (instead of instantiating their own Metrics), it's not related to
ConfigProviders.
##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -620,21 +621,22 @@ private Map<String, ConfigProvider>
instantiateConfigProviders(
}
}
// Instantiate Config Providers
- Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
+ Map<String, Plugin<ConfigProvider>> configProviderPluginInstances =
new HashMap<>();
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
try {
String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey()
+ CONFIG_PROVIDERS_PARAM;
Map<String, ?> configProperties =
configProviderProperties(prefix, providerConfigProperties);
ConfigProvider provider = Utils.newInstance(entry.getValue(),
ConfigProvider.class);
provider.configure(configProperties);
- configProviderInstances.put(entry.getKey(), provider);
+ Plugin<ConfigProvider> providerPlugin =
Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
Review Comment:
I'm not sure I understand your explanation. If we always pass a null Metrics
instance, we're never going to call withPluginMetrics(), so ConfigProvider
instances can't register any metrics.
##########
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java:
##########
@@ -94,13 +95,13 @@ public ConfigTransformerResult transform(Map<String,
String> configs) {
Map<String, Long> ttls = new HashMap<>();
for (Map.Entry<String, Map<String, Set<String>>> entry :
keysByProvider.entrySet()) {
String providerName = entry.getKey();
- ConfigProvider provider = configProviders.get(providerName);
+ Plugin<ConfigProvider> providerPlugin =
configProviderPlugins.get(providerName);
Map<String, Set<String>> keysByPath = entry.getValue();
- if (provider != null && keysByPath != null) {
+ if (providerPlugin != null && keysByPath != null) {
Review Comment:
We probably want to check `providerPlugin.get()` is not null as well.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java:
##########
@@ -790,4 +806,13 @@ public void configure(Map<String, ?> configs) {
super.configure(configs);
}
}
+
+ public static class CustomMonitorableConfigProvider extends
MonitorableConfigProvider {
Review Comment:
Why do we need this class? can't we directly use `MonitorableConfigProvider`?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java:
##########
@@ -790,4 +806,13 @@ public void configure(Map<String, ?> configs) {
super.configure(configs);
}
}
+
+ public static class CustomMonitorableConfigProvider extends
MonitorableConfigProvider {
+
+ @Override
+ public void withPluginMetrics(PluginMetrics metrics) {
+ assertTrue(configured);
Review Comment:
This assertion is not enough! The test would pass if this code is never
executed
##########
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java:
##########
@@ -45,12 +46,12 @@ public class ConfigTransformerTest {
@BeforeEach
public void setup() {
- configTransformer = new
ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
+ configTransformer = new ConfigTransformer(Map.of("test",
Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
}
@Test
public void testReplaceVariable() {
- ConfigTransformerResult result =
configTransformer.transform(Collections.singletonMap(MY_KEY,
"${test:testPath:testKey}"));
+ ConfigTransformerResult result =
configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}"));
Review Comment:
Not sure all these changes make sense in this PR. It's a bit noisy and it's
not even enough to get rid of the `Collections` import.
##########
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java:
##########
Review Comment:
I don't see any tests in clients that use this class. We should have clients
tests using this ConfigProvider and check its metric is registered correctly.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java:
##########
@@ -383,16 +388,27 @@ public void
newConfigProviderShouldConfigureWithPluginClassLoader() {
createConfig();
}
- ConfigProvider plugin = plugins.newConfigProvider(
+ Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
config,
- providerPrefix,
- ClassLoaderUsage.PLUGINS
+ providerName,
+ ClassLoaderUsage.PLUGINS,
+ null
);
- assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect
samples");
- Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin)
plugin).flatten();
+ assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot
collect samples");
+ Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin)
plugin.get()).flatten();
assertTrue(samples.containsKey("configure"));
- assertPluginClassLoaderAlwaysActive(plugin);
+ assertPluginClassLoaderAlwaysActive(plugin.get());
+ }
+
+ @Test
+ public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() {
+ String providerName = "monitorable";
+ String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." +
providerName;
+ props.put(providerPrefix + ".class",
CustomMonitorableConfigProvider.class.getName());
+ createConfig();
+ Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config,
providerName, ClassLoaderUsage.PLUGINS, new Metrics());
+ assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
Review Comment:
We need to check the metrics from the provider have been created.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]