http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java new file mode 100644 index 0000000..efb38b2 --- /dev/null +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics.jmx; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServer; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import java.lang.management.ManagementFactory; + +import static org.junit.Assert.assertEquals; + +public class JMXReporterTest extends TestLogger { + + @Test + public void testReplaceInvalidChars() { + assertEquals("", JMXReporter.replaceInvalidChars("")); + assertEquals("abc", JMXReporter.replaceInvalidChars("abc")); + assertEquals("abc", JMXReporter.replaceInvalidChars("abc\"")); + assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc")); + assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc\"")); + assertEquals("abc", JMXReporter.replaceInvalidChars("\"a\"b\"c\"")); + assertEquals("", JMXReporter.replaceInvalidChars("\"\"\"\"")); + assertEquals("____", JMXReporter.replaceInvalidChars(" ")); + assertEquals("ab_-(c)-", JMXReporter.replaceInvalidChars("\"ab ;(c)'")); + assertEquals("a_b_c", JMXReporter.replaceInvalidChars("a b c")); + assertEquals("a_b_c_", JMXReporter.replaceInvalidChars("a b c ")); + assertEquals("a-b-c-", JMXReporter.replaceInvalidChars("a;b'c*")); + assertEquals("a------b------c", JMXReporter.replaceInvalidChars("a,=;:?'b,=;:?'c")); + } + + /** + * Verifies 
that the JMXReporter properly generates the JMX name. + */ + @Test + public void testGenerateName() { + String[] scope = {"value0", "value1", "\"value2 (test),=;:?'"}; + String jmxName = JMXReporter.generateJmxName("TestMetric", scope); + + assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric", jmxName); + } + + /** + * Verifies that multiple JMXReporters can be started on the same machine and register metrics at the MBeanServer. + * + * @throws Exception if the attribute/mbean could not be found or the test is broken + */ + @Test + public void testPortConflictHandling() throws Exception { + Configuration cfg = new Configuration(); + MetricRegistry reg = new MetricRegistry(cfg); + + TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm"); + + JMXReporter rep1 = new JMXReporter(); + JMXReporter rep2 = new JMXReporter(); + + MetricConfig cfg1 = new MetricConfig(); + cfg1.setProperty("port", "9020-9035"); + + rep1.open(cfg1); + rep2.open(cfg1); + + rep1.notifyOfAddedMetric(new Gauge<Integer>() { + @Override + public Integer getValue() { + return 1; + } + }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm")); + + rep2.notifyOfAddedMetric(new Gauge<Integer>() { + @Override + public Integer getValue() { + return 2; + } + }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm")); + + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents())); + ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents())); + + assertEquals(1, mBeanServer.getAttribute(objectName1, "Value")); + assertEquals(2, mBeanServer.getAttribute(objectName2, "Value")); + + rep1.close(); + rep2.close(); + reg.shutdown(); + } + + /** + * Verifies that we can connect to multiple JMXReporters running on the same machine. 
+ * + * @throws Exception + */ + @Test + public void testJMXAvailability() throws Exception { + Configuration cfg = new Configuration(); + MetricRegistry reg = new MetricRegistry(cfg); + + TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm"); + + JMXReporter rep1 = new JMXReporter(); + JMXReporter rep2 = new JMXReporter(); + + MetricConfig cfg1 = new MetricConfig(); + cfg1.setProperty("port", "9040-9055"); + rep1.open(cfg1); + rep2.open(cfg1); + + rep1.notifyOfAddedMetric(new Gauge<Integer>() { + @Override + public Integer getValue() { + return 1; + } + }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm")); + + rep2.notifyOfAddedMetric(new Gauge<Integer>() { + @Override + public Integer getValue() { + return 2; + } + }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm")); + + ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents())); + ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents())); + + JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + rep1.getPort() + "/jndi/rmi://localhost:" + rep1.getPort() + "/jmxrmi"); + JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1); + MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection(); + + assertEquals(1, mCon1.getAttribute(objectName1, "Value")); + assertEquals(2, mCon1.getAttribute(objectName2, "Value")); + + url1 = null; + jmxCon1.close(); + jmxCon1 = null; + mCon1 = null; + + JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + rep2.getPort() + "/jndi/rmi://localhost:" + rep2.getPort() + "/jmxrmi"); + JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2); + MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection(); + + assertEquals(1, mCon2.getAttribute(objectName1, "Value")); + assertEquals(2, mCon2.getAttribute(objectName2, "Value")); + + url2 = null; + jmxCon2.close(); + jmxCon2 = null; + mCon2 = null; + + rep1.close(); + rep2.close(); 
+ reg.shutdown(); + } + + /** + * Tests that histograms are properly reported via the JMXReporter. + */ + @Test + public void testHistogramReporting() throws Exception { + MetricRegistry registry = null; + String histogramName = "histogram"; + + try { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName()); + + registry = new MetricRegistry(config); + + TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId"); + + TestingHistogram histogram = new TestingHistogram(); + + metricGroup.histogram(histogramName, histogram); + + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(histogramName, metricGroup.getScopeComponents())); + + MBeanInfo info = mBeanServer.getMBeanInfo(objectName); + + MBeanAttributeInfo[] attributeInfos = info.getAttributes(); + + assertEquals(11, attributeInfos.length); + + assertEquals(histogram.getCount(), mBeanServer.getAttribute(objectName, "Count")); + assertEquals(histogram.getStatistics().getMean(), mBeanServer.getAttribute(objectName, "Mean")); + assertEquals(histogram.getStatistics().getStdDev(), mBeanServer.getAttribute(objectName, "StdDev")); + assertEquals(histogram.getStatistics().getMax(), mBeanServer.getAttribute(objectName, "Max")); + assertEquals(histogram.getStatistics().getMin(), mBeanServer.getAttribute(objectName, "Min")); + assertEquals(histogram.getStatistics().getQuantile(0.5), mBeanServer.getAttribute(objectName, "Median")); + assertEquals(histogram.getStatistics().getQuantile(0.75), mBeanServer.getAttribute(objectName, "75thPercentile")); + assertEquals(histogram.getStatistics().getQuantile(0.95), mBeanServer.getAttribute(objectName, "95thPercentile")); + assertEquals(histogram.getStatistics().getQuantile(0.98), mBeanServer.getAttribute(objectName, "98thPercentile")); + 
assertEquals(histogram.getStatistics().getQuantile(0.99), mBeanServer.getAttribute(objectName, "99thPercentile")); + assertEquals(histogram.getStatistics().getQuantile(0.999), mBeanServer.getAttribute(objectName, "999thPercentile")); + + } finally { + if (registry != null) { + registry.shutdown(); + } + } + } + + static class TestingHistogram implements Histogram { + + @Override + public void update(long value) { + + } + + @Override + public long getCount() { + return 1; + } + + @Override + public HistogramStatistics getStatistics() { + return new HistogramStatistics() { + @Override + public double getQuantile(double quantile) { + return quantile; + } + + @Override + public long[] getValues() { + return new long[0]; + } + + @Override + public int size() { + return 3; + } + + @Override + public double getMean() { + return 4; + } + + @Override + public double getStdDev() { + return 5; + } + + @Override + public long getMax() { + return 6; + } + + @Override + public long getMin() { + return 7; + } + }; + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java new file mode 100644 index 0000000..8b0f672 --- /dev/null +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.metrics.jmx.JMXReporter; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class JMXJobManagerMetricTest { + /** + * Tests that metrics registered on the JobManager are actually accessible via JMX. 
+ * + * @throws Exception + */ + @Test + public void testJobManagerJMXMetricAccess() throws Exception { + Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); + Configuration flinkConfiguration = new Configuration(); + + flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName()); + flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>"); + flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--port 9060-9075"); + + TestingCluster flink = new TestingCluster(flinkConfiguration); + + try { + flink.start(); + + JobVertex sourceJobVertex = new JobVertex("Source"); + sourceJobVertex.setInvokableClass(BlockingInvokable.class); + + JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex); + jobGraph.setSnapshotSettings(new JobSnapshottingSettings( + Collections.<JobVertexID>emptyList(), + Collections.<JobVertexID>emptyList(), + Collections.<JobVertexID>emptyList(), + 500, 500, 50, 5)); + + flink.waitForActorsToBeAlive(); + + flink.submitJobDetached(jobGraph); + + Future<Object> jobRunning = flink.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), deadline.timeLeft()); + Await.ready(jobRunning, deadline.timeLeft()); + + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + ObjectName objectName1 = new ObjectName("org.apache.flink.metrics:key0=jobmanager,key1=TestingJob,name=lastCheckpointSize"); + assertEquals(-1L, mBeanServer.getAttribute(objectName1, "Value")); + + Future<Object> jobFinished = flink.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft()); + + BlockingInvokable.unblock(); + + // wait til the job has finished + Await.ready(jobFinished, deadline.timeLeft()); + } finally { + flink.stop(); + } + } + + public static class BlockingInvokable extends 
AbstractInvokable { + private static boolean blocking = true; + private static final Object lock = new Object(); + + @Override + public void invoke() throws Exception { + while (blocking) { + synchronized (lock) { + lock.wait(); + } + } + } + + public static void unblock() { + blocking = false; + + synchronized (lock) { + lock.notifyAll(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-jmx/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..2226f68 --- /dev/null +++ b/flink-metrics/flink-metrics-jmx/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# A1 is set to be a ConsoleAppender. 
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/resources/logback-test.xml b/flink-metrics/flink-metrics-jmx/src/test/resources/logback-test.xml new file mode 100644 index 0000000..1c4ea08 --- /dev/null +++ b/flink-metrics/flink-metrics-jmx/src/test/resources/logback-test.xml @@ -0,0 +1,34 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + + <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/> + <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/> + <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/> + <logger name="org.apache.flink.configuration.Configuration" level="OFF"/> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-statsd/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-statsd/pom.xml b/flink-metrics/flink-metrics-statsd/pom.xml index 4b43e11..85d1dc4 100644 --- a/flink-metrics/flink-metrics-statsd/pom.xml +++ b/flink-metrics/flink-metrics-statsd/pom.xml @@ -42,7 +42,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> + <artifactId>flink-metrics-core</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> @@ -51,6 +51,13 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils-junit</artifactId> <version>${project.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java index e0c6dc7..354eff1 100644 --- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java +++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java @@ -19,11 +19,11 @@ package org.apache.flink.metrics.statsd; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; +import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.AbstractReporter; import org.apache.flink.metrics.reporter.Scheduled; @@ -61,7 +61,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled { private InetSocketAddress address; @Override - public void open(Configuration config) { + public void open(MetricConfig config) { String host = config.getString(ARG_HOST, null); int port = config.getInteger(ARG_PORT, -1); 
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index 37a3477..d7ae04f 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -24,12 +24,13 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; -import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.SimpleCounter; -import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup; -import org.apache.flink.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.metrics.groups.TaskMetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -191,7 +192,7 @@ public class StatsDReporterTest extends TestLogger { */ public static class TestingStatsDReporter extends StatsDReporter { @Override - public void open(Configuration configuration) { + public void open(MetricConfig configuration) { // disable the socket creation } 
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml index 542f49c..7c9aaee 100644 --- a/flink-metrics/pom.xml +++ b/flink-metrics/pom.xml @@ -34,9 +34,11 @@ under the License. <packaging>pom</packaging> <modules> + <module>flink-metrics-core</module> <module>flink-metrics-dropwizard</module> <module>flink-metrics-ganglia</module> <module>flink-metrics-graphite</module> + <module>flink-metrics-jmx</module> <module>flink-metrics-statsd</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 9f779ed..5ad5fe2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index 9e7dfc1..a560bb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -22,7 +22,7 @@ package org.apache.flink.runtime.io.network.api.serialization; import java.io.IOException; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index b218de8..9d4f765 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -25,7 +25,7 @@ import java.nio.ByteOrder; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import 
org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataOutputSerializer; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index f93cdfc..4963698 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.io.network.api.writer; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index fc35bef..6fcd2f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -18,7 +18,7 @@ package 
org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index b316fd9..1cd042c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 90e395c..351181a 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import com.google.common.collect.Maps; import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java index 840c805..cc91e83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java new file mode 100644 index 0000000..9c4da6c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.runtime.metrics.scope.ScopeFormat; +import org.apache.flink.runtime.metrics.scope.ScopeFormats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the + * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}. + */ +public class MetricRegistry { + static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class); + + private final MetricReporter reporter; + private final ScheduledExecutorService executor; + + private final ScopeFormats scopeFormats; + + private final char delimiter; + + /** + * Creates a new MetricRegistry and starts the configured reporter. 
+ */ + public MetricRegistry(Configuration config) { + // first parse the scope formats, these are needed for all reporters + ScopeFormats scopeFormats; + try { + scopeFormats = createScopeConfig(config); + } + catch (Exception e) { + LOG.warn("Failed to parse scope format, using default scope formats", e); + scopeFormats = new ScopeFormats(); + } + this.scopeFormats = scopeFormats; + + char delim; + try { + delim = config.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0); + } catch (Exception e) { + LOG.warn("Failed to parse delimiter, using default delimiter.", e); + delim = '.'; + } + this.delimiter = delim; + + // second, instantiate any custom configured reporters + + final String className = config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null); + if (className == null) { + // by default, don't report anything + LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); + this.reporter = null; + this.executor = null; + } + else { + MetricReporter reporter; + ScheduledExecutorService executor = null; + try { + String configuredPeriod = config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null); + TimeUnit timeunit = TimeUnit.SECONDS; + long period = 10; + + if (configuredPeriod != null) { + try { + String[] interval = configuredPeriod.split(" "); + period = Long.parseLong(interval[0]); + timeunit = TimeUnit.valueOf(interval[1]); + } + catch (Exception e) { + LOG.error("Cannot parse report interval from config: " + configuredPeriod + + " - please use values like '10 SECONDS' or '500 MILLISECONDS'. 
" + + "Using default reporting interval."); + } + } + + MetricConfig reporterConfig = createReporterConfig(config); + + Class<?> reporterClass = Class.forName(className); + reporter = (MetricReporter) reporterClass.newInstance(); + reporter.open(reporterConfig); + + if (reporter instanceof Scheduled) { + executor = Executors.newSingleThreadScheduledExecutor(); + LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name()); + + executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit); + } + } + catch (Throwable t) { + if (executor != null) { executor.shutdownNow(); executor = null; } // the final field this.executor is still unassigned here, so shutdownExecutor() would be a no-op; stop and discard the local executor instead + LOG.info("Could not instantiate metrics reporter. No metrics will be exposed/reported.", t); + reporter = null; + } + + this.reporter = reporter; + this.executor = executor; + } + } + + public char getDelimiter() { + return this.delimiter; + } + + public MetricReporter getReporter() { + return reporter; + } + + /** + * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}. + */ + public void shutdown() { + if (reporter != null) { + try { + reporter.close(); + } catch (Throwable t) { + LOG.warn("Metrics reporter did not shut down cleanly", t); + } + } + shutdownExecutor(); + } + + private void shutdownExecutor() { + if (executor != null) { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + } + } + } + + public ScopeFormats getScopeFormats() { + return scopeFormats; + } + + // ------------------------------------------------------------------------ + // Metrics (de)registration + // ------------------------------------------------------------------------ + + /** + * Registers a new {@link Metric} with this registry. 
+ * + * @param metric the metric that was added + * @param metricName the name of the metric + * @param group the group that contains the metric + */ + public void register(Metric metric, String metricName, MetricGroup group) { + try { + if (reporter != null) { + reporter.notifyOfAddedMetric(metric, metricName, group); + } + } catch (Exception e) { + LOG.error("Error while registering metric.", e); + } + } + + /** + * Un-registers the given {@link org.apache.flink.metrics.Metric} from this registry. + * + * @param metric the metric that should be removed + * @param metricName the name of the metric + * @param group the group that contains the metric + */ + public void unregister(Metric metric, String metricName, MetricGroup group) { + try { + if (reporter != null) { + reporter.notifyOfRemovedMetric(metric, metricName, group); + } + } catch (Exception e) { + LOG.error("Error while unregistering metric.", e); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + static MetricConfig createReporterConfig(Configuration config) { + MetricConfig reporterConfig = new MetricConfig(); + + String[] arguments = config.getString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "").split(" "); + if (arguments.length > 1) { + for (int x = 0; x < arguments.length; x += 2) { + reporterConfig.setProperty(arguments[x].replace("--", ""), arguments[x + 1]); + } + } + return reporterConfig; + } + + static ScopeFormats createScopeConfig(Configuration config) { + String jmFormat = config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP); + String jmJobFormat = config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP); + String tmFormat = config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); + String tmJobFormat = 
config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP); + String taskFormat = config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP); + String operatorFormat = config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP); + + return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat); + } + + // ------------------------------------------------------------------------ + + /** + * This task is explicitly a static class, so that it does not hold any references to the enclosing + * MetricRegistry instance. + * + * This is a subtle difference, but very important: With this static class, the enclosing class instance + * may become garbage-collectible, whereas with an anonymous inner class, the timer thread + * (which is a GC root) will hold a reference via the timer task and its enclosing instance pointer. + * Making the MetricRegistry garbage collectible makes the java.util.Timer garbage collectible, + * which acts as a fail-safe to stop the timer thread and prevents resource leaks. 
+ */ + private static final class ReporterTask extends TimerTask { + + private final Scheduled reporter; + + private ReporterTask(Scheduled reporter) { + this.reporter = reporter; + } + + @Override + public void run() { + try { + reporter.report(); + } catch (Throwable t) { + LOG.warn("Error while reporting metrics", t); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java new file mode 100644 index 0000000..3ac9966 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.metrics.SimpleCounter; + +import org.apache.flink.runtime.metrics.scope.ScopeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Abstract {@link MetricGroup} that contains key functionality for adding metrics and groups. + * + * <p><b>IMPORTANT IMPLEMENTATION NOTE</b> + * + * <p>This class uses locks for adding and removing metrics objects. This is done to + * prevent resource leaks in the presence of concurrently closing a group and adding + * metrics and subgroups. + * Since closing groups recursively closes the subgroups, the lock acquisition order must + * be strictly from parent group to subgroup. If at any point, a subgroup holds its group + * lock and calls a parent method that also acquires the lock, it will create a deadlock + * condition. + * + * <p>An AbstractMetricGroup can be {@link #close() closed}. Upon closing, the group de-register all metrics + * from any metrics reporter and any internal maps. Note that even closed metrics groups + * return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code. + * These metrics simply do not get reported any more, when created on a closed group. 
+ */ +public abstract class AbstractMetricGroup implements MetricGroup { + + /** shared logger */ + private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class); + + // ------------------------------------------------------------------------ + + /** The registry that this metrics group belongs to */ + protected final MetricRegistry registry; + + /** All metrics that are directly contained in this group */ + private final Map<String, Metric> metrics = new HashMap<>(); + + /** All metric subgroups of this group */ + private final Map<String, AbstractMetricGroup> groups = new HashMap<>(); + + /** The metrics scope represented by this group. + * For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */ + private final String[] scopeComponents; + + /** The metrics scope represented by this group, as a concatenated string, lazily computed. + * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */ + private String scopeString; + + /** Flag indicating whether this group has been closed */ + private volatile boolean closed; + + // ------------------------------------------------------------------------ + + public AbstractMetricGroup(MetricRegistry registry, String[] scope) { + this.registry = checkNotNull(registry); + this.scopeComponents = checkNotNull(scope); + } + + /** + * Gets the scope as an array of the scope components, for example + * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]} + * + * @see #getMetricIdentifier(String) + */ + public String[] getScopeComponents() { + return scopeComponents; + } + + /** + * Returns the fully qualified metric name, for example + * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} + * + * @param metricName metric name + * @return fully qualified metric name + */ + public String getMetricIdentifier(String metricName) { + return getMetricIdentifier(metricName, null); + } + + /** + * Returns the fully qualified metric name, for example + * 
{@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} + * + * @param metricName metric name + * @param filter character filter which is applied to the scope components if not null. + * @return fully qualified metric name + */ + public String getMetricIdentifier(String metricName, CharacterFilter filter) { + // A filtered scope depends on the caller's filter, so it must never be cached; + // otherwise the first caller's filter would leak into every later identifier. + if (filter != null) { + String filteredScope = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents); + return filteredScope + registry.getDelimiter() + filter.filterCharacters(metricName); + } + + // only the unfiltered scope string is cached + if (scopeString == null) { + scopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents); + } + + return scopeString + registry.getDelimiter() + metricName; + } + + // ------------------------------------------------------------------------ + // Closing + // ------------------------------------------------------------------------ + + public void close() { + synchronized (this) { + if (!closed) { + closed = true; + + // close all subgroups + for (AbstractMetricGroup group : groups.values()) { + group.close(); + } + groups.clear(); + + // un-register all directly contained metrics + for (Map.Entry<String, Metric> metric : metrics.entrySet()) { + registry.unregister(metric.getValue(), metric.getKey(), this); + } + metrics.clear(); + } + } + } + + public final boolean isClosed() { + return closed; + } + + // ----------------------------------------------------------------------------------------------------------------- + // Metrics + // ----------------------------------------------------------------------------------------------------------------- + + @Override + public Counter counter(int name) { + return counter(String.valueOf(name)); + } + + @Override + public Counter counter(String name) { + return counter(name, new SimpleCounter()); + } + + @Override + public <C extends Counter> C counter(int name, C counter) { + return counter(String.valueOf(name), counter); + } + + @Override + 
public <C extends Counter> C counter(String name, C counter) { + addMetric(name, counter); + return counter; + } + + @Override + public <T, G extends Gauge<T>> G gauge(int name, G gauge) { + return gauge(String.valueOf(name), gauge); + } + + @Override + public <T, G extends Gauge<T>> G gauge(String name, G gauge) { + addMetric(name, gauge); + return gauge; + } + + @Override + public <H extends Histogram> H histogram(int name, H histogram) { + return histogram(String.valueOf(name), histogram); + } + + @Override + public <H extends Histogram> H histogram(String name, H histogram) { + addMetric(name, histogram); + return histogram; + } + + /** + * Adds the given metric to the group and registers it at the registry, if the group + * is not yet closed, and if no metric with the same name has been registered before. + * + * @param name the name to register the metric under + * @param metric the metric to register + */ + protected void addMetric(String name, Metric metric) { + // add the metric only if the group is still open + synchronized (this) { + if (!closed) { + // immediately put without a 'contains' check to optimize the common case (no collision) + // collisions are resolved later + Metric prior = metrics.put(name, metric); + + // check for collisions with other metric names + if (prior == null) { + // no other metric with this name yet + + if (groups.containsKey(name)) { + // we warn here, rather than failing, because metrics are tools that should not fail the + // program when used incorrectly + LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" + + name + "'. Metric might not get properly reported. (" + scopeString + ')'); + } + + registry.register(metric, name, this); + } + else { + // we had a collision. 
put back the original value + metrics.put(name, prior); + + // we warn here, rather than failing, because metrics are tools that should not fail the + // program when used incorrectly + LOG.warn("Name collision: Group already contains a Metric with the name '" + + name + "'. Metric will not be reported. (" + scopeString + ')'); + } + } + } + } + + // ------------------------------------------------------------------------ + // Groups + // ------------------------------------------------------------------------ + + @Override + public MetricGroup addGroup(int name) { + return addGroup(String.valueOf(name)); + } + + @Override + public MetricGroup addGroup(String name) { + synchronized (this) { + if (!closed) { + // adding a group with the same name as a metric creates problems in many reporters/dashboards + // we warn here, rather than failing, because metrics are tools that should not fail the + // program when used incorrectly + if (metrics.containsKey(name)) { + LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" + + name + "'. Metric might not get properly reported. 
(" + scopeString + ')'); + } + + AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name); + AbstractMetricGroup prior = groups.put(name, newGroup); + if (prior == null) { + // no prior group with that name + return newGroup; + } else { + // had a prior group with that name, add the prior group back + groups.put(name, prior); + return prior; + } + } + else { + // return a non-registered group that is immediately closed already + GenericMetricGroup closedGroup = new GenericMetricGroup(registry, this, name); + closedGroup.close(); + return closedGroup; + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java new file mode 100644 index 0000000..7b4cb05 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.runtime.metrics.MetricRegistry; + +/** + * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g., + * TaskManager, Job, Task, Operator). + * + * <p>Usually, the scope of metrics is simply the hierarchy of the containing groups. For example + * the Metric {@code "MyMetric"} in group {@code "B"} nested in group {@code "A"} would have a + * fully scoped name of {@code "A.B.MyMetric"}, with {@code "A.B"} being the Metric's scope. + * + * <p>Component groups, however, have configurable scopes. This allow users to include or exclude + * certain identifiers from the scope. The scope for metrics belonging to the "Task" + * group could for example include the task attempt number (more fine grained identification), or + * exclude it (for continuity of the namespace across failure and recovery). + */ +public abstract class ComponentMetricGroup extends AbstractMetricGroup { + + /** + * Creates a new ComponentMetricGroup. + * + * @param registry registry to register new metrics with + * @param scope the scope of the group + */ + public ComponentMetricGroup(MetricRegistry registry, String[] scope) { + super(registry, scope); + } + + /** + * Closes the component group by removing and closing all metrics and subgroups + * (inherited from {@link AbstractMetricGroup}), plus closing and removing all dedicated + * component subgroups. 
+ */ + @Override + public void close() { + synchronized (this) { + if (!isClosed()) { + // remove all metrics and generic subgroups + super.close(); + + // remove and close all subcomponent metrics + for (ComponentMetricGroup group : subComponents()) { + group.close(); + } + } + } + } + + // ------------------------------------------------------------------------ + // Component Metric Group Specifics + // ------------------------------------------------------------------------ + + /** + * Gets all component metric groups that are contained in this component metric group. + * + * @return All component metric groups that are contained in this component metric group. + */ + protected abstract Iterable<? extends ComponentMetricGroup> subComponents(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java new file mode 100644 index 0000000..09b9ea5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.runtime.metrics.MetricRegistry; + +/** + * A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to hold + * subgroups of metrics. + */ +public class GenericMetricGroup extends AbstractMetricGroup { + + public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) { + super(registry, makeScopeComponents(parent, name)); + } + + // ------------------------------------------------------------------------ + + private static String[] makeScopeComponents(AbstractMetricGroup parent, String name) { + if (parent != null) { + String[] parentComponents = parent.getScopeComponents(); + if (parentComponents != null && parentComponents.length > 0) { + String[] parts = new String[parentComponents.length + 1]; + System.arraycopy(parentComponents, 0, parts, 0, parentComponents.length); + parts[parts.length - 1] = name; + return parts; + } + } + return new String[] { name }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java new file mode 100644 index 0000000..8fa6111 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java @@ -0,0 
+1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.metrics.Counter; + +/** + * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is + * forwarded to the parent task metric group. 
+ */ +public class IOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { + + private final Counter numBytesOut; + private final Counter numBytesInLocal; + private final Counter numBytesInRemote; + + public IOMetricGroup(TaskMetricGroup parent) { + super(parent); + + this.numBytesOut = counter("numBytesOut"); + this.numBytesInLocal = counter("numBytesInLocal"); + this.numBytesInRemote = counter("numBytesInRemote"); + } + + public Counter getBytesOutCounter() { + return numBytesOut; + } + + public Counter getNumBytesInLocalCounter() { + return numBytesInLocal; + } + + public Counter getNumBytesInRemoteCounter() { + return numBytesInRemote; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java new file mode 100644 index 0000000..807ddd0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.JobManagerJobScopeFormat; + +import javax.annotation.Nullable; +import java.util.Collections; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to + * a specific job, running on the JobManager. + */ +public class JobManagerJobMetricGroup extends JobMetricGroup { + + /** The metrics group that contains this group */ + private final JobManagerMetricGroup parent; + + public JobManagerJobMetricGroup( + MetricRegistry registry, + JobManagerMetricGroup parent, + JobID jobId, + @Nullable String jobName) { + + this(registry, checkNotNull(parent), registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName); + } + + public JobManagerJobMetricGroup( + MetricRegistry registry, + JobManagerMetricGroup parent, + JobManagerJobScopeFormat scopeFormat, + JobID jobId, + @Nullable String jobName) { + + super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName)); + + this.parent = checkNotNull(parent); + } + + public final JobManagerMetricGroup parent() { + return parent; + } + + // ------------------------------------------------------------------------ + // Component Metric Group Specifics + // ------------------------------------------------------------------------ + + @Override + protected Iterable<? 
extends ComponentMetricGroup> subComponents() { + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java new file mode 100644 index 0000000..4d3dfb7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat; + +import java.util.HashMap; +import java.util.Map; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing a JobManager. 
+ * + * <p>Contains extra logic for adding job metric groups and for removing (and closing) them + * again by job ID. + */ +public class JobManagerMetricGroup extends ComponentMetricGroup { + + private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>(); + + private final String hostname; + + public JobManagerMetricGroup(MetricRegistry registry, String hostname) { + this(registry, registry.getScopeFormats().getJobManagerFormat(), hostname); + } + + public JobManagerMetricGroup( + MetricRegistry registry, + JobManagerScopeFormat scopeFormat, + String hostname) { + + super(registry, scopeFormat.formatScope(hostname)); + this.hostname = hostname; + } + + public String hostname() { + return hostname; + } + + // ------------------------------------------------------------------------ + // job groups + // ------------------------------------------------------------------------ + + public JobManagerJobMetricGroup addJob(JobGraph job) { + JobID jobId = job.getJobID(); + String jobName = job.getName(); + // get or create a jobs metric group + JobManagerJobMetricGroup currentJobGroup; + synchronized (this) { + if (!isClosed()) { + currentJobGroup = jobs.get(jobId); + + if (currentJobGroup == null || currentJobGroup.isClosed()) { + currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName); + jobs.put(jobId, currentJobGroup); + } + return currentJobGroup; + } else { + return null; + } + } + } + + public void removeJob(JobID jobId) { + if (jobId == null) { + return; + } + + synchronized (this) { + JobManagerJobMetricGroup containedGroup = jobs.remove(jobId); + if (containedGroup != null) { + containedGroup.close(); + } + } + } + + public int numRegisteredJobMetricGroups() { + return jobs.size(); + } + + @Override + protected Iterable<? 
extends ComponentMetricGroup> subComponents() { + return jobs.values(); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java new file mode 100644 index 0000000..0c1c55c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.metrics.MetricRegistry; + +import javax.annotation.Nullable; + +/** + * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to + * a specific job. 
+ */ +@Internal +public abstract class JobMetricGroup extends ComponentMetricGroup { + + /** The ID of the job represented by this metrics group */ + protected final JobID jobId; + + /** The name of the job represented by this metrics group */ + @Nullable + protected final String jobName; + + // ------------------------------------------------------------------------ + + protected JobMetricGroup( + MetricRegistry registry, + JobID jobId, + @Nullable String jobName, + String[] scope) { + super(registry, scope); + + this.jobId = jobId; + this.jobName = jobName; + } + + public JobID jobId() { + return jobId; + } + + @Nullable + public String jobName() { + return jobName; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java new file mode 100644 index 0000000..caee821 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.OperatorScopeFormat; + +import java.util.Collections; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator. + */ +public class OperatorMetricGroup extends ComponentMetricGroup { + + /** The task metric group that contains this operator metric group */ + private final TaskMetricGroup parent; + + public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup parent, String operatorName) { + this(registry, parent, registry.getScopeFormats().getOperatorFormat(), operatorName); + } + + public OperatorMetricGroup( + MetricRegistry registry, + TaskMetricGroup parent, + OperatorScopeFormat scopeFormat, + String operatorName) { + + super(registry, scopeFormat.formatScope(parent, operatorName)); + this.parent = checkNotNull(parent); + } + + // ------------------------------------------------------------------------ + + public final TaskMetricGroup parent() { + return parent; + } + + // ------------------------------------------------------------------------ + // Component Metric Group Specifics + // ------------------------------------------------------------------------ + + @Override + protected Iterable<? 
extends ComponentMetricGroup> subComponents() { + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java new file mode 100644 index 0000000..6dd38a2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.MetricGroup; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Metric group which forwards all registration calls to its parent metric group. 
+ * + * @param <P> Type of the parent metric group + */ +public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup { + + private final P parentMetricGroup; + + public ProxyMetricGroup(P parentMetricGroup) { + this.parentMetricGroup = checkNotNull(parentMetricGroup); + } + + @Override + public final Counter counter(int name) { + return parentMetricGroup.counter(name); + } + + @Override + public final Counter counter(String name) { + return parentMetricGroup.counter(name); + } + + @Override + public final <C extends Counter> C counter(int name, C counter) { + return parentMetricGroup.counter(name, counter); + } + + @Override + public final <C extends Counter> C counter(String name, C counter) { + return parentMetricGroup.counter(name, counter); + } + + @Override + public final <T, G extends Gauge<T>> G gauge(int name, G gauge) { + return parentMetricGroup.gauge(name, gauge); + } + + @Override + public final <T, G extends Gauge<T>> G gauge(String name, G gauge) { + return parentMetricGroup.gauge(name, gauge); + } + + @Override + public final <H extends Histogram> H histogram(String name, H histogram) { + return parentMetricGroup.histogram(name, histogram); + } + + @Override + public final <H extends Histogram> H histogram(int name, H histogram) { + return parentMetricGroup.histogram(name, histogram); + } + + @Override + public final MetricGroup addGroup(int name) { + return parentMetricGroup.addGroup(name); + } + + @Override + public final MetricGroup addGroup(String name) { + return parentMetricGroup.addGroup(name); + } + + @Override + public String[] getScopeComponents() { + return parentMetricGroup.getScopeComponents(); + } + + @Override + public String getMetricIdentifier(String metricName) { + return parentMetricGroup.getMetricIdentifier(metricName); + } + + @Override + public String getMetricIdentifier(String metricName, CharacterFilter filter) { + return parentMetricGroup.getMetricIdentifier(metricName, filter); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java new file mode 100644 index 0000000..8219ef0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat; +import org.apache.flink.util.AbstractID; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to + * a specific job, running on the TaskManager. + * + * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}). + */ +public class TaskManagerJobMetricGroup extends JobMetricGroup { + + /** The metrics group that contains this group */ + private final TaskManagerMetricGroup parent; + + /** Map from execution attempt ID (task identifier) to task metrics */ + private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>(); + + // ------------------------------------------------------------------------ + + public TaskManagerJobMetricGroup( + MetricRegistry registry, + TaskManagerMetricGroup parent, + JobID jobId, + @Nullable String jobName) { + + this(registry, checkNotNull(parent), registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName); + } + + public TaskManagerJobMetricGroup( + MetricRegistry registry, + TaskManagerMetricGroup parent, + TaskManagerJobScopeFormat scopeFormat, + JobID jobId, + @Nullable String jobName) { + + super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName)); + + this.parent = checkNotNull(parent); + } + + public final TaskManagerMetricGroup parent() { + return parent; + } + + // ------------------------------------------------------------------------ + // adding / removing tasks + // ------------------------------------------------------------------------ + + public TaskMetricGroup 
addTask(TaskDeploymentDescriptor tdd) { + AbstractID vertexId = tdd.getVertexID(); + AbstractID executionId = tdd.getExecutionId(); + String taskName = tdd.getTaskName(); + int subtaskIndex = tdd.getIndexInSubtaskGroup(); + int attemptNumber = tdd.getAttemptNumber(); + + checkNotNull(executionId); + + synchronized (this) { + if (!isClosed()) { + TaskMetricGroup task = new TaskMetricGroup(registry, this, + vertexId, executionId, taskName, subtaskIndex, attemptNumber); + tasks.put(executionId, task); + return task; + } else { + return null; + } + } + } + + public void removeTaskMetricGroup(AbstractID executionId) { + checkNotNull(executionId); + + boolean removeFromParent = false; + synchronized (this) { + if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) { + // this call removed the last task. close this group. + removeFromParent = true; + close(); + } + } + + // IMPORTANT: removing from the parent must not happen while holding this group's lock, + // because it would violate the "first parent then subgroup" lock acquisition order + if (removeFromParent) { + parent.removeJobMetricsGroup(jobId, this); + } + } + + // ------------------------------------------------------------------------ + // Component Metric Group Specifics + // ------------------------------------------------------------------------ + + @Override + protected Iterable<? 
extends ComponentMetricGroup> subComponents() { + return tasks.values(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java new file mode 100644 index 0000000..f58a95f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat; + +import java.util.HashMap; +import java.util.Map; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing a TaskManager. 
+ * + * <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do + * not contain tasks any more + */ +public class TaskManagerMetricGroup extends ComponentMetricGroup { + + private final Map<JobID, TaskManagerJobMetricGroup> jobs = new HashMap<>(); + + private final String hostname; + + private final String taskManagerId; + + + public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId) { + this(registry, registry.getScopeFormats().getTaskManagerFormat(), hostname, taskManagerId); + } + + public TaskManagerMetricGroup( + MetricRegistry registry, + TaskManagerScopeFormat scopeFormat, + String hostname, String taskManagerId) { + + super(registry, scopeFormat.formatScope(hostname, taskManagerId)); + this.hostname = hostname; + this.taskManagerId = taskManagerId; + } + + public String hostname() { + return hostname; + } + + public String taskManagerId() { + return taskManagerId; + } + + // ------------------------------------------------------------------------ + // job groups + // ------------------------------------------------------------------------ + + public TaskMetricGroup addTaskForJob(TaskDeploymentDescriptor tdd) { + JobID jobId = tdd.getJobID(); + String jobName = tdd.getJobName().length() == 0 + ? tdd.getJobID().toString() + : tdd.getJobName(); + + // we cannot strictly lock both our map modification and the job group modification + // because it might lead to a deadlock + while (true) { + // get or create a jobs metric group + TaskManagerJobMetricGroup currentJobGroup; + synchronized (this) { + currentJobGroup = jobs.get(jobId); + + if (currentJobGroup == null || currentJobGroup.isClosed()) { + currentJobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, jobName); + jobs.put(jobId, currentJobGroup); + } + } + + // try to add another task. 
this may fail if we found a pre-existing job metrics + // group and it is closed concurrently + TaskMetricGroup taskGroup = currentJobGroup.addTask(tdd); + + if (taskGroup != null) { + // successfully added the next task + return taskGroup; + } + + // else fall through the loop + } + } + + public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup group) { + if (jobId == null || group == null || !group.isClosed()) { + return; + } + + synchronized (this) { + // optimistically remove the currently contained group, and check later if it was correct + TaskManagerJobMetricGroup containedGroup = jobs.remove(jobId); + + // check if another group was actually contained, and restore that one + if (containedGroup != null && containedGroup != group) { + jobs.put(jobId, containedGroup); + } + } + } + + public int numRegisteredJobMetricGroups() { + return jobs.size(); + } + + // ------------------------------------------------------------------------ + // Component Metric Group Specifics + // ------------------------------------------------------------------------ + + @Override + protected Iterable<? extends ComponentMetricGroup> subComponents() { + return jobs.values(); + } +} +