xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r430557121



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {

Review comment:
       I think keeping a Map interface makes it more convenient to work with. 
It gives consumers of the API all the helper methods and streaming interfaces, 
rather than making them hand-roll those things.
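
To illustrate the convenience argument, here is a minimal sketch of a reporter-side helper that consumes the metadata as a plain `Map<String, String>`. The class `MetadataMapSketch` and the method `toTags` are hypothetical names made up for this example; only the field names come from the Javadoc in the diff.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical consumer of MetricsContext#metadata(); not part of the PR.
public class MetadataMapSketch {

    // Renders the metadata as sorted "key=value" tags using only
    // standard Map and Stream helpers, with nothing hand-rolled.
    public static String toTags(Map<String, String> metadata) {
        return metadata.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Example values only; real values are supplied by the component.
        Map<String, String> metadata = new TreeMap<>();
        metadata.put("_namespace", "kafka.server");
        metadata.put("kafka.broker.id", "1");

        // getOrDefault is one of the helper methods that comes for free with Map.
        String namespace = metadata.getOrDefault("_namespace", "unknown");
        System.out.println(namespace + " -> " + toTags(metadata));
    }
}
```

A custom accessor-style interface would have to offer equivalents of `getOrDefault`, `entrySet()` and streaming before a consumer could write the same thing.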

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix

Review comment:
       I'm not convinced we should give namespace a special status over other 
fields. It just happens to be the only one we currently define by default, for 
backwards-compatibility reasons. If we find ourselves adding more of those, I 
agree it would be worth revisiting how we expose pre-defined fields.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       @xiaodongdu what @mumrah was saying is that we could add the properties 
to the Kafka broker configuration and pass those values into the context, 
similar to what we do for clients. We can update the KIP and add that.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -65,4 +68,12 @@ static String lookupKafkaClusterId(Admin adminClient) {
                                        + "Check worker's broker connection and security properties.", e);
         }
     }
+
+    public static void addMetricsContextProperties(Map<String, Object> prop, WorkerConfig config, String clusterId) {
+        //add all properties predefined with "metrics.context."
+        prop.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
+        //add connect properties
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID, config.originals().get(DistributedConfig.GROUP_ID_CONFIG));

Review comment:
       @rhauch I've updated the KIP document and will send a follow-up email to 
announce the changes once we finalize @mumrah's feedback.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       Yes, I believe that's what @mumrah meant. If we decide to do this, I'll 
update the KIP and send a follow-up email with some of the other tweaks we 
discussed in this PR.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns metadata fields
+     */
+    Map<String, String> metadata();

Review comment:
       ok, I updated the KIP to reflect this change.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -384,6 +390,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
   }
 
+  private[server] def notifyMetricsReporters(metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext()
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+
+  private[server] def createKafkaMetricsContext() : KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false))

Review comment:
       the prefix should be stripped before adding the fields to the context.
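
As a rough illustration of the difference, the sketch below mimics prefix selection with and without stripping. `PrefixStripSketch` and `withPrefix` are hypothetical stand-ins written for this example, not Kafka's implementation; in the real code, passing `true` as the second argument to `originalsWithPrefix` (or using the single-argument overload) should have the stripping effect, assuming those overloads behave as documented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for prefix-based config selection; not Kafka code.
public class PrefixStripSketch {

    // Assumed to match the "metrics.context." prefix mentioned in the diff.
    public static final String METRICS_CONTEXT_PREFIX = "metrics.context.";

    // Returns the entries whose key starts with the prefix. When strip is
    // true, the prefix is removed from each returned key.
    public static Map<String, Object> withPrefix(Map<String, ?> originals,
                                                 String prefix,
                                                 boolean strip) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                String outKey = strip ? key.substring(prefix.length()) : key;
                result.put(outKey, entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Example broker properties; the "region" label is made up.
        Map<String, Object> originals = new HashMap<>();
        originals.put("metrics.context.region", "us-east");
        originals.put("broker.id", "1");

        // Unstripped: the context label keeps the "metrics.context." prefix.
        System.out.println(withPrefix(originals, METRICS_CONTEXT_PREFIX, false));
        // Stripped: the context label is just "region".
        System.out.println(withPrefix(originals, METRICS_CONTEXT_PREFIX, true));
    }
}
```

With stripping, a reporter sees the label under the key the user chose (`region`) rather than under the configuration key (`metrics.context.region`).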

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       Someone on the mailing list commented that we might want to name this 
`contextChanged` (past tense). I don't have a strong feeling either way. Do you 
have any thoughts, @mumrah @rhauch?

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       Due to the way `JmxReporter` is initialized in Kafka today, it already 
gets called both before and after `init()`.
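
One consequence, sketched below under simplifying assumptions, is that a reporter probably should not assume any particular ordering between the context callback and `init()`. `OrderTolerantReporterSketch` is a hypothetical class that takes a plain map instead of the real `MetricsContext` and uses a no-argument `init()`; it does not implement the actual `MetricsReporter` interface.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical reporter that tolerates contextChange arriving before or
// after init(). Simplified signatures; not the real Kafka interfaces.
public class OrderTolerantReporterSketch {

    private Map<String, String> labels = Collections.emptyMap();
    private boolean initialized = false;

    // Stores a defensive copy of the latest context. It is safe to call at
    // any point in the lifecycle, and the latest call wins.
    public synchronized void contextChange(Map<String, String> metadata) {
        labels = Collections.unmodifiableMap(new HashMap<>(metadata));
    }

    // Marks the reporter as ready without touching any stored labels.
    public synchronized void init() {
        initialized = true;
    }

    public synchronized Map<String, String> labels() {
        return labels;
    }

    public synchronized boolean isInitialized() {
        return initialized;
    }

    public static void main(String[] args) {
        OrderTolerantReporterSketch reporter = new OrderTolerantReporterSketch();
        Map<String, String> context = new HashMap<>();
        context.put("_namespace", "kafka.server");

        reporter.contextChange(context); // before init()
        reporter.init();
        reporter.contextChange(context); // after init()
        System.out.println(reporter.isInitialized() + " " + reporter.labels());
    }
}
```

The design choice here is simply that `init()` never resets the labels, so a context delivered early is not lost.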



