xvrl commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r430557121
##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {

Review comment:
I think keeping a Map interface makes it more convenient to work with.
It gives you all the helper methods and streaming interfaces rather than having to hand-roll those things for someone consuming the API.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix

Review comment:
I'm not convinced we should give namespace a special status over other fields; it just happens to be the only one we currently define by default, for backwards-compatibility reasons. If we find ourselves adding more of those, I agree it would be worth revisiting how we expose pre-defined fields.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private var shutdownLatch = new CountDownLatch(1)

+  //properties for MetricsContext

Review comment:
@xiaodongdu what @mumrah was saying is that we could add the properties to the Kafka broker configuration and pass those values into the context, similar to what we do for clients. We can update the KIP and add that.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -65,4 +68,12 @@ static String lookupKafkaClusterId(Admin adminClient) {
                 + "Check worker's broker connection and security properties.", e);
         }
     }
+
+    public static void addMetricsContextProperties(Map<String, Object> prop, WorkerConfig config, String clusterId) {
+        //add all properties predefined with "metrics.context."
+        prop.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
+        //add connect properties
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID, config.originals().get(DistributedConfig.GROUP_ID_CONFIG));

Review comment:
@rhauch I've updated the KIP document and will send a follow-up email to announce the changes once we finalize @mumrah's feedback.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private var shutdownLatch = new CountDownLatch(1)

+  //properties for MetricsContext

Review comment:
Yes, I believe that's what @mumrah meant. If we decide to do this, I'll update the KIP and send a follow-up email with some of the other tweaks we discussed in this PR.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns metadata fields
+     */
+    Map<String, String> metadata();

Review comment:
ok, I updated the KIP to reflect this change.
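
As a rough illustration of the earlier point about keeping a `Map` interface, here is a minimal sketch of what a consumer of `metadata()` gets for free from the standard collection and stream helpers. `SketchMetricsContext` and `MetadataLabels` are hypothetical names made up for this example and are not part of the PR; only the `_namespace` key and the `Map<String, String> metadata()` shape come from the diff above.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the MetricsContext interface under review;
// it only mirrors the Map<String, String> metadata() method from the diff.
interface SketchMetricsContext {
    Map<String, String> metadata();
}

// Hypothetical helper showing how a reporter might turn context fields into labels.
final class MetadataLabels {
    static final String NAMESPACE = "_namespace";

    // Render every field except the namespace as "key=value", sorted by key.
    static String labels(SketchMetricsContext context) {
        return context.metadata().entrySet().stream()
                .filter(e -> !NAMESPACE.equals(e.getKey()))
                .sorted(Map.Entry.comparingByKey())
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
    }

    // Look up the namespace with a fallback, using a plain Map helper.
    static String namespace(SketchMetricsContext context) {
        return context.metadata().getOrDefault(NAMESPACE, "");
    }
}
```

A custom accessor type would need its own equivalents of `getOrDefault`, filtering, and iteration, which is the hand-rolling the comment refers to.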
##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -384,6 +390,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
   }

+  private[server] def notifyMetricsReporters(metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext()
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+
+  private[server] def createKafkaMetricsContext() : KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false))

Review comment:
the prefix should be stripped before adding the fields to the context.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }

+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
someone on the ML commented that we might want to name this `contextChanged` (past tense). I don't have a strong feeling either way. Do you have any thoughts @mumrah @rhauch?
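
On the prefix-stripping comment for `createKafkaMetricsContext()` above: the sketch below shows the difference between keeping and stripping the `metrics.context.` prefix when copying configuration entries into the context labels. It is a plain-Java stand-in written for illustration, not Kafka's implementation; `PrefixStripping` and `withPrefix` are hypothetical names, and the assumption is that the boolean passed to `originalsWithPrefix` in the diff plays the same role as the `strip` flag here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that mimics selecting config entries by prefix,
// optionally removing the prefix from the resulting keys.
final class PrefixStripping {
    // Assumed value of the prefix, taken from the javadoc in the diff.
    static final String METRICS_CONTEXT_PREFIX = "metrics.context.";

    static Map<String, Object> withPrefix(Map<String, ?> originals, String prefix, boolean strip) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            String key = entry.getKey();
            // Only keep keys that have something after the prefix.
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                String outKey = strip ? key.substring(prefix.length()) : key;
                result.put(outKey, entry.getValue());
            }
        }
        return result;
    }
}
```

With `strip` set to `false`, a broker property such as `metrics.context.env=prod` would end up in the context under the key `metrics.context.env`; with `true` it would appear as `env`, which is what the review comment asks for.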
##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }

+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
Due to the way `JmxReporter` is initialized in Kafka today, it already gets called both before and after `init()`.