rhauch commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r430667093
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java ########## @@ -63,7 +67,7 @@ * @param config the worker configuration; may not be null * @param time the time; may not be null */ - public ConnectMetrics(String workerId, WorkerConfig config, Time time) { + public ConnectMetrics(String workerId, WorkerConfig config, Time time, String clusterId) { Review comment: Need to add the parameter to the JavaDoc: ``` @param clusterId the Kafka cluster ID ``` ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.MockConnectMetrics; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.ConfigBackingStore; +import org.apache.kafka.connect.storage.StatusBackingStore; +import org.apache.kafka.connect.util.ConnectUtils; +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ConnectUtils.class}) +@PowerMockIgnore({"javax.management.*", "javax.crypto.*"}) +public class WorkerGroupMemberTest { + @Mock + private ConfigBackingStore configBackingStore; + @Mock + private StatusBackingStore statusBackingStore; + + @Test + public void testMetrics() throws Exception { + WorkerGroupMember member; + Map<String, String> workerProps = new HashMap<>(); + workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter.schemas.enable", "false"); + workerProps.put("internal.value.converter.schemas.enable", "false"); + workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); + workerProps.put("group.id", "group-1"); + workerProps.put("offset.storage.topic", "topic-1"); + workerProps.put("config.storage.topic", "topic-1"); + workerProps.put("status.storage.topic", "topic-1"); + workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockConnectMetrics.MockMetricsReporter.class.getName()); + DistributedConfig config = new DistributedConfig(workerProps); + + + LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]"); + + expectClusterId(); + + member = new WorkerGroupMember(config, "", configBackingStore, + null, Time.SYSTEM, "client-1", logContext); + + for (MetricsReporter reporter : member.metrics().reporters()) { + if (reporter instanceof MockConnectMetrics.MockMetricsReporter) { + MockConnectMetrics.MockMetricsReporter mockMetricsReporter = (MockConnectMetrics.MockMetricsReporter) reporter; + assertEquals("cluster-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID)); + assertEquals("group-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_GROUP_ID)); + } + } Review comment: It might be good to verify that we entered the `if (reporter instance Mock...)` block at least once. Otherwise, we might have a bug elsewhere that failed to instantiate the `MockMetricsReporter` class, and this portion of the test would still pass. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -92,6 +93,7 @@ private final ExecutorService executor; private final Time time; private final String workerId; + private final String clusterId; Review comment: @xiaodongdu, I was not suggesting getting rid of the field. It's fine to have a new field, but we should call the field `kafkaClusterId` rather than `clusterId` since the latter could be misinterpreted to mean the _Connect_ cluster ID. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java ########## @@ -66,20 +67,22 @@ public void configure(final WorkerConfig config) { if (topic == null || topic.trim().length() == 0) throw new ConfigException("Offset storage topic must be specified"); + String clusterId = ConnectUtils.lookupKafkaClusterId(config); data = new HashMap<>(); Map<String, Object> originals = config.originals(); Map<String, Object> producerProps = new HashMap<>(originals); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); + ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId); Map<String, Object> consumerProps = new HashMap<>(originals); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId); Review comment: Should we use `ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId)` a few lines below this? (See the similar [changes you made in `KafkaStatusBackingStore`](https://github.com/apache/kafka/pull/8691/files#diff-5740faccd8040e325fc9ba0e395f31f6R170) and [KafkaConfigBackingStore](https://github.com/apache/kafka/pull/8691/files#diff-1c045c8737aea4c820319a6de65af8a4R462).) ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.MockConnectMetrics; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.ConfigBackingStore; +import org.apache.kafka.connect.storage.StatusBackingStore; +import org.apache.kafka.connect.util.ConnectUtils; +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ConnectUtils.class}) +@PowerMockIgnore({"javax.management.*", "javax.crypto.*"}) +public class WorkerGroupMemberTest { + @Mock + private ConfigBackingStore configBackingStore; + @Mock + private StatusBackingStore statusBackingStore; + + @Test + public void testMetrics() throws Exception { + WorkerGroupMember member; + Map<String, String> workerProps = new HashMap<>(); + workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter.schemas.enable", "false"); + workerProps.put("internal.value.converter.schemas.enable", "false"); Review comment: Let's remove these deprecated configs. ```suggestion ``` ########## File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java ########## @@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept default void reconfigure(Map<String, ?> configs) { } + /** + * Provides context labels for the service or library exposing metrics + * + * @param metricsContext the metric context + */ + @InterfaceStability.Evolving + default void contextChange(MetricsContext metricsContext) { Review comment: I really don't have a preference. Past tense is a little odd, but I could see where `changeContext(...)` or `setContext(...)` are present tense and more conventional. ########## File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java ########## @@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept default void reconfigure(Map<String, ?> configs) { } + /** + * Provides context labels for the service or library exposing metrics + * + * @param metricsContext the metric context + */ + @InterfaceStability.Evolving + default void contextChange(MetricsContext metricsContext) { Review comment: I really don't have a strong preference. Past tense is a little odd, but I think `changeContext(...)` or `setContext(...)` are present-tense and more conventional. ########## File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; + +/** + * MetricsContext encapsulates additional contextLabels about metrics exposed via a + * {@link org.apache.kafka.common.metrics.MetricsReporter} + * + * The contextLabels map provides following information: + * - a <code>_namespace</node> field indicating the component exposing metrics + * e.g. kafka.server, kafka.consumer + * {@link JmxReporter} uses this as prefix for mbean names + * + * - for clients and streams libraries: any freeform fields passed in via + * client properties in the form of `metrics.context.<key>=<value> + * + * - for kafka brokers: kafka.broker.id, kafka.cluster.id + * - for connect workers: connect.kafka.cluster.id, connect.group.id + */ +@InterfaceStability.Evolving +public interface MetricsContext { + /* predefined fields */ + String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix + + /** + * Returns contextLabels fields + */ + Map<String, String> contextLabels(); Review comment: This JavaDoc is incomplete. Perhaps something like: ```suggestion /** * Returns the labels for this metrics context. * * @return the map of label keys and values; never null but possibly empty */ Map<String, String> contextLabels(); ``` ########## File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A implementation of MetricsContext, it encapsulates required metrics context properties for Kafka services and clients + */ +public class KafkaMetricsContext implements MetricsContext { + /** + * Client or Service's contextLabels map. + */ + private final Map<String, String> contextLabels = new HashMap<>(); + + /** + * Create a MetricsContext with namespace, no service or client properties + * @param namespace value for _namespace key + */ + public KafkaMetricsContext(String namespace) { + this(namespace, new HashMap<>()); + } + + /** + * Create a MetricsContext with namespace, service or client properties + * @param namespace value for _namespace key + * @param contextLabels contextLabels additional entries to add to the context. + * values will be converted to string using Object.toString() + */ + public KafkaMetricsContext(String namespace, Map<String, ?> contextLabels) { + this.contextLabels.put(MetricsContext.NAMESPACE, namespace); + contextLabels.forEach((key, value) -> this.contextLabels.put(key, value.toString())); + } + + public Map<String, String> contextLabels() { Review comment: Need an override annotation here: ```suggestion @Override public Map<String, String> contextLabels() { ``` ########## File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java ########## @@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept default void reconfigure(Map<String, ?> configs) { } + /** + * Provides context labels for the service or library exposing metrics + * + * @param metricsContext the metric context + */ + @InterfaceStability.Evolving + default void contextChange(MetricsContext metricsContext) { Review comment: It would also be good to identify when this is called relative to other methods. For example, it is always called before `init(...)` is called. But can it be called again, or is that the only time this method is called? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org