rhauch commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r430667093



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
##########
@@ -63,7 +67,7 @@
      * @param config   the worker configuration; may not be null
      * @param time     the time; may not be null
      */
-    public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
+    public ConnectMetrics(String workerId, WorkerConfig config, Time time, 
String clusterId) {

Review comment:
       Need to add the parameter to the JavaDoc:
   ```
   @param clusterId  the Kafka cluster ID
   ```

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", 
"/tmp/connect.offsets");
+        workerProps.put("group.id", "group-1");
+        workerProps.put("offset.storage.topic", "topic-1");
+        workerProps.put("config.storage.topic", "topic-1");
+        workerProps.put("status.storage.topic", "topic-1");
+        workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
MockConnectMetrics.MockMetricsReporter.class.getName());
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+
+        LogContext logContext = new LogContext("[Worker clientId=client-1 + 
groupId= group-1]");
+
+        expectClusterId();
+
+        member = new WorkerGroupMember(config, "", configBackingStore,
+        null, Time.SYSTEM, "client-1", logContext);
+
+        for (MetricsReporter reporter : member.metrics().reporters()) {
+            if (reporter instanceof MockConnectMetrics.MockMetricsReporter) {
+                MockConnectMetrics.MockMetricsReporter mockMetricsReporter = 
(MockConnectMetrics.MockMetricsReporter) reporter;
+                assertEquals("cluster-1", 
mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+                assertEquals("group-1", 
mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_GROUP_ID));
+            }
+        }

Review comment:
       It might be good to verify that we entered the `if (reporter instance 
Mock...)` block at least once. Otherwise, we might have a bug elsewhere that 
failed to instantiate the `MockMetricsReporter` class, and this portion of the 
test would still pass.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -92,6 +93,7 @@
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final String clusterId;

Review comment:
       @xiaodongdu, I was not suggesting getting rid of the field. It's fine to 
have a new field, but we should call the field `kafkaClusterId` rather than 
`clusterId` since the latter could be misinterpreted to mean the _Connect_ 
cluster ID.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
##########
@@ -66,20 +67,22 @@ public void configure(final WorkerConfig config) {
         if (topic == null || topic.trim().length() == 0)
             throw new ConfigException("Offset storage topic must be 
specified");
 
+        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
         data = new HashMap<>();
 
         Map<String, Object> originals = config.originals();
         Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
+        ConnectUtils.addMetricsContextProperties(producerProps, config, 
clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        ConnectUtils.addMetricsContextProperties(consumerProps, config, 
clusterId);

Review comment:
       Should we use `ConnectUtils.addMetricsContextProperties(adminProps, 
config, clusterId)` a few lines below this? (See the similar [changes you made 
in 
`KafkaStatusBackingStore`](https://github.com/apache/kafka/pull/8691/files#diff-5740faccd8040e325fc9ba0e395f31f6R170)
 and 
[KafkaConfigBackingStore](https://github.com/apache/kafka/pull/8691/files#diff-1c045c8737aea4c820319a6de65af8a4R462).)

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");

Review comment:
       Let's remove these deprecated configs.
   ```suggestion
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) 
throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       I really don't have a preference. Past tense is a little odd, but I 
could see where `changeContext(...)` or `setContext(...)` are present tense and 
more conventional.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) 
throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       I really don't have a strong preference. Past tense is a little odd, but 
I think `changeContext(...)` or `setContext(...)` are present-tense and more 
conventional.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional contextLabels about metrics exposed 
via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The contextLabels map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns contextLabels fields
+     */
+    Map<String, String> contextLabels();

Review comment:
       This JavaDoc is incomplete. Perhaps something like:
   ```suggestion
       /**
        * Returns the labels for this metrics context.
        *
        * @return the map of label keys and values; never null but possibly 
empty
        */
       Map<String, String> contextLabels();
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A implementation of MetricsContext, it encapsulates required metrics 
context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's contextLabels map.
+     */
+    private final Map<String, String> contextLabels = new HashMap<>();
+
+    /**
+     * Create a MetricsContext with namespace, no service or client properties
+     * @param namespace value for _namespace key
+     */
+    public KafkaMetricsContext(String namespace) {
+        this(namespace, new HashMap<>());
+    }
+
+    /**
+     * Create a MetricsContext with namespace, service or client properties
+     * @param namespace value for _namespace key
+     * @param contextLabels  contextLabels additional entries to add to the 
context.
+     *                  values will be converted to string using 
Object.toString()
+     */
+    public KafkaMetricsContext(String namespace, Map<String, ?> contextLabels) 
{
+        this.contextLabels.put(MetricsContext.NAMESPACE, namespace);
+        contextLabels.forEach((key, value) -> this.contextLabels.put(key, 
value.toString()));
+    }
+
+    public Map<String, String> contextLabels() {

Review comment:
       Need an override annotation here:
   ```suggestion
       @Override
       public Map<String, String> contextLabels() {
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) 
throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       It would also be good to identify when this is called relative to other 
methods. For example, it is always called before `init(...)` is called. But can 
it be called again, or is that the only time this method is called?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to