http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
new file mode 100644
index 0000000..efb38b2
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.jmx;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.lang.management.ManagementFactory;
+
+import static org.junit.Assert.assertEquals;
+
+public class JMXReporterTest extends TestLogger {
+
+       @Test
+       public void testReplaceInvalidChars() {
+               assertEquals("", JMXReporter.replaceInvalidChars(""));
+               assertEquals("abc", JMXReporter.replaceInvalidChars("abc"));
+               assertEquals("abc", JMXReporter.replaceInvalidChars("abc\""));
+               assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc"));
+               assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc\""));
+               assertEquals("abc", 
JMXReporter.replaceInvalidChars("\"a\"b\"c\""));
+               assertEquals("", JMXReporter.replaceInvalidChars("\"\"\"\""));
+               assertEquals("____", JMXReporter.replaceInvalidChars("    "));
+               assertEquals("ab_-(c)-", JMXReporter.replaceInvalidChars("\"ab 
;(c)'"));
+               assertEquals("a_b_c", JMXReporter.replaceInvalidChars("a b c"));
+               assertEquals("a_b_c_", JMXReporter.replaceInvalidChars("a b c 
"));
+               assertEquals("a-b-c-", 
JMXReporter.replaceInvalidChars("a;b'c*"));
+               assertEquals("a------b------c", 
JMXReporter.replaceInvalidChars("a,=;:?'b,=;:?'c"));
+       }
+
+       /**
+        * Checks the JMX name that {@code JMXReporter.generateJmxName} builds from a metric name
+        * and an array of scope components.
+        */
+       @Test
+       public void testGenerateName() {
+               final String expectedName =
+                       "org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric";
+               final String[] scopeComponents = new String[] {"value0", "value1", "\"value2 (test),=;:?'"};
+
+               assertEquals(expectedName, JMXReporter.generateJmxName("TestMetric", scopeComponents));
+       }
+
+       /**
+        * Verifies that multiple JMXReporters can be started on the same machine and register
+        * metrics at the MBeanServer.
+        *
+        * <p>Both reporters are opened with the same port range and each is notified of one gauge;
+        * the gauges are then read back through the platform MBeanServer.
+        *
+        * @throws Exception if the attribute/mbean could not be found or the test is broken
+        */
+       @Test
+       public void testPortConflictHandling() throws Exception {
+               Configuration cfg = new Configuration();
+               MetricRegistry reg = new MetricRegistry(cfg);
+
+               // The nested try/finally blocks release every resource that was successfully started,
+               // even when a later step or an assertion fails. Otherwise a failing run would leave
+               // reporters and the registry alive for the rest of the JVM.
+               try {
+                       TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
+
+                       JMXReporter rep1 = new JMXReporter();
+                       JMXReporter rep2 = new JMXReporter();
+
+                       MetricConfig cfg1 = new MetricConfig();
+                       cfg1.setProperty("port", "9020-9035");
+
+                       rep1.open(cfg1);
+                       try {
+                               rep2.open(cfg1);
+                               try {
+                                       rep1.notifyOfAddedMetric(new Gauge<Integer>() {
+                                               @Override
+                                               public Integer getValue() {
+                                                       return 1;
+                                               }
+                                       }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+                                       rep2.notifyOfAddedMetric(new Gauge<Integer>() {
+                                               @Override
+                                               public Integer getValue() {
+                                                       return 2;
+                                               }
+                                       }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+                                       MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+                                       ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
+                                       ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
+
+                                       assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
+                                       assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));
+                               } finally {
+                                       rep2.close();
+                               }
+                       } finally {
+                               rep1.close();
+                       }
+               } finally {
+                       reg.shutdown();
+               }
+       }
+
+       /**
+        * Verifies that we can connect to multiple JMXReporters running on the 
same machine.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testJMXAvailability() throws Exception {
+               Configuration cfg = new Configuration();
+               MetricRegistry reg = new MetricRegistry(cfg);
+
+               TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, 
"host", "tm");
+
+               JMXReporter rep1 = new JMXReporter();
+               JMXReporter rep2 = new JMXReporter();
+
+               MetricConfig cfg1 = new MetricConfig();
+               cfg1.setProperty("port", "9040-9055");
+               rep1.open(cfg1);
+               rep2.open(cfg1);
+
+               rep1.notifyOfAddedMetric(new Gauge<Integer>() {
+                       @Override
+                       public Integer getValue() {
+                               return 1;
+                       }
+               }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+               rep2.notifyOfAddedMetric(new Gauge<Integer>() {
+                       @Override
+                       public Integer getValue() {
+                               return 2;
+                       }
+               }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
+
+               ObjectName objectName1 = new 
ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
+               ObjectName objectName2 = new 
ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
+
+               JMXServiceURL url1 = new 
JMXServiceURL("service:jmx:rmi://localhost:" + rep1.getPort() + 
"/jndi/rmi://localhost:" + rep1.getPort() + "/jmxrmi");
+               JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
+               MBeanServerConnection mCon1 = 
jmxCon1.getMBeanServerConnection();
+
+               assertEquals(1, mCon1.getAttribute(objectName1, "Value"));
+               assertEquals(2, mCon1.getAttribute(objectName2, "Value"));
+
+               url1 = null;
+               jmxCon1.close();
+               jmxCon1 = null;
+               mCon1 = null;
+
+               JMXServiceURL url2 = new 
JMXServiceURL("service:jmx:rmi://localhost:" + rep2.getPort() + 
"/jndi/rmi://localhost:" + rep2.getPort() + "/jmxrmi");
+               JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
+               MBeanServerConnection mCon2 = 
jmxCon2.getMBeanServerConnection();
+
+               assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
+               assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
+
+               url2 = null;
+               jmxCon2.close();
+               jmxCon2 = null;
+               mCon2 = null;
+
+               rep1.close();
+               rep2.close();
+               reg.shutdown();
+       }
+
+       /**
+        * Registers a {@code TestingHistogram} on a metric group whose registry is configured with
+        * the JMXReporter, and compares the attributes exposed by the platform MBeanServer with
+        * the values returned by the histogram.
+        */
+       @Test
+       public void testHistogramReporting() throws Exception {
+               final String histogramName = "histogram";
+               MetricRegistry registry = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       config.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName());
+                       registry = new MetricRegistry(config);
+
+                       TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "tmId");
+                       TestingHistogram histogram = new TestingHistogram();
+                       group.histogram(histogramName, histogram);
+
+                       MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+                       ObjectName name = new ObjectName(JMXReporter.generateJmxName(histogramName, group.getScopeComponents()));
+
+                       assertEquals(11, server.getMBeanInfo(name).getAttributes().length);
+
+                       assertEquals(histogram.getCount(), server.getAttribute(name, "Count"));
+                       assertEquals(histogram.getStatistics().getMean(), server.getAttribute(name, "Mean"));
+                       assertEquals(histogram.getStatistics().getStdDev(), server.getAttribute(name, "StdDev"));
+                       assertEquals(histogram.getStatistics().getMax(), server.getAttribute(name, "Max"));
+                       assertEquals(histogram.getStatistics().getMin(), server.getAttribute(name, "Min"));
+
+                       // quantiles[i] is expected to be exposed under attributes[i]
+                       final double[] quantiles = {0.5, 0.75, 0.95, 0.98, 0.99, 0.999};
+                       final String[] attributes = {
+                               "Median", "75thPercentile", "95thPercentile", "98thPercentile", "99thPercentile", "999thPercentile"};
+
+                       for (int i = 0; i < quantiles.length; i++) {
+                               assertEquals(histogram.getStatistics().getQuantile(quantiles[i]), server.getAttribute(name, attributes[i]));
+                       }
+               } finally {
+                       if (registry != null) {
+                               registry.shutdown();
+                       }
+               }
+       }
+
+       /**
+        * Histogram stub for tests: updates are ignored and the count and all statistics
+        * are fixed values.
+        */
+       static class TestingHistogram implements Histogram {
+
+               @Override
+               public void update(long value) {
+                       // intentionally empty: updates are ignored by this stub
+               }
+
+               @Override
+               public long getCount() {
+                       return 1;
+               }
+
+               /**
+                * Returns a new statistics object on every call; all of its values are hard-coded.
+                */
+               @Override
+               public HistogramStatistics getStatistics() {
+                       return new HistogramStatistics() {
+                               // echoes the requested quantile back as the result
+                               @Override
+                               public double getQuantile(double quantile) {
+                                       return quantile;
+                               }
+
+                               @Override
+                               public long[] getValues() {
+                                       return new long[0];
+                               }
+
+                               @Override
+                               public int size() {
+                                       return 3;
+                               }
+
+                               @Override
+                               public double getMean() {
+                                       return 4;
+                               }
+
+                               @Override
+                               public double getStdDev() {
+                                       return 5;
+                               }
+
+                               @Override
+                               public long getMax() {
+                                       return 6;
+                               }
+
+                               @Override
+                               public long getMin() {
+                                       return 7;
+                               }
+                       };
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
new file mode 100644
index 0000000..8b0f672
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.metrics.jmx.JMXReporter;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class JMXJobManagerMetricTest {
+       /**
+        * Tests that metrics registered on the JobManager are actually accessible via JMX.
+        *
+        * <p>Starts a TestingCluster configured with the JMXReporter, submits a job with a single
+        * blocking vertex, reads the {@code lastCheckpointSize} metric from the platform
+        * MBeanServer while the job is running, and then unblocks the job and waits for its removal.
+        *
+        * @throws Exception if any step of the test fails
+        */
+       @Test
+       public void testJobManagerJMXMetricAccess() throws Exception {
+               // single time budget shared by all waits below
+               Deadline deadline = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
+               Configuration flinkConfiguration = new Configuration();
+
+               
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
JMXReporter.class.getName());
+               // NOTE(review): presumably this scope format yields the key0/key1 values of the
+               // ObjectName queried below (key0=jobmanager, key1=<job name>) - confirm
+               
flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
"jobmanager.<job_name>");
+               
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, 
"--port 9060-9075");
+
+               TestingCluster flink = new TestingCluster(flinkConfiguration);
+
+               try {
+                       flink.start();
+
+                       // the job consists of one vertex that blocks until BlockingInvokable.unblock() is called
+                       JobVertex sourceJobVertex = new JobVertex("Source");
+                       
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
+
+                       JobGraph jobGraph = new JobGraph("TestingJob", 
sourceJobVertex);
+                       jobGraph.setSnapshotSettings(new 
JobSnapshottingSettings(
+                               Collections.<JobVertexID>emptyList(),
+                               Collections.<JobVertexID>emptyList(),
+                               Collections.<JobVertexID>emptyList(),
+                               500, 500, 50, 5));
+
+                       flink.waitForActorsToBeAlive();
+
+                       flink.submitJobDetached(jobGraph);
+
+                       // wait until all vertices of the job are running before querying JMX
+                       Future<Object> jobRunning = 
flink.getLeaderGateway(deadline.timeLeft())
+                               .ask(new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), 
deadline.timeLeft());
+                       Await.ready(jobRunning, deadline.timeLeft());
+
+                       MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
+                       ObjectName objectName1 = new 
ObjectName("org.apache.flink.metrics:key0=jobmanager,key1=TestingJob,name=lastCheckpointSize");
+                       // NOTE(review): presumably -1 is reported because no checkpoint has completed yet - confirm
+                       assertEquals(-1L, mBeanServer.getAttribute(objectName1, 
"Value"));
+
+                       // register for the removal notification before unblocking the job
+                       Future<Object> jobFinished = 
flink.getLeaderGateway(deadline.timeLeft())
+                               .ask(new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), 
deadline.timeLeft());
+
+                       BlockingInvokable.unblock();
+
+                       // wait until the job has finished
+                       Await.ready(jobFinished, deadline.timeLeft());
+               } finally {
+                       flink.stop();
+               }
+       }
+
+       /**
+        * Invokable whose {@link #invoke()} blocks until {@link #unblock()} has been called.
+        *
+        * <p>The flag is static, so a call to {@link #unblock()} releases every instance and
+        * the flag is never reset afterwards.
+        */
+       public static class BlockingInvokable extends AbstractInvokable {
+               private static final Object lock = new Object();
+
+               /** Guarded by {@link #lock}; must only be read or written while holding it. */
+               private static boolean blocking = true;
+
+               /**
+                * Waits until {@link #unblock()} has been called.
+                *
+                * @throws Exception if the waiting thread is interrupted
+                */
+               @Override
+               public void invoke() throws Exception {
+                       // The flag is checked while holding the lock. Checking it outside would allow
+                       // unblock() to run between the check and wait(), losing the notification and
+                       // blocking this task forever.
+                       synchronized (lock) {
+                               while (blocking) {
+                                       lock.wait();
+                               }
+                       }
+               }
+
+               /**
+                * Releases all current and future callers of {@link #invoke()}.
+                */
+               public static void unblock() {
+                       // the flag is written under the same lock so that waiting threads are guaranteed to see it
+                       synchronized (lock) {
+                               blocking = false;
+                               lock.notifyAll();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/resources/log4j-test.properties 
b/flink-metrics/flink-metrics-jmx/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/resources/logback-test.xml 
b/flink-metrics/flink-metrics-jmx/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1c4ea08
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    
+    <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" 
level="OFF"/>
+    <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.configuration.GlobalConfiguration" 
level="OFF"/>
+    <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-statsd/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/pom.xml 
b/flink-metrics/flink-metrics-statsd/pom.xml
index 4b43e11..85d1dc4 100644
--- a/flink-metrics/flink-metrics-statsd/pom.xml
+++ b/flink-metrics/flink-metrics-statsd/pom.xml
@@ -42,7 +42,7 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
+                       <artifactId>flink-metrics-core</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
@@ -51,6 +51,13 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-test-utils-junit</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 
b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index e0c6dc7..354eff1 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -19,11 +19,11 @@
 package org.apache.flink.metrics.statsd;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.AbstractReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
 
@@ -61,7 +61,7 @@ public class StatsDReporter extends AbstractReporter 
implements Scheduled {
        private InetSocketAddress address;
 
        @Override
-       public void open(Configuration config) {
+       public void open(MetricConfig config) {
                String host = config.getString(ARG_HOST, null);
                int port = config.getInteger(ARG_PORT, -1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 37a3477..d7ae04f 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -24,12 +24,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
-import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
-import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -191,7 +192,7 @@ public class StatsDReporterTest extends TestLogger {
         */
        public static class TestingStatsDReporter extends StatsDReporter {
                @Override
-               public void open(Configuration configuration) {
+               public void open(MetricConfig configuration) {
                        // disable the socket creation
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index 542f49c..7c9aaee 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -34,9 +34,11 @@ under the License.
        <packaging>pom</packaging>
 
        <modules>
+               <module>flink-metrics-core</module>
                <module>flink-metrics-dropwizard</module>
                <module>flink-metrics-ganglia</module>
                <module>flink-metrics-graphite</module>
+               <module>flink-metrics-jmx</module>
                <module>flink-metrics-statsd</module>
        </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 9f779ed..5ad5fe2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 9e7dfc1..a560bb6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index b218de8..9d4f765 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -25,7 +25,7 @@ import java.nio.ByteOrder;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataOutputSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index f93cdfc..4963698 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index fc35bef..6fcd2f9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index b316fd9..1cd042c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 90e395c..351181a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -20,7 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 840c805..cc91e83 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
new file mode 100644
index 0000000..9c4da6c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
+ * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
+ *
+ * <p>The registry holds at most one reporter. If the reporter implements {@link Scheduled}, the
+ * registry additionally owns a single-threaded executor that periodically triggers the report.
+ */
+public class MetricRegistry {
+       static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
+
+       /** The configured reporter, or null if none is configured or it could not be started. */
+       private final MetricReporter reporter;
+
+       /** Executor driving a {@link Scheduled} reporter, or null if no periodic reporting is active. */
+       private final ScheduledExecutorService executor;
+
+       private final ScopeFormats scopeFormats;
+
+       private final char delimiter;
+
+       /**
+        * Creates a new MetricRegistry and starts the configured reporter.
+        *
+        * <p>Configuration problems never fail the construction: invalid scope formats, delimiters and
+        * report intervals fall back to their defaults, and a reporter that cannot be instantiated or
+        * started results in a registry without reporter.
+        *
+        * @param config the configuration to read scope formats, delimiter and reporter settings from
+        */
+       public MetricRegistry(Configuration config) {
+               // first parse the scope formats, these are needed for all reporters
+               ScopeFormats scopeFormats;
+               try {
+                       scopeFormats = createScopeConfig(config);
+               }
+               catch (Exception e) {
+                       LOG.warn("Failed to parse scope format, using default scope formats", e);
+                       scopeFormats = new ScopeFormats();
+               }
+               this.scopeFormats = scopeFormats;
+
+               char delim;
+               try {
+                       // charAt(0) throws for an empty string, which is handled below
+                       delim = config.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0);
+               } catch (Exception e) {
+                       LOG.warn("Failed to parse delimiter, using default delimiter.", e);
+                       delim = '.';
+               }
+               this.delimiter = delim;
+
+               // second, instantiate any custom configured reporters
+
+               final String className = config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
+               if (className == null) {
+                       // by default, don't report anything
+                       LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
+                       this.reporter = null;
+                       this.executor = null;
+               }
+               else {
+                       MetricReporter reporter;
+                       ScheduledExecutorService executor = null;
+                       try {
+                               String configuredPeriod = config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
+                               TimeUnit timeunit = TimeUnit.SECONDS;
+                               long period = 10;
+
+                               if (configuredPeriod != null) {
+                                       try {
+                                               String[] interval = configuredPeriod.split(" ");
+                                               // parse both parts before assigning either one, so that a half-valid
+                                               // value (e.g. '500 MILLISECOND') cannot combine the configured period
+                                               // with the default time unit
+                                               long parsedPeriod = Long.parseLong(interval[0]);
+                                               TimeUnit parsedUnit = TimeUnit.valueOf(interval[1]);
+                                               period = parsedPeriod;
+                                               timeunit = parsedUnit;
+                                       }
+                                       catch (Exception e) {
+                                               LOG.error("Cannot parse report interval from config: " + configuredPeriod +
+                                                       " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
+                                                       "Using default reporting interval.");
+                                       }
+                               }
+
+                               MetricConfig reporterConfig = createReporterConfig(config);
+
+                               Class<?> reporterClass = Class.forName(className);
+                               reporter = (MetricReporter) reporterClass.newInstance();
+                               reporter.open(reporterConfig);
+
+                               if (reporter instanceof Scheduled) {
+                                       executor = Executors.newSingleThreadScheduledExecutor();
+                                       LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
+
+                                       executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
+                               }
+                       }
+                       catch (Throwable t) {
+                               // the 'executor' field is not assigned yet at this point, so the locally
+                               // created executor has to be shut down explicitly to not leak its thread
+                               shutdownExecutor(executor);
+                               executor = null;
+                               LOG.info("Could not instantiate metrics reporter. No metrics will be exposed/reported.", t);
+                               reporter = null;
+                       }
+
+                       this.reporter = reporter;
+                       this.executor = executor;
+               }
+       }
+
+       /**
+        * Returns the character that separates the components of a metric scope.
+        */
+       public char getDelimiter() {
+               return this.delimiter;
+       }
+
+       /**
+        * Returns the reporter of this registry, or null if no reporter is active.
+        */
+       public MetricReporter getReporter() {
+               return reporter;
+       }
+
+       /**
+        * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
+        */
+       public void shutdown() {
+               if (reporter != null) {
+                       try {
+                               reporter.close();
+                       } catch (Throwable t) {
+                               LOG.warn("Metrics reporter did not shut down cleanly", t);
+                       }
+               }
+               shutdownExecutor();
+       }
+
+       /**
+        * Shuts down the executor owned by this registry, if there is one.
+        */
+       private void shutdownExecutor() {
+               shutdownExecutor(executor);
+       }
+
+       /**
+        * Shuts down the given executor, waiting up to one second for running tasks before
+        * cancelling them.
+        *
+        * @param executor the executor to shut down, may be null
+        */
+       private static void shutdownExecutor(ScheduledExecutorService executor) {
+               if (executor != null) {
+                       executor.shutdown();
+
+                       try {
+                               if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+                                       executor.shutdownNow();
+                               }
+                       } catch (InterruptedException e) {
+                               executor.shutdownNow();
+                               // restore the interrupt flag so that callers can still observe the interruption
+                               Thread.currentThread().interrupt();
+                       }
+               }
+       }
+
+       /**
+        * Returns the scope formats parsed from the configuration (or the defaults, if parsing failed).
+        */
+       public ScopeFormats getScopeFormats() {
+               return scopeFormats;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Metrics (de)registration
+       // ------------------------------------------------------------------------
+
+       /**
+        * Registers a new {@link Metric} with this registry.
+        *
+        * <p>Exceptions thrown by the reporter are logged and not propagated.
+        *
+        * @param metric      the metric that was added
+        * @param metricName  the name of the metric
+        * @param group       the group that contains the metric
+        */
+       public void register(Metric metric, String metricName, MetricGroup group) {
+               try {
+                       if (reporter != null) {
+                               reporter.notifyOfAddedMetric(metric, metricName, group);
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while registering metric.", e);
+               }
+       }
+
+       /**
+        * Un-registers the given {@link org.apache.flink.metrics.Metric} with this registry.
+        *
+        * <p>Exceptions thrown by the reporter are logged and not propagated.
+        *
+        * @param metric      the metric that should be removed
+        * @param metricName  the name of the metric
+        * @param group       the group that contains the metric
+        */
+       public void unregister(Metric metric, String metricName, MetricGroup group) {
+               try {
+                       if (reporter != null) {
+                               reporter.notifyOfRemovedMetric(metric, metricName, group);
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while unregistering metric.", e);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       /**
+        * Builds the reporter configuration from the space-separated reporter arguments, which are
+        * expected as alternating keys and values (for example {@code "--host localhost --port 1234"}).
+        * Every occurrence of {@code "--"} is removed from the keys.
+        *
+        * <p>A single token, as well as a trailing key without value, is ignored.
+        *
+        * @param config the configuration holding the reporter arguments
+        * @return the configuration that is handed to the reporter
+        */
+       static MetricConfig createReporterConfig(Configuration config) {
+               MetricConfig reporterConfig = new MetricConfig();
+
+               String[] arguments = config.getString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "").split(" ");
+               // only step through complete key/value pairs; reading 'x + 1' unconditionally would
+               // throw an ArrayIndexOutOfBoundsException for an odd number of tokens
+               for (int x = 0; x + 1 < arguments.length; x += 2) {
+                       reporterConfig.setProperty(arguments[x].replace("--", ""), arguments[x + 1]);
+               }
+               if (arguments.length > 1 && arguments.length % 2 != 0) {
+                       LOG.warn("Ignoring metrics reporter argument '{}' because it has no value.",
+                               arguments[arguments.length - 1]);
+               }
+               return reporterConfig;
+       }
+
+       /**
+        * Reads the scope formats of all component groups from the configuration, using the defaults
+        * defined in {@link ScopeFormat} for formats that are not configured.
+        *
+        * @param config the configuration holding the scope formats
+        * @return the scope formats for all component groups
+        */
+       static ScopeFormats createScopeConfig(Configuration config) {
+               String jmFormat = config.getString(
+                       ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
+               String jmJobFormat = config.getString(
+                       ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
+               String tmFormat = config.getString(
+                       ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
+               String tmJobFormat = config.getString(
+                       ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
+               String taskFormat = config.getString(
+                       ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
+               String operatorFormat = config.getString(
+                       ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+
+               return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Task that triggers the report of a {@link Scheduled} reporter.
+        *
+        * <p>This task is explicitly a static class, so that it does not hold any reference to the
+        * enclosing MetricRegistry instance. The executor references the scheduled task; a non-static
+        * (for example anonymous inner) class would additionally reference the registry through its
+        * enclosing instance pointer and keep it reachable for as long as the task is scheduled.
+        *
+        * <p>Errors of the reporter are logged rather than propagated, because the executor suppresses
+        * all subsequent executions of a periodic task once one execution throws.
+        */
+       private static final class ReporterTask extends TimerTask {
+
+               private final Scheduled reporter;
+
+               private ReporterTask(Scheduled reporter) {
+                       this.reporter = reporter;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               reporter.report();
+                       } catch (Throwable t) {
+                               LOG.warn("Error while reporting metrics", t);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
new file mode 100644
index 0000000..3ac9966
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.metrics.SimpleCounter;
+
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract {@link MetricGroup} that contains key functionality for adding metrics and groups.
+ *
+ * <p><b>IMPORTANT IMPLEMENTATION NOTE</b>
+ *
+ * <p>This class uses locks for adding and removing metrics objects. This is done to
+ * prevent resource leaks in the presence of concurrently closing a group and adding
+ * metrics and subgroups.
+ * Since closing groups recursively closes the subgroups, the lock acquisition order must
+ * be strictly from parent group to subgroup. If at any point, a subgroup holds its group
+ * lock and calls a parent method that also acquires the lock, it will create a deadlock
+ * condition.
+ *
+ * <p>An AbstractMetricGroup can be {@link #close() closed}. Upon closing, the group de-registers all metrics
+ * from any metrics reporter and any internal maps. Note that even closed metrics groups
+ * return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code.
+ * These metrics simply do not get reported any more, when created on a closed group.
+ */
+public abstract class AbstractMetricGroup implements MetricGroup {
+
+       /** shared logger */
+       private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
+
+       // ------------------------------------------------------------------------
+
+       /** The registry that this metrics group belongs to */
+       protected final MetricRegistry registry;
+
+       /** All metrics that are directly contained in this group */
+       private final Map<String, Metric> metrics = new HashMap<>();
+
+       /** All metric subgroups of this group */
+       private final Map<String, AbstractMetricGroup> groups = new HashMap<>();
+
+       /** The metrics scope represented by this group.
+        *  For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */
+       private final String[] scopeComponents;
+
+       /** The unfiltered metrics scope represented by this group, as a concatenated string, lazily computed.
+        * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
+       private String scopeString;
+
+       /** Flag indicating whether this group has been closed */
+       private volatile boolean closed;
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a new group with the given scope.
+        *
+        * @param registry the registry to register metrics with, must not be null
+        * @param scope    the scope components of this group, must not be null
+        */
+       public AbstractMetricGroup(MetricRegistry registry, String[] scope) {
+               this.registry = checkNotNull(registry);
+               this.scopeComponents = checkNotNull(scope);
+       }
+
+       /**
+        * Gets the scope as an array of the scope components, for example
+        * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
+        *
+        * @see #getMetricIdentifier(String)
+        */
+       public String[] getScopeComponents() {
+               return scopeComponents;
+       }
+
+       /**
+        * Returns the fully qualified metric name, for example
+        * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+        *
+        * @param metricName metric name
+        * @return fully qualified metric name
+        */
+       public String getMetricIdentifier(String metricName) {
+               return getMetricIdentifier(metricName, null);
+       }
+
+       /**
+        * Returns the fully qualified metric name, for example
+        * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+        *
+        * @param metricName metric name
+        * @param filter character filter which is applied to the scope components and the metric
+        *               name if not null.
+        * @return fully qualified metric name
+        */
+       public String getMetricIdentifier(String metricName, CharacterFilter filter) {
+               if (filter != null) {
+                       // a filtered scope depends on the given filter and must therefore not be stored in
+                       // (or served from) the cached scope string, which is shared by all callers
+                       return ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents) +
+                               registry.getDelimiter() + filter.filterCharacters(metricName);
+               }
+
+               if (scopeString == null) {
+                       scopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents);
+               }
+               return scopeString + registry.getDelimiter() + metricName;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Closing
+       // ------------------------------------------------------------------------
+
+       /**
+        * Closes this group: recursively closes all subgroups and un-registers all directly contained
+        * metrics from the registry. Closing an already closed group has no effect.
+        */
+       public void close() {
+               synchronized (this) {
+                       if (!closed) {
+                               closed = true;
+
+                               // close all subgroups
+                               for (AbstractMetricGroup group : groups.values()) {
+                                       group.close();
+                               }
+                               groups.clear();
+
+                               // un-register all directly contained metrics
+                               for (Map.Entry<String, Metric> metric : metrics.entrySet()) {
+                                       registry.unregister(metric.getValue(), metric.getKey(), this);
+                               }
+                               metrics.clear();
+                       }
+               }
+       }
+
+       /**
+        * Checks whether this group has been closed.
+        */
+       public final boolean isClosed() {
+               return closed;
+       }
+
+       // -----------------------------------------------------------------------------------------------------------------
+       //  Metrics
+       // -----------------------------------------------------------------------------------------------------------------
+
+       @Override
+       public Counter counter(int name) {
+               return counter(String.valueOf(name));
+       }
+
+       @Override
+       public Counter counter(String name) {
+               return counter(name, new SimpleCounter());
+       }
+
+       @Override
+       public <C extends Counter> C counter(int name, C counter) {
+               return counter(String.valueOf(name), counter);
+       }
+
+       @Override
+       public <C extends Counter> C counter(String name, C counter) {
+               addMetric(name, counter);
+               return counter;
+       }
+
+       @Override
+       public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
+               return gauge(String.valueOf(name), gauge);
+       }
+
+       @Override
+       public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+               addMetric(name, gauge);
+               return gauge;
+       }
+
+       @Override
+       public <H extends Histogram> H histogram(int name, H histogram) {
+               return histogram(String.valueOf(name), histogram);
+       }
+
+       @Override
+       public <H extends Histogram> H histogram(String name, H histogram) {
+               addMetric(name, histogram);
+               return histogram;
+       }
+
+       /**
+        * Adds the given metric to the group and registers it at the registry, if the group
+        * is not yet closed, and if no metric with the same name has been registered before.
+        *
+        * @param name the name to register the metric under
+        * @param metric the metric to register
+        */
+       protected void addMetric(String name, Metric metric) {
+               // add the metric only if the group is still open
+               synchronized (this) {
+                       if (!closed) {
+                               // immediately put without a 'contains' check to optimize the common case (no collision)
+                               // collisions are resolved later
+                               Metric prior = metrics.put(name, metric);
+
+                               // check for collisions with other metric names
+                               if (prior == null) {
+                                       // no other metric with this name yet
+
+                                       if (groups.containsKey(name)) {
+                                               // we warn here, rather than failing, because metrics are tools that should not fail the
+                                               // program when used incorrectly
+                                               LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
+                                                               name + "'. Metric might not get properly reported. (" + scopeString + ')');
+                                       }
+
+                                       registry.register(metric, name, this);
+                               }
+                               else {
+                                       // we had a collision. put back the original value
+                                       metrics.put(name, prior);
+
+                                       // we warn here, rather than failing, because metrics are tools that should not fail the
+                                       // program when used incorrectly
+                                       LOG.warn("Name collision: Group already contains a Metric with the name '" +
+                                                       name + "'. Metric will not be reported. (" + scopeString + ')');
+                               }
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Groups
+       // ------------------------------------------------------------------------
+
+       @Override
+       public MetricGroup addGroup(int name) {
+               return addGroup(String.valueOf(name));
+       }
+
+       /**
+        * Returns the subgroup with the given name, creating it if it does not exist yet. On a closed
+        * group, a new subgroup is returned that is already closed and not tracked by this group.
+        */
+       @Override
+       public MetricGroup addGroup(String name) {
+               synchronized (this) {
+                       if (!closed) {
+                               // adding a group with the same name as a metric creates problems in many reporters/dashboards
+                               // we warn here, rather than failing, because metrics are tools that should not fail the
+                               // program when used incorrectly
+                               if (metrics.containsKey(name)) {
+                                       LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" +
+                                                       name + "'. Metric might not get properly reported. (" + scopeString + ')');
+                               }
+
+                               AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name);
+                               AbstractMetricGroup prior = groups.put(name, newGroup);
+                               if (prior == null) {
+                                       // no prior group with that name
+                                       return newGroup;
+                               } else {
+                                       // had a prior group with that name, add the prior group back
+                                       groups.put(name, prior);
+                                       return prior;
+                               }
+                       }
+                       else {
+                               // return a non-registered group that is immediately closed already
+                               GenericMetricGroup closedGroup = new GenericMetricGroup(registry, this, name);
+                               closedGroup.close();
+                               return closedGroup;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
new file mode 100644
index 0000000..7b4cb05
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.runtime.metrics.MetricRegistry;
+
+/**
+ * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components
+ * (e.g., TaskManager, Job, Task, Operator).
+ *
+ * <p>Usually, the scope of metrics is simply the hierarchy of the containing
+ * groups. For example, the Metric {@code "MyMetric"} in group {@code "B"}
+ * nested in group {@code "A"} would have a fully scoped name of
+ * {@code "A.B.MyMetric"}, with {@code "A.B"} being the Metric's scope.
+ *
+ * <p>Component groups, however, have configurable scopes. This allows users to
+ * include or exclude certain identifiers from the scope. The scope for metrics
+ * belonging to the "Task" group could, for example, include the task attempt
+ * number (more fine grained identification), or exclude it (for continuity of
+ * the namespace across failure and recovery).
+ */
+public abstract class ComponentMetricGroup extends AbstractMetricGroup {
+
+       /**
+        * Creates a new ComponentMetricGroup.
+        *
+        * @param registry     registry to register new metrics with
+        * @param scope        the scope of the group
+        */
+       public ComponentMetricGroup(MetricRegistry registry, String[] scope) {
+               super(registry, scope);
+       }
+
+       /**
+        * Closes the component group: first everything handled by
+        * {@link AbstractMetricGroup#close()}, then every group returned by
+        * {@link #subComponents()}. Calling this on an already closed group
+        * does nothing.
+        */
+       @Override
+       public void close() {
+               synchronized (this) {
+                       if (isClosed()) {
+                               // a previous call already released everything
+                               return;
+                       }
+
+                       // remove all metrics and generic subgroups
+                       super.close();
+
+                       // remove and close all subcomponent metrics
+                       for (ComponentMetricGroup subComponent : subComponents()) {
+                               subComponent.close();
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Component Metric Group Specifics
+       // ------------------------------------------------------------------------
+
+       /**
+        * Gets all component metric groups that are contained in this
+        * component metric group.
+        *
+        * @return All component metric groups that are contained in this
+        *         component metric group.
+        */
+       protected abstract Iterable<? extends ComponentMetricGroup> subComponents();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
new file mode 100644
index 0000000..09b9ea5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.runtime.metrics.MetricRegistry;
+
+/**
+ * A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to
+ * hold subgroups of metrics.
+ */
+public class GenericMetricGroup extends AbstractMetricGroup {
+
+       /**
+        * Creates a generic group whose scope is the parent's scope components
+        * followed by {@code name}.
+        *
+        * @param registry registry passed on to {@link AbstractMetricGroup}
+        * @param parent   group whose scope components are extended; when it is
+        *                 {@code null} or has no scope components, the scope
+        *                 consists of {@code name} only
+        * @param name     name of this group, used as the last scope component
+        */
+       public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
+               super(registry, makeScopeComponents(parent, name));
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Builds the scope array for a group called {@code name} below
+        * {@code parent}.
+        *
+        * @return the parent's scope components with {@code name} appended, or
+        *         a single-element array holding {@code name} when there is
+        *         nothing to inherit
+        */
+       private static String[] makeScopeComponents(AbstractMetricGroup parent, String name) {
+               String[] inherited = (parent == null) ? null : parent.getScopeComponents();
+               if (inherited == null || inherited.length == 0) {
+                       // nothing to inherit, the name alone is the scope
+                       return new String[] { name };
+               }
+
+               String[] scope = new String[inherited.length + 1];
+               System.arraycopy(inherited, 0, scope, 0, inherited.length);
+               scope[inherited.length] = name;
+               return scope;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java
new file mode 100644
index 0000000..8fa6111
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.Counter;
+
+/**
+ * Metric group that contains shareable pre-defined IO-related metrics. The
+ * metrics registration is forwarded to the parent task metric group.
+ */
+public class IOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
+
+       /** Counter registered under the name {@code "numBytesOut"}. */
+       private final Counter numBytesOut;
+
+       /** Counter registered under the name {@code "numBytesInLocal"}. */
+       private final Counter numBytesInLocal;
+
+       /** Counter registered under the name {@code "numBytesInRemote"}. */
+       private final Counter numBytesInRemote;
+
+       /**
+        * Creates the IO metric group and registers its three counters through
+        * the proxy, i.e. on the given parent group.
+        *
+        * @param parent task metric group that receives the registrations
+        */
+       public IOMetricGroup(TaskMetricGroup parent) {
+               super(parent);
+
+               // registration order is kept: out, in (local), in (remote)
+               numBytesOut = counter("numBytesOut");
+               numBytesInLocal = counter("numBytesInLocal");
+               numBytesInRemote = counter("numBytesInRemote");
+       }
+
+       /** @return the {@code "numBytesOut"} counter */
+       public Counter getBytesOutCounter() {
+               return numBytesOut;
+       }
+
+       /** @return the {@code "numBytesInLocal"} counter */
+       public Counter getNumBytesInLocalCounter() {
+               return numBytesInLocal;
+       }
+
+       /** @return the {@code "numBytesInRemote"} counter */
+       public Counter getNumBytesInRemoteCounter() {
+               return numBytesInRemote;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
new file mode 100644
index 0000000..807ddd0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.JobManagerJobScopeFormat;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing
+ * everything belonging to a specific job, running on the JobManager.
+ */
+public class JobManagerJobMetricGroup extends JobMetricGroup {
+
+       /** The metrics group that contains this group */
+       private final JobManagerMetricGroup parent;
+
+       /**
+        * Creates the job group using the JobManager-job scope format taken
+        * from the registry.
+        *
+        * @param registry registry to register new metrics with
+        * @param parent   JobManager group containing this job group; must not be null
+        * @param jobId    ID of the job
+        * @param jobName  name of the job, may be null
+        */
+       public JobManagerJobMetricGroup(
+               MetricRegistry registry,
+               JobManagerMetricGroup parent,
+               JobID jobId,
+               @Nullable String jobName) {
+
+               this(registry, checkNotNull(parent), registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName);
+       }
+
+       /**
+        * Creates the job group with an explicitly given scope format.
+        *
+        * @param registry    registry to register new metrics with
+        * @param parent      JobManager group containing this job group; must not be null
+        * @param scopeFormat format used to derive this group's scope
+        * @param jobId       ID of the job
+        * @param jobName     name of the job, may be null
+        * @throws NullPointerException if {@code parent} is null
+        */
+       public JobManagerJobMetricGroup(
+               MetricRegistry registry,
+               JobManagerMetricGroup parent,
+               JobManagerJobScopeFormat scopeFormat,
+               JobID jobId,
+               @Nullable String jobName) {
+
+               // the null check has to run before the parent is handed to the scope
+               // format; checking after super(...) would be too late to guard that call
+               super(registry, jobId, jobName, scopeFormat.formatScope(checkNotNull(parent), jobId, jobName));
+
+               this.parent = parent;
+       }
+
+       /** @return the JobManager metric group that contains this group */
+       public final JobManagerMetricGroup parent() {
+               return parent;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Component Metric Group Specifics
+       // ------------------------------------------------------------------------
+
+       /** A JobManager job group has no dedicated sub-component groups. */
+       @Override
+       protected Iterable<? extends ComponentMetricGroup> subComponents() {
+               return Collections.emptyList();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
new file mode 100644
index 0000000..4d3dfb7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing a
+ * JobManager.
+ *
+ * <p>Contains extra logic for adding and removing the metric groups of
+ * individual jobs.
+ */
+public class JobManagerMetricGroup extends ComponentMetricGroup {
+
+       /** Metric groups of the registered jobs, keyed by job ID; changed only under this group's lock. */
+       private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>();
+
+       private final String hostname;
+
+       public JobManagerMetricGroup(MetricRegistry registry, String hostname) {
+               this(registry, registry.getScopeFormats().getJobManagerFormat(), hostname);
+       }
+
+       public JobManagerMetricGroup(
+               MetricRegistry registry,
+               JobManagerScopeFormat scopeFormat,
+               String hostname) {
+
+               super(registry, scopeFormat.formatScope(hostname));
+               this.hostname = hostname;
+       }
+
+       public String hostname() {
+               return hostname;
+       }
+
+       // ------------------------------------------------------------------------
+       //  job groups
+       // ------------------------------------------------------------------------
+
+       /**
+        * Gets the metric group of the given job, creating and registering a
+        * new one if there is none yet or the existing one is closed.
+        *
+        * @param job job graph providing the job ID and job name
+        * @return the job's metric group, or {@code null} if this group is
+        *         already closed
+        */
+       public JobManagerJobMetricGroup addJob(JobGraph job) {
+               JobID jobId = job.getJobID();
+               String jobName = job.getName();
+               // get or create a jobs metric group
+               synchronized (this) {
+                       if (isClosed()) {
+                               return null;
+                       }
+
+                       JobManagerJobMetricGroup currentJobGroup = jobs.get(jobId);
+                       if (currentJobGroup == null || currentJobGroup.isClosed()) {
+                               currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName);
+                               jobs.put(jobId, currentJobGroup);
+                       }
+                       return currentJobGroup;
+               }
+       }
+
+       /**
+        * Removes and closes the metric group of the given job, if one is
+        * registered. A {@code null} job ID is ignored.
+        *
+        * @param jobId ID of the job whose group is removed
+        */
+       public void removeJob(JobID jobId) {
+               if (jobId == null) {
+                       return;
+               }
+
+               synchronized (this) {
+                       JobManagerJobMetricGroup containedGroup = jobs.remove(jobId);
+                       if (containedGroup != null) {
+                               containedGroup.close();
+                       }
+               }
+       }
+
+       /**
+        * Gets the number of job metric groups currently held in the job map.
+        *
+        * @return number of registered job metric groups
+        */
+       public int numRegisteredJobMetricGroups() {
+               // the map is a plain HashMap mutated under this lock in addJob/removeJob,
+               // so the read takes the same lock to see a consistent state
+               synchronized (this) {
+                       return jobs.size();
+               }
+       }
+
+       @Override
+       protected Iterable<? extends ComponentMetricGroup> subComponents() {
+               return jobs.values();
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
new file mode 100644
index 0000000..0c1c55c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+
+import javax.annotation.Nullable;
+
+/**
+ * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing
+ * everything belonging to a specific job.
+ */
+@Internal
+public abstract class JobMetricGroup extends ComponentMetricGroup {
+
+       /** The ID of the job represented by this metrics group */
+       protected final JobID jobId;
+
+       /** The name of the job represented by this metrics group */
+       @Nullable
+       protected final String jobName;
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Stores the job's identity and passes registry and scope on to
+        * {@link ComponentMetricGroup}.
+        *
+        * @param registry registry to register new metrics with
+        * @param jobId    ID of the job
+        * @param jobName  name of the job, may be null
+        * @param scope    the scope of the group
+        */
+       protected JobMetricGroup(
+                       MetricRegistry registry,
+                       JobID jobId,
+                       @Nullable String jobName,
+                       String[] scope) {
+               super(registry, scope);
+
+               this.jobName = jobName;
+               this.jobId = jobId;
+       }
+
+       /** @return the ID of the job represented by this group */
+       public JobID jobId() {
+               return jobId;
+       }
+
+       /** @return the name of the job represented by this group, may be null */
+       @Nullable
+       public String jobName() {
+               return jobName;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
new file mode 100644
index 0000000..caee821
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.OperatorScopeFormat;
+
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing an
+ * Operator.
+ */
+public class OperatorMetricGroup extends ComponentMetricGroup {
+
+       /** The task metric group that contains this operator metric group */
+       private final TaskMetricGroup parent;
+
+       /**
+        * Creates the operator group using the operator scope format taken
+        * from the registry.
+        *
+        * @param registry     registry to register new metrics with
+        * @param parent       task group containing this operator group; must not be null
+        * @param operatorName name of the operator
+        */
+       public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup parent, String operatorName) {
+               this(registry, parent, registry.getScopeFormats().getOperatorFormat(), operatorName);
+       }
+
+       /**
+        * Creates the operator group with an explicitly given scope format.
+        *
+        * @param registry     registry to register new metrics with
+        * @param parent       task group containing this operator group; must not be null
+        * @param scopeFormat  format used to derive this group's scope
+        * @param operatorName name of the operator
+        * @throws NullPointerException if {@code parent} is null
+        */
+       public OperatorMetricGroup(
+                       MetricRegistry registry,
+                       TaskMetricGroup parent,
+                       OperatorScopeFormat scopeFormat,
+                       String operatorName) {
+
+               // the null check has to run before the parent is handed to the scope
+               // format; checking after super(...) would be too late to guard that call
+               super(registry, scopeFormat.formatScope(checkNotNull(parent), operatorName));
+               this.parent = parent;
+       }
+
+       // ------------------------------------------------------------------------
+
+       /** @return the task metric group that contains this group */
+       public final TaskMetricGroup parent() {
+               return parent;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Component Metric Group Specifics
+       // ------------------------------------------------------------------------
+
+       /** An operator group has no dedicated sub-component groups. */
+       @Override
+       protected Iterable<? extends ComponentMetricGroup> subComponents() {
+               return Collections.emptyList();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
new file mode 100644
index 0000000..6dd38a2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Metric group which forwards all registration calls to its parent metric
+ * group.
+ *
+ * @param <P> Type of the parent metric group
+ */
+public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
+
+       /** The group every call of this proxy is forwarded to; never null. */
+       private final P delegate;
+
+       /**
+        * Creates a proxy in front of the given group.
+        *
+        * @param parentMetricGroup group receiving all forwarded calls; must not be null
+        */
+       public ProxyMetricGroup(P parentMetricGroup) {
+               delegate = checkNotNull(parentMetricGroup);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Counters
+       // ------------------------------------------------------------------------
+
+       @Override
+       public final Counter counter(int name) {
+               return delegate.counter(name);
+       }
+
+       @Override
+       public final Counter counter(String name) {
+               return delegate.counter(name);
+       }
+
+       @Override
+       public final <C extends Counter> C counter(int name, C counter) {
+               return delegate.counter(name, counter);
+       }
+
+       @Override
+       public final <C extends Counter> C counter(String name, C counter) {
+               return delegate.counter(name, counter);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Gauges
+       // ------------------------------------------------------------------------
+
+       @Override
+       public final <T, G extends Gauge<T>> G gauge(int name, G gauge) {
+               return delegate.gauge(name, gauge);
+       }
+
+       @Override
+       public final <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+               return delegate.gauge(name, gauge);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Histograms
+       // ------------------------------------------------------------------------
+
+       @Override
+       public final <H extends Histogram> H histogram(String name, H histogram) {
+               return delegate.histogram(name, histogram);
+       }
+
+       @Override
+       public final <H extends Histogram> H histogram(int name, H histogram) {
+               return delegate.histogram(name, histogram);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Groups
+       // ------------------------------------------------------------------------
+
+       @Override
+       public final MetricGroup addGroup(int name) {
+               return delegate.addGroup(name);
+       }
+
+       @Override
+       public final MetricGroup addGroup(String name) {
+               return delegate.addGroup(name);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Scope
+       // ------------------------------------------------------------------------
+
+       @Override
+       public String[] getScopeComponents() {
+               return delegate.getScopeComponents();
+       }
+
+       @Override
+       public String getMetricIdentifier(String metricName) {
+               return delegate.getMetricIdentifier(metricName);
+       }
+
+       @Override
+       public String getMetricIdentifier(String metricName, CharacterFilter filter) {
+               return delegate.getMetricIdentifier(metricName, filter);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
new file mode 100644
index 0000000..8219ef0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat;
+import org.apache.flink.util.AbstractID;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing
+ * everything belonging to a specific job, running on the TaskManager.
+ *
+ * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
+ */
+public class TaskManagerJobMetricGroup extends JobMetricGroup {
+
+       /** The metrics group that contains this group */
+       private final TaskManagerMetricGroup parent;
+
+       /** Map from execution attempt ID (task identifier) to task metrics */
+       private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates the job group using the TaskManager-job scope format taken
+        * from the registry.
+        *
+        * @param registry registry to register new metrics with
+        * @param parent   TaskManager group containing this job group; must not be null
+        * @param jobId    ID of the job
+        * @param jobName  name of the job, may be null
+        */
+       public TaskManagerJobMetricGroup(
+                       MetricRegistry registry,
+                       TaskManagerMetricGroup parent,
+                       JobID jobId,
+                       @Nullable String jobName) {
+
+               this(registry, checkNotNull(parent), registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName);
+       }
+
+       /**
+        * Creates the job group with an explicitly given scope format.
+        *
+        * @param registry    registry to register new metrics with
+        * @param parent      TaskManager group containing this job group; must not be null
+        * @param scopeFormat format used to derive this group's scope
+        * @param jobId       ID of the job
+        * @param jobName     name of the job, may be null
+        * @throws NullPointerException if {@code parent} is null
+        */
+       public TaskManagerJobMetricGroup(
+                       MetricRegistry registry,
+                       TaskManagerMetricGroup parent,
+                       TaskManagerJobScopeFormat scopeFormat,
+                       JobID jobId,
+                       @Nullable String jobName) {
+
+               // the null check has to run before the parent is handed to the scope
+               // format; checking after super(...) would be too late to guard that call
+               super(registry, jobId, jobName, scopeFormat.formatScope(checkNotNull(parent), jobId, jobName));
+
+               this.parent = parent;
+       }
+
+       /** @return the TaskManager metric group that contains this group */
+       public final TaskManagerMetricGroup parent() {
+               return parent;
+       }
+
+       // ------------------------------------------------------------------------
+       //  adding / removing tasks
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a metric group for the task described by the deployment
+        * descriptor and stores it under the task's execution ID.
+        *
+        * @param tdd descriptor providing vertex ID, execution ID, task name,
+        *            subtask index and attempt number
+        * @return the new task metric group, or {@code null} if this group is
+        *         already closed
+        * @throws NullPointerException if the descriptor's execution ID is null
+        */
+       public TaskMetricGroup addTask(TaskDeploymentDescriptor tdd) {
+               AbstractID vertexId = tdd.getVertexID();
+               AbstractID executionId = tdd.getExecutionId();
+               String taskName = tdd.getTaskName();
+               int subtaskIndex = tdd.getIndexInSubtaskGroup();
+               int attemptNumber = tdd.getAttemptNumber();
+
+               checkNotNull(executionId);
+
+               synchronized (this) {
+                       if (!isClosed()) {
+                               TaskMetricGroup task = new TaskMetricGroup(registry, this,
+                                       vertexId, executionId, taskName, subtaskIndex, attemptNumber);
+                               tasks.put(executionId, task);
+                               return task;
+                       } else {
+                               return null;
+                       }
+               }
+       }
+
+       /**
+        * Removes the task group stored under the given execution ID. When
+        * that removal empties the task map, this job group closes itself and
+        * asks its parent to remove it.
+        *
+        * @param executionId execution ID of the task to remove; must not be null
+        */
+       public void removeTaskMetricGroup(AbstractID executionId) {
+               checkNotNull(executionId);
+
+               boolean removeFromParent = false;
+               synchronized (this) {
+                       if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) {
+                               // this call removed the last task. close this group.
+                               removeFromParent = true;
+                               close();
+                       }
+               }
+
+               // IMPORTANT: removing from the parent must not happen while holding this group's lock,
+               //      because it would violate the "first parent then subgroup" lock acquisition order
+               if (removeFromParent) {
+                       parent.removeJobMetricsGroup(jobId, this);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Component Metric Group Specifics
+       // ------------------------------------------------------------------------
+
+       /** The sub-components of a job group are the metric groups of its tasks. */
+       @Override
+       protected Iterable<? extends ComponentMetricGroup> subComponents() {
+               return tasks.values();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
new file mode 100644
index 0000000..f58a95f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing a TaskManager.
+ *
+ * <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do
+ * not contain tasks any more.
+ *
+ * <p>The map of job groups is read and modified only while holding this object's monitor,
+ * with the exception of {@link #subComponents()}, which hands out the map's live value view.
+ */
+public class TaskManagerMetricGroup extends ComponentMetricGroup {
+
+       /** Job metric groups registered at this TaskManager group, keyed by job ID. */
+       private final Map<JobID, TaskManagerJobMetricGroup> jobs = new HashMap<>();
+
+       private final String hostname;
+
+       private final String taskManagerId;
+
+
+       /**
+        * Creates the group using the TaskManager scope format configured in the registry.
+        *
+        * @param registry      registry this group belongs to
+        * @param hostname      host name used when formatting the scope
+        * @param taskManagerId TaskManager ID used when formatting the scope
+        */
+       public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId) {
+               this(registry, registry.getScopeFormats().getTaskManagerFormat(), hostname, taskManagerId);
+       }
+
+       /**
+        * Creates the group with an explicitly given scope format.
+        *
+        * @param registry      registry this group belongs to
+        * @param scopeFormat   format that builds the scope from host name and TaskManager ID
+        * @param hostname      host name used when formatting the scope
+        * @param taskManagerId TaskManager ID used when formatting the scope
+        */
+       public TaskManagerMetricGroup(
+                       MetricRegistry registry,
+                       TaskManagerScopeFormat scopeFormat,
+                       String hostname, String taskManagerId) {
+
+               super(registry, scopeFormat.formatScope(hostname, taskManagerId));
+               this.hostname = hostname;
+               this.taskManagerId = taskManagerId;
+       }
+
+       /** Returns the host name this group was created with. */
+       public String hostname() {
+               return hostname;
+       }
+
+       /** Returns the TaskManager ID this group was created with. */
+       public String taskManagerId() {
+               return taskManagerId;
+       }
+
+       // ------------------------------------------------------------------------
+       //  job groups
+       // ------------------------------------------------------------------------
+
+       /**
+        * Registers the task described by the given deployment descriptor and returns its
+        * metric group. The job metric group for the task's job is created on demand.
+        *
+        * <p>If the descriptor carries no job name (null or empty), the string form of the
+        * job ID is used as the job name.
+        *
+        * @param tdd deployment descriptor of the task to add
+        * @return the metric group created for the task
+        */
+       public TaskMetricGroup addTaskForJob(TaskDeploymentDescriptor tdd) {
+               final JobID jobId = tdd.getJobID();
+
+               // a missing job name must not fail the task registration, fall back to the job ID
+               final String declaredName = tdd.getJobName();
+               final String jobName = (declaredName == null || declaredName.isEmpty())
+                       ? jobId.toString()
+                       : declaredName;
+
+               // we cannot strictly lock both our map modification and the job group modification
+               // because it might lead to a deadlock
+               while (true) {
+                       // get or create a jobs metric group
+                       TaskManagerJobMetricGroup currentJobGroup;
+                       synchronized (this) {
+                               currentJobGroup = jobs.get(jobId);
+
+                               if (currentJobGroup == null || currentJobGroup.isClosed()) {
+                                       currentJobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, jobName);
+                                       jobs.put(jobId, currentJobGroup);
+                               }
+                       }
+
+                       // try to add another task. this may fail if we found a pre-existing job metrics
+                       // group and it is closed concurrently
+                       TaskMetricGroup taskGroup = currentJobGroup.addTask(tdd);
+
+                       if (taskGroup != null) {
+                               // successfully added the next task
+                               return taskGroup;
+                       }
+
+                       // else retry: the next iteration replaces the closed group with a fresh one
+               }
+       }
+
+       /**
+        * Unregisters the given job metric group, provided it is closed and is still the group
+        * registered for the job ID. Calls with a null argument or a non-closed group are ignored.
+        *
+        * @param jobId ID of the job whose group should be removed
+        * @param group the closed group to remove
+        */
+       public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup group) {
+               if (jobId == null || group == null || !group.isClosed()) {
+                       return;
+               }
+
+               synchronized (this) {
+                       // a newer group may have been registered for the same job in the meantime,
+                       // so only remove the entry if it is exactly the given group
+                       if (jobs.get(jobId) == group) {
+                               jobs.remove(jobId);
+                       }
+               }
+       }
+
+       /**
+        * Returns the number of job metric groups currently registered.
+        *
+        * @return current size of the job group map
+        */
+       public int numRegisteredJobMetricGroups() {
+               // the map is a plain HashMap modified under this monitor, so read it under the same lock
+               synchronized (this) {
+                       return jobs.size();
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Component Metric Group Specifics
+       // ------------------------------------------------------------------------
+
+       /**
+        * Returns the registered job metric groups as the sub-components of this group.
+        * The returned collection is the live value view of the job map, not a copy.
+        */
+       @Override
+       protected Iterable<? extends ComponentMetricGroup> subComponents() {
+               return jobs.values();
+       }
+}
+

Reply via email to