This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 191f2fa KAFKA-7992: Introduce start-time-ms metric (#6318) 191f2fa is described below commit 191f2faae07b6608b0601dc2caf204b196a8fc47 Author: Stanislav Kozlovski <familyguyuser...@windowslive.com> AuthorDate: Wed May 1 16:58:02 2019 +0100 KAFKA-7992: Introduce start-time-ms metric (#6318) Reviewers: Colin P. McCabe <cmcc...@apache.org>, Ismael Juma <ism...@juma.me.uk> --- .../kafka/clients/admin/KafkaAdminClient.java | 2 +- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../apache/kafka/common/utils/AppInfoParser.java | 32 +++++--- .../kafka/common/utils/AppInfoParserTest.java | 88 ++++++++++++++++++++++ .../kafka/connect/runtime/ConnectMetrics.java | 2 +- .../runtime/distributed/WorkerGroupMember.java | 2 +- core/src/main/scala/kafka/common/AppInfo.scala | 54 ------------- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- 9 files changed, 117 insertions(+), 69 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 23d7fd5..ffe24ca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -451,7 +451,7 @@ public class KafkaAdminClient extends AdminClient { this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG); this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG); config.logUnused(); - AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); + AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); log.debug("Kafka admin client initialized"); thread.start(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index c73a028..a59e857 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -811,7 +811,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { isolationLevel); config.logUnused(); - AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); + AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); log.debug("Kafka consumer initialized"); } catch (Throwable t) { // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121 diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b83d98e..06a0fc1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -423,7 +423,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start(); config.logUnused(); - AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); + AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); log.debug("Kafka producer started"); } catch (Throwable t) { // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121 diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java index 8a12fbc..3ceca99 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java @@ -36,6 +36,8 @@ public class AppInfoParser { private static final String VERSION; private static final String COMMIT_ID; + protected static final String DEFAULT_VALUE = "unknown"; + static { Properties props = new Properties(); try (InputStream resourceStream = AppInfoParser.class.getResourceAsStream("/kafka/kafka-version.properties")) { @@ -43,8 +45,8 @@ public class AppInfoParser { } catch (Exception e) { log.warn("Error while loading kafka-version.properties: {}", e.getMessage()); } - VERSION = props.getProperty("version", "unknown").trim(); - COMMIT_ID = props.getProperty("commitId", "unknown").trim(); + VERSION = props.getProperty("version", DEFAULT_VALUE).trim(); + COMMIT_ID = props.getProperty("commitId", DEFAULT_VALUE).trim(); } public static String getVersion() { @@ -55,13 +57,13 @@ public class AppInfoParser { return COMMIT_ID; } - public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics) { + public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics, long nowMs) { try { ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + Sanitizer.jmxSanitize(id)); - AppInfo mBean = new AppInfo(); + AppInfo mBean = new AppInfo(nowMs); ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name); - registerMetrics(metrics); // prefix will be added later by JmxReporter + registerMetrics(metrics, mBean); // prefix will be added later by JmxReporter } catch (JMException e) { log.warn("Error registering AppInfo mbean", e); } @@ -84,10 +86,11 @@ public class AppInfoParser { return metrics.metricName(name, "app-info", "Metric indicating " + name); } - private static void registerMetrics(Metrics metrics) { + private static void registerMetrics(Metrics metrics, AppInfo appInfo) { if (metrics != null) { - metrics.addMetric(metricName(metrics, "version"), new ImmutableValue<>(VERSION)); - metrics.addMetric(metricName(metrics, "commit-id"), new ImmutableValue<>(COMMIT_ID)); + metrics.addMetric(metricName(metrics, "version"), new ImmutableValue<>(appInfo.getVersion())); + metrics.addMetric(metricName(metrics, "commit-id"), new ImmutableValue<>(appInfo.getCommitId())); + metrics.addMetric(metricName(metrics, "start-time-ms"), new ImmutableValue<>(appInfo.getStartTimeMs())); } } @@ -95,19 +98,25 @@ public class AppInfoParser { if (metrics != null) { metrics.removeMetric(metricName(metrics, "version")); metrics.removeMetric(metricName(metrics, "commit-id")); + metrics.removeMetric(metricName(metrics, "start-time-ms")); } } public interface AppInfoMBean { String getVersion(); String getCommitId(); + Long getStartTimeMs(); } public static class AppInfo implements AppInfoMBean { - public AppInfo() { + private final Long startTimeMs; + + public AppInfo(long startTimeMs) { + this.startTimeMs = startTimeMs; log.info("Kafka version: {}", AppInfoParser.getVersion()); log.info("Kafka commitId: {}", AppInfoParser.getCommitId()); + log.info("Kafka startTimeMs: {}", startTimeMs); } @Override @@ -120,6 +129,11 @@ public class AppInfoParser { return AppInfoParser.getCommitId(); } + @Override + public Long getStartTimeMs() { + return startTimeMs; + } + } static class ImmutableValue<T> implements Gauge<T> { diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java new file mode 100644 index 0000000..34dba81 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import org.apache.kafka.common.metrics.Metrics; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import java.lang.management.ManagementFactory; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AppInfoParserTest { + private static final String EXPECTED_COMMIT_VERSION = AppInfoParser.DEFAULT_VALUE; + private static final String EXPECTED_VERSION = AppInfoParser.DEFAULT_VALUE; + private static final Long EXPECTED_START_MS = 1552313875722L; + private static final String METRICS_PREFIX = "app-info-test"; + private static final String METRICS_ID = "test"; + + private Metrics metrics; + private MBeanServer mBeanServer; + + @Before + public void setUp() { + metrics = new Metrics(new MockTime(1)); + mBeanServer = ManagementFactory.getPlatformMBeanServer(); + } + + @After + public void tearDown() { + metrics.close(); + } + + @Test + public void testRegisterAppInfoRegistersMetrics() throws JMException { + registerAppInfo(); + } + + @Test + public void testUnregisterAppInfoUnregistersMetrics() throws JMException { + registerAppInfo(); + AppInfoParser.unregisterAppInfo(METRICS_PREFIX, METRICS_ID, metrics); + + assertFalse(mBeanServer.isRegistered(expectedAppObjectName())); + assertNull(metrics.metric(metrics.metricName("commit-id", "app-info"))); + assertNull(metrics.metric(metrics.metricName("version", "app-info"))); + assertNull(metrics.metric(metrics.metricName("start-time-ms", "app-info"))); + } + + private void registerAppInfo() throws JMException { + assertEquals(EXPECTED_COMMIT_VERSION, AppInfoParser.getCommitId()); + assertEquals(EXPECTED_VERSION, AppInfoParser.getVersion()); + + AppInfoParser.registerAppInfo(METRICS_PREFIX, METRICS_ID, metrics, EXPECTED_START_MS); + + assertTrue(mBeanServer.isRegistered(expectedAppObjectName())); + assertEquals(EXPECTED_COMMIT_VERSION, metrics.metric(metrics.metricName("commit-id", "app-info")).metricValue()); + assertEquals(EXPECTED_VERSION, metrics.metric(metrics.metricName("version", "app-info")).metricValue()); + assertEquals(EXPECTED_START_MS, metrics.metric(metrics.metricName("start-time-ms", "app-info")).metricValue()); + } + + private ObjectName expectedAppObjectName() throws MalformedObjectNameException { + return new ObjectName(METRICS_PREFIX + ":type=app-info,id=" + METRICS_ID); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java index cf82f86..d09b06d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java @@ -81,7 +81,7 @@ public class ConnectMetrics { reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId); - AppInfoParser.registerAppInfo(JMX_PREFIX, workerId, metrics); + AppInfoParser.registerAppInfo(JMX_PREFIX, workerId, metrics, time.milliseconds()); } /** diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 6591be0..71ce91e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -141,7 +141,7 @@ public class WorkerGroupMember { configStorage, listener); - AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); + AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); log.debug("Connect group member created"); } catch (Throwable t) { // call close methods if internal objects are already constructed diff --git a/core/src/main/scala/kafka/common/AppInfo.scala b/core/src/main/scala/kafka/common/AppInfo.scala deleted file mode 100644 index f77bdf5..0000000 --- a/core/src/main/scala/kafka/common/AppInfo.scala +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -import com.yammer.metrics.core.Gauge -import kafka.metrics.KafkaMetricsGroup -import org.apache.kafka.common.utils.AppInfoParser - -object AppInfo extends KafkaMetricsGroup { - private var isRegistered = false - private val lock = new Object() - - def registerInfo(): Unit = { - lock.synchronized { - if (isRegistered) { - return - } - } - - newGauge("Version", - new Gauge[String] { - def value = { - AppInfoParser.getVersion() - } - }) - - newGauge("CommitID", - new Gauge[String] { - def value = { - AppInfoParser.getCommitId() - } - }) - - lock.synchronized { - isRegistered = true - } - - } -} diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 2db3839..4a25811 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -332,7 +332,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP shutdownLatch = new CountDownLatch(1) startupComplete.set(true) isStartingUp.set(false) - AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics) + AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics, time.milliseconds()) info("started") } }