http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
deleted file mode 100644
index 5d8cc5f..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
-
-/**
- * A special {@link MetricGroup} that does not register any metrics at the metrics registry
- * and any reporters.
- */
-@Internal
-public class UnregisteredMetricsGroup implements MetricGroup {
-
-       @Override
-       public Counter counter(int name) {
-               return new SimpleCounter();
-       }
-
-       @Override
-       public Counter counter(String name) {
-               return new SimpleCounter();
-       }
-
-       @Override
-       public <C extends Counter> C counter(int name, C counter) {
-               return counter;
-       }
-
-       @Override
-       public <C extends Counter> C counter(String name, C counter) {
-               return counter;
-       }
-
-       @Override
-       public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
-               return gauge;
-       }
-
-       @Override
-       public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
-               return gauge;
-       }
-
-       @Override
-       public <H extends Histogram> H histogram(int name, H histogram) {
-               return histogram;
-       }
-
-       @Override
-       public <H extends Histogram> H histogram(String name, H histogram) {
-               return histogram;
-       }
-
-       @Override
-       public MetricGroup addGroup(int name) {
-               return addGroup(String.valueOf(name));
-       }
-
-       @Override
-       public MetricGroup addGroup(String name) {
-               return new UnregisteredMetricsGroup();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
deleted file mode 100644
index 9297332..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups.scope;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.CharacterFilter;
-import org.apache.flink.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.metrics.groups.JobMetricGroup;
-import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
-import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
-import org.apache.flink.util.AbstractID;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * This class represents the format after which the "scope" (or namespace) of the various
- * component metric groups is built. Component metric groups
- * ({@link org.apache.flink.metrics.groups.ComponentMetricGroup}), are for example
- * "TaskManager", "Task", or "Operator".
- *
- * <p>User defined scope formats allow users to include or exclude
- * certain identifiers from the scope. The scope for metrics belonging to the "Task"
- * group could for example include the task attempt number (more fine grained identification), or
- * exclude it (continuity of the namespace across failure and recovery).
- */
-public abstract class ScopeFormat {
-
-       private static CharacterFilter defaultFilter = new CharacterFilter() {
-               @Override
-               public String filterCharacters(String input) {
-                       return input;
-               }
-       };
-
-       // ------------------------------------------------------------------------
-       //  Scope Format Special Characters
-       // ------------------------------------------------------------------------
-
-       /**
-        * If the scope format starts with this character, then the parent components scope
-        * format will be used as a prefix.
-        * 
-        * <p>For example, if the {@link JobMetricGroup} format is {@code "*.<job_name>"}, and the
-        * {@link TaskManagerMetricGroup} format is {@code "<host>"}, then the job's metrics
-        * will have {@code "<host>.<job_name>"} as their scope.
-        */
-       public static final String SCOPE_INHERIT_PARENT = "*";
-
-       public static final String SCOPE_SEPARATOR = ".";
-
-       private static final String SCOPE_VARIABLE_PREFIX = "<";
-       private static final String SCOPE_VARIABLE_SUFFIX = ">";
-
-       // ------------------------------------------------------------------------
-       //  Scope Variables
-       // ------------------------------------------------------------------------
-
-       public static final String SCOPE_ACTOR_HOST = asVariable("host");
-
-       // ----- Job Manager ----
-
-       /** The default scope format of the JobManager component: {@code "<host>.jobmanager"} */
-       public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT =
-               concat(SCOPE_ACTOR_HOST, "jobmanager");
-
-       /** The default scope format of JobManager metrics: {@code "<host>.jobmanager"} */
-       public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = 
DEFAULT_SCOPE_JOBMANAGER_COMPONENT;
-
-       // ----- Task Manager ----
-
-       public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
-
-       /** The default scope format of the TaskManager component: {@code "<host>.taskmanager.<tm_id>"} */
-       public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT =
-                       concat(SCOPE_ACTOR_HOST, "taskmanager", 
SCOPE_TASKMANAGER_ID);
-
-       /** The default scope format of TaskManager metrics: {@code "<host>.taskmanager.<tm_id>"} */
-       public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = 
DEFAULT_SCOPE_TASKMANAGER_COMPONENT;
-
-       // ----- Job -----
-
-       public static final String SCOPE_JOB_ID = asVariable("job_id");
-       public static final String SCOPE_JOB_NAME = asVariable("job_name");
-
-       /** The default scope format for the job component: {@code "<job_name>"} */
-       public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME;
-
-       // ----- Job on Job Manager ----
-
-       /** The default scope format for all job metrics on a jobmanager: {@code "<host>.jobmanager.<job_name>"} */
-       public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP =
-               concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, 
DEFAULT_SCOPE_JOB_COMPONENT);
-
-       // ----- Job on Task Manager ----
-
-       /** The default scope format for all job metrics on a taskmanager: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */
-       public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP =
-                       concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, 
DEFAULT_SCOPE_JOB_COMPONENT);
-
-       // ----- Task ----
-
-       public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id");
-       public static final String SCOPE_TASK_NAME = asVariable("task_name");
-       public static final String SCOPE_TASK_ATTEMPT_ID = 
asVariable("task_attempt_id");
-       public static final String SCOPE_TASK_ATTEMPT_NUM = 
asVariable("task_attempt_num");
-       public static final String SCOPE_TASK_SUBTASK_INDEX = 
asVariable("subtask_index");
-
-       /** Default scope of the task component: {@code "<task_name>.<subtask_index>"} */
-       public static final String DEFAULT_SCOPE_TASK_COMPONENT =
-                       concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX);
-
-       /** The default scope format for all task metrics:
-        * {@code "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"} */
-       public static final String DEFAULT_SCOPE_TASK_GROUP =
-                       concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, 
DEFAULT_SCOPE_TASK_COMPONENT);
-
-       // ----- Operator ----
-
-       public static final String SCOPE_OPERATOR_NAME = 
asVariable("operator_name");
-
-       /** The default scope added by the operator component: "<operator_name>.<subtask_index>" */
-       public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT =
-                       concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX);
-
-       /** The default scope format for all operator metrics:
-        * {@code "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */
-       public static final String DEFAULT_SCOPE_OPERATOR_GROUP =
-                       concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, 
DEFAULT_SCOPE_OPERATOR_COMPONENT);
-
-       // ------------------------------------------------------------------------
-       //  Formatters form the individual component types
-       // ------------------------------------------------------------------------
-
-       /**
-        * The scope format for the {@link JobManagerMetricGroup}.
-        */
-       public static class JobManagerScopeFormat extends ScopeFormat {
-
-               public JobManagerScopeFormat(String format) {
-                       super(format, null, new String[] {
-                               SCOPE_ACTOR_HOST
-                       });
-               }
-
-               public String[] formatScope(String hostname) {
-                       final String[] template = copyTemplate();
-                       final String[] values = { hostname };
-                       return bindVariables(template, values);
-               }
-       }
-
-       /**
-        * The scope format for the {@link TaskManagerMetricGroup}.
-        */
-       public static class TaskManagerScopeFormat extends ScopeFormat {
-
-               public TaskManagerScopeFormat(String format) {
-                       super(format, null, new String[] {
-                                       SCOPE_ACTOR_HOST,
-                                       SCOPE_TASKMANAGER_ID
-                       });
-               }
-
-               public String[] formatScope(String hostname, String 
taskManagerId) {
-                       final String[] template = copyTemplate();
-                       final String[] values = { hostname, taskManagerId };
-                       return bindVariables(template, values);
-               }
-       }
-
-       // ------------------------------------------------------------------------
-
-       /**
-        * The scope format for the {@link JobMetricGroup}.
-        */
-       public static class JobManagerJobScopeFormat extends ScopeFormat {
-
-               public JobManagerJobScopeFormat(String format, 
JobManagerScopeFormat parentFormat) {
-                       super(format, parentFormat, new String[] {
-                               SCOPE_ACTOR_HOST,
-                               SCOPE_JOB_ID,
-                               SCOPE_JOB_NAME
-                       });
-               }
-
-               public String[] formatScope(JobManagerMetricGroup parent, JobID 
jid, String jobName) {
-                       final String[] template = copyTemplate();
-                       final String[] values = {
-                               parent.hostname(),
-                               valueOrNull(jid),
-                               valueOrNull(jobName)
-                       };
-                       return bindVariables(template, values);
-               }
-       }
-
-       /**
-        * The scope format for the {@link JobMetricGroup}.
-        */
-       public static class TaskManagerJobScopeFormat extends ScopeFormat {
-
-               public TaskManagerJobScopeFormat(String format, 
TaskManagerScopeFormat parentFormat) {
-                       super(format, parentFormat, new String[] {
-                                       SCOPE_ACTOR_HOST,
-                                       SCOPE_TASKMANAGER_ID,
-                                       SCOPE_JOB_ID,
-                                       SCOPE_JOB_NAME
-                       });
-               }
-
-               public String[] formatScope(TaskManagerMetricGroup parent, 
JobID jid, String jobName) {
-                       final String[] template = copyTemplate();
-                       final String[] values = {
-                                       parent.hostname(),
-                                       parent.taskManagerId(),
-                                       valueOrNull(jid),
-                                       valueOrNull(jobName)
-                       };
-                       return bindVariables(template, values);
-               }
-       }
-
-       // ------------------------------------------------------------------------
-
-       /**
-        * The scope format for the {@link TaskMetricGroup}.
-        */
-       public static class TaskScopeFormat extends ScopeFormat {
-
-               public TaskScopeFormat(String format, TaskManagerJobScopeFormat 
parentFormat) {
-                       super(format, parentFormat, new String[] {
-                                       SCOPE_ACTOR_HOST,
-                                       SCOPE_TASKMANAGER_ID,
-                                       SCOPE_JOB_ID,
-                                       SCOPE_JOB_NAME,
-                                       SCOPE_TASK_VERTEX_ID,
-                                       SCOPE_TASK_ATTEMPT_ID,
-                                       SCOPE_TASK_NAME,
-                                       SCOPE_TASK_SUBTASK_INDEX,
-                                       SCOPE_TASK_ATTEMPT_NUM
-                       });
-               }
-
-               public String[] formatScope(
-                               TaskManagerJobMetricGroup parent,
-                               AbstractID vertexId, AbstractID attemptId,
-                               String taskName, int subtask, int 
attemptNumber) {
-
-                       final String[] template = copyTemplate();
-                       final String[] values = {
-                                       parent.parent().hostname(),
-                                       parent.parent().taskManagerId(),
-                                       valueOrNull(parent.jobId()),
-                                       valueOrNull(parent.jobName()),
-                                       valueOrNull(vertexId),
-                                       valueOrNull(attemptId),
-                                       valueOrNull(taskName),
-                                       String.valueOf(subtask),
-                                       String.valueOf(attemptNumber)
-                       };
-                       return bindVariables(template, values);
-               }
-       }
-
-       // ------------------------------------------------------------------------
-
-       /**
-        * The scope format for the {@link org.apache.flink.metrics.groups.OperatorMetricGroup}.
-        */
-       public static class OperatorScopeFormat extends ScopeFormat {
-
-               public OperatorScopeFormat(String format, TaskScopeFormat 
parentFormat) {
-                       super(format, parentFormat, new String[] {
-                                       SCOPE_ACTOR_HOST,
-                                       SCOPE_TASKMANAGER_ID,
-                                       SCOPE_JOB_ID,
-                                       SCOPE_JOB_NAME,
-                                       SCOPE_TASK_VERTEX_ID,
-                                       SCOPE_TASK_ATTEMPT_ID,
-                                       SCOPE_TASK_NAME,
-                                       SCOPE_TASK_SUBTASK_INDEX,
-                                       SCOPE_TASK_ATTEMPT_NUM,
-                                       SCOPE_OPERATOR_NAME
-                       });
-               }
-
-               public String[] formatScope(TaskMetricGroup parent, String 
operatorName) {
-
-                       final String[] template = copyTemplate();
-                       final String[] values = {
-                                       parent.parent().parent().hostname(),
-                                       
parent.parent().parent().taskManagerId(),
-                                       valueOrNull(parent.parent().jobId()),
-                                       valueOrNull(parent.parent().jobName()),
-                                       valueOrNull(parent.vertexId()),
-                                       valueOrNull(parent.executionId()),
-                                       valueOrNull(parent.taskName()),
-                                       String.valueOf(parent.subtaskIndex()),
-                                       String.valueOf(parent.attemptNumber()),
-                                       valueOrNull(operatorName)
-                       };
-                       return bindVariables(template, values);
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       //  Scope Format Base
-       // ------------------------------------------------------------------------
-
-       /** The scope format */
-       private final String format;
-
-       /** The format, split into components */
-       private final String[] template;
-
-       private final int[] templatePos;
-
-       private final int[] valuePos;
-
-       // ------------------------------------------------------------------------
-
-       protected ScopeFormat(String format, ScopeFormat parent, String[] 
variables) {
-               checkNotNull(format, "format is null");
-
-               final String[] rawComponents = format.split("\\" + 
SCOPE_SEPARATOR);
-
-               // compute the template array
-               final boolean parentAsPrefix = rawComponents.length > 0 && 
rawComponents[0].equals(SCOPE_INHERIT_PARENT);
-               if (parentAsPrefix) {
-                       if (parent == null) {
-                               throw new IllegalArgumentException("Component 
scope format requires parent prefix (starts with '"
-                                       + SCOPE_INHERIT_PARENT + "'), but this 
component has no parent (is root component).");
-                       }
-
-                       this.format = format.length() > 2 ? format.substring(2) 
: "<empty>";
-
-                       String[] parentTemplate = parent.template;
-                       int parentLen = parentTemplate.length;
-                       
-                       this.template = new String[parentLen + 
rawComponents.length - 1];
-                       System.arraycopy(parentTemplate, 0, this.template, 0, 
parentLen);
-                       System.arraycopy(rawComponents, 1, this.template, 
parentLen, rawComponents.length - 1);
-               }
-               else {
-                       this.format = format.isEmpty() ? "<empty>" : format;
-                       this.template = rawComponents;
-               }
-
-               // --- compute the replacement matrix ---
-               // a bit of clumsy Java collections code ;-)
-               
-               HashMap<String, Integer> varToValuePos = arrayToMap(variables);
-               List<Integer> templatePos = new ArrayList<>();
-               List<Integer> valuePos = new ArrayList<>();
-
-               for (int i = 0; i < template.length; i++) {
-                       final String component = template[i];
-                       
-                       // check if that is a variable
-                       if (component != null && component.length() >= 3 &&
-                                       component.charAt(0) == '<' && 
component.charAt(component.length() - 1) == '>') {
-
-                               // this is a variable
-                               Integer replacementPos = 
varToValuePos.get(component);
-                               if (replacementPos != null) {
-                                       templatePos.add(i);
-                                       valuePos.add(replacementPos);
-                               }
-                       }
-               }
-
-               this.templatePos = integerListToArray(templatePos);
-               this.valuePos = integerListToArray(valuePos);
-       }
-
-       // ------------------------------------------------------------------------
-
-       public String format() {
-               return format;
-       }
-
-       protected final String[] copyTemplate() {
-               String[] copy = new String[template.length];
-               System.arraycopy(template, 0, copy, 0, template.length);
-               return copy;
-       }
-
-       protected final String[] bindVariables(String[] template, String[] 
values) {
-               final int len = templatePos.length;
-               for (int i = 0; i < len; i++) {
-                       template[templatePos[i]] = values[valuePos[i]];
-               }
-               return template;
-       }
-
-       // ------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "ScopeFormat '" + format + '\'';
-       }
-       
-       // ------------------------------------------------------------------------
-       //  Utilities
-       // ------------------------------------------------------------------------
-
-       /**
-        * Formats the given string to resemble a scope variable.
-        *
-        * @param scope The string to format
-        * @return The formatted string
-        */
-       public static String asVariable(String scope) {
-               return SCOPE_VARIABLE_PREFIX + scope + SCOPE_VARIABLE_SUFFIX;
-       }
-
-       public static String concat(String... components) {
-               return concat(defaultFilter, '.', components);
-       }
-
-       public static String concat(CharacterFilter filter, String... 
components) {
-               return concat(filter, '.', components);
-       }
-
-       public static String concat(Character delimiter, String... components) {
-               return concat(defaultFilter, delimiter, components);
-       }
-
-       /**
-        * Concatenates the given component names separated by the delimiter character. Additionally
-        * the character filter is applied to all component names.
-        *
-        * @param filter Character filter to be applied to the component names
-        * @param delimiter Delimiter to separate component names
-        * @param components Array of component names
-        * @return The concatenated component name
-        */
-       public static String concat(CharacterFilter filter, Character 
delimiter, String... components) {
-               StringBuilder sb = new StringBuilder();
-               sb.append(filter.filterCharacters(components[0]));
-               for (int x = 1; x < components.length; x++) {
-                       sb.append(delimiter);
-                       sb.append(filter.filterCharacters(components[x]));
-               }
-               return sb.toString();
-       }
-       
-       static String valueOrNull(Object value) {
-               return (value == null || (value instanceof String && ((String) 
value).isEmpty())) ?
-                               "null" : value.toString();
-       }
-
-       static HashMap<String, Integer> arrayToMap(String[] array) {
-               HashMap<String, Integer> map = new HashMap<>(array.length);
-               for (int i = 0; i < array.length; i++) {
-                       map.put(array[i], i);
-               }
-               return map;
-       }
-
-       private static int[] integerListToArray(List<Integer> list) {
-               int[] array = new int[list.size()];
-               int pos = 0;
-               for (Integer i : list) {
-                       array[pos++] = i;
-               }
-               return array;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
deleted file mode 100644
index 978e761..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups.scope;
-
-import org.apache.flink.annotation.Internal;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A container for component scope formats.
- */
-@Internal
-public class ScopeFormats {
-
-       private final JobManagerScopeFormat jobManagerFormat;
-       private final JobManagerJobScopeFormat jobManagerJobFormat;
-       private final TaskManagerScopeFormat taskManagerFormat;
-       private final TaskManagerJobScopeFormat taskManagerJobFormat;
-       private final TaskScopeFormat taskFormat;
-       private final OperatorScopeFormat operatorFormat;
-
-       // ------------------------------------------------------------------------
-
-       /**
-        * Creates all default scope formats.
-        */
-       public ScopeFormats() {
-               this.jobManagerFormat = new 
JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT);
-
-               this.jobManagerJobFormat = new JobManagerJobScopeFormat(
-                       ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, 
this.jobManagerFormat);
-
-               this.taskManagerFormat = new 
TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT);
-
-               this.taskManagerJobFormat = new TaskManagerJobScopeFormat(
-                               
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat);
-               
-               this.taskFormat = new TaskScopeFormat(
-                               ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, 
this.taskManagerJobFormat);
-               
-               this.operatorFormat = new OperatorScopeFormat(
-                               ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, 
this.taskFormat);
-       }
-
-       /**
-        * Creates all scope formats, based on the given scope format strings.
-        */
-       public ScopeFormats(
-                       String jobManagerFormat,
-                       String jobManagerJobFormat,
-                       String taskManagerFormat,
-                       String taskManagerJobFormat,
-                       String taskFormat,
-                       String operatorFormat)
-       {
-               this.jobManagerFormat = new 
JobManagerScopeFormat(jobManagerFormat);
-               this.jobManagerJobFormat = new 
JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat);
-               this.taskManagerFormat = new 
TaskManagerScopeFormat(taskManagerFormat);
-               this.taskManagerJobFormat = new 
TaskManagerJobScopeFormat(taskManagerJobFormat, this.taskManagerFormat);
-               this.taskFormat = new TaskScopeFormat(taskFormat, 
this.taskManagerJobFormat);
-               this.operatorFormat = new OperatorScopeFormat(operatorFormat, 
this.taskFormat);
-       }
-
-       /**
-        * Creates a {@code ScopeFormats} with the given scope formats.
-        */
-       public ScopeFormats(
-                       JobManagerScopeFormat jobManagerFormat,
-                       JobManagerJobScopeFormat jobManagerJobFormat,
-                       TaskManagerScopeFormat taskManagerFormat,
-                       TaskManagerJobScopeFormat taskManagerJobFormat,
-                       TaskScopeFormat taskFormat,
-                       OperatorScopeFormat operatorFormat)
-       {
-               this.jobManagerFormat = checkNotNull(jobManagerFormat);
-               this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat);
-               this.taskManagerFormat = checkNotNull(taskManagerFormat);
-               this.taskManagerJobFormat = checkNotNull(taskManagerJobFormat);
-               this.taskFormat = checkNotNull(taskFormat);
-               this.operatorFormat = checkNotNull(operatorFormat);
-       }
-
-       // ------------------------------------------------------------------------
-
-       public JobManagerScopeFormat getJobManagerFormat() {
-               return this.jobManagerFormat;
-       }
-
-       public TaskManagerScopeFormat getTaskManagerFormat() {
-               return this.taskManagerFormat;
-       }
-
-       public TaskManagerJobScopeFormat getTaskManagerJobFormat() {
-               return this.taskManagerJobFormat;
-       }
-
-       public JobManagerJobScopeFormat getJobManagerJobFormat() {
-               return this.jobManagerJobFormat;
-       }
-
-       public TaskScopeFormat getTaskFormat() {
-               return this.taskFormat;
-       }
-
-       public OperatorScopeFormat getOperatorFormat() {
-               return this.operatorFormat;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
deleted file mode 100644
index f96901b..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.reporter;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.metrics.CharacterFilter;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Base interface for custom metric reporters.
- */
-@PublicEvolving
-public abstract class AbstractReporter implements MetricReporter, 
CharacterFilter {
-       protected final Logger log = LoggerFactory.getLogger(getClass());
-
-       protected final Map<Gauge<?>, String> gauges = new HashMap<>();
-       protected final Map<Counter, String> counters = new HashMap<>();
-       protected final Map<Histogram, String> histograms = new HashMap<>();
-
-       @Override
-       public void notifyOfAddedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               final String name = group.getMetricIdentifier(metricName, this);
-
-               synchronized (this) {
-                       if (metric instanceof Counter) {
-                               counters.put((Counter) metric, name);
-                       } else if (metric instanceof Gauge) {
-                               gauges.put((Gauge<?>) metric, name);
-                       } else if (metric instanceof Histogram) {
-                               histograms.put((Histogram) metric, name);
-                       } else {
-                               log.warn("Cannot add unknown metric type {}. 
This indicates that the reporter " +
-                                       "does not support this metric type.", 
metric.getClass().getName());
-                       }
-               }
-       }
-
-       @Override
-       public void notifyOfRemovedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               synchronized (this) {
-                       if (metric instanceof Counter) {
-                               counters.remove(metric);
-                       } else if (metric instanceof Gauge) {
-                               gauges.remove(metric);
-                       } else if (metric instanceof Histogram) {
-                               histograms.remove(metric);
-                       } else {
-                               log.warn("Cannot remove unknown metric type {}. 
This indicates that the reporter " +
-                                       "does not support this metric type.", 
metric.getClass().getName());
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
deleted file mode 100644
index d84cd5e..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.reporter;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXConnectorServerFactory;
-import javax.management.remote.JMXServiceURL;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.MalformedURLException;
-import java.rmi.NoSuchObjectException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
- *
- * Largely based on the JmxReporter class of the dropwizard metrics library
- * 
https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java
- */
-@Internal
-public class JMXReporter implements MetricReporter {
-
-       private static final String PREFIX = "org.apache.flink.metrics:";
-       private static final String KEY_PREFIX = "key";
-
-       public static final String ARG_PORT = "port";
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(JMXReporter.class);
-
-       // 
------------------------------------------------------------------------
-
-       /** The server where the management beans are registered and 
deregistered */
-       private final MBeanServer mBeanServer;
-
-       /** The names under which the registered metrics have been added to the 
MBeanServer */ 
-       private final Map<Metric, ObjectName> registeredMetrics;
-
-       /** The server to which JMX clients connect to. ALlows for better 
control over port usage. */
-       private JMXServer jmxServer;
-
-       /**
-        * Creates a new JMXReporter
-        */
-       public JMXReporter() {
-               this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
-               this.registeredMetrics = new HashMap<>();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  life cycle
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void open(Configuration config) {
-               String portsConfig = config.getString(ARG_PORT, null);
-
-               if (portsConfig != null) {
-                       Iterator<Integer> ports = 
NetUtils.getPortRangeFromString(portsConfig);
-
-                       JMXServer server = new JMXServer();
-                       while (ports.hasNext()) {
-                               int port = ports.next();
-                               try {
-                                       server.start(port);
-                                       LOG.info("Started JMX server on port " 
+ port + ".");
-                                       // only set our field if the server was 
actually started
-                                       jmxServer = server;
-                                       break;
-                               } catch (IOException ioe) { //assume port 
conflict
-                                       LOG.debug("Could not start JMX server 
on port " + port + ".", ioe);
-                                       try {
-                                               server.stop();
-                                       } catch (Exception e) {
-                                               LOG.debug("Could not stop JMX 
server.", e);
-                                       }
-                               }
-                       }
-                       if (jmxServer == null) {
-                               throw new RuntimeException("Could not start JMX 
server on any configured port. Ports: " + portsConfig);
-                       }
-               }
-       }
-
-       @Override
-       public void close() {
-               if (jmxServer != null) {
-                       try {
-                               jmxServer.stop();
-                       } catch (IOException e) {
-                               LOG.error("Failed to stop JMX server.", e);
-                       }
-               }
-       }
-       
-       public int getPort() {
-               if (jmxServer == null) {
-                       throw new NullPointerException("No server was opened. 
Did you specify a port?");
-               }
-               return jmxServer.port;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  adding / removing metrics
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void notifyOfAddedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               final String name = generateJmxName(metricName, 
group.getScopeComponents());
-
-               AbstractBean jmxMetric;
-               ObjectName jmxName;
-               try {
-                       jmxName = new ObjectName(name);
-               } catch (MalformedObjectNameException e) {
-                       LOG.error("Metric name did not conform to JMX 
ObjectName rules: " + name, e);
-                       return;
-               }
-
-               if (metric instanceof Gauge) {
-                       jmxMetric = new JmxGauge((Gauge<?>) metric);
-               } else if (metric instanceof Counter) {
-                       jmxMetric = new JmxCounter((Counter) metric);
-               } else if (metric instanceof Histogram) {
-                       jmxMetric = new JmxHistogram((Histogram) metric);
-               } else {
-                       LOG.error("Cannot add unknown metric type: {}. This 
indicates that the metric type " +
-                               "is not supported by this reporter.", 
metric.getClass().getName());
-                       return;
-               }
-
-               try {
-                       synchronized (this) {
-                               mBeanServer.registerMBean(jmxMetric, jmxName);
-                               registeredMetrics.put(metric, jmxName);
-                       }
-               } catch (NotCompliantMBeanException e) {
-                       // implementation error on our side
-                       LOG.error("Metric did not comply with JMX MBean naming 
rules.", e);
-               } catch (InstanceAlreadyExistsException e) {
-                       LOG.debug("A metric with the name " + jmxName + " was 
already registered.", e);
-                       LOG.error("A metric with the name " + jmxName + " was 
already registered.");
-               } catch (Throwable t) {
-                       LOG.error("Failed to register metric", t);
-               }
-       }
-
-       @Override
-       public void notifyOfRemovedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               try {
-                       synchronized (this) {
-                               final ObjectName jmxName = 
registeredMetrics.remove(metric);
-
-                               // remove the metric if it is known. if it is 
not known, ignore the request
-                               if (jmxName != null) {
-                                       mBeanServer.unregisterMBean(jmxName);
-                               }
-                       }
-               } catch (InstanceNotFoundException e) {
-                       // alright then
-               } catch (Throwable t) {
-                       // never propagate exceptions - the metrics reporter 
should not affect the stability
-                       // of the running system
-                       LOG.error("Un-registering metric failed", t);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities 
-       // 
------------------------------------------------------------------------
-
-       static String generateJmxName(String metricName, String[] 
scopeComponents) {
-               final StringBuilder nameBuilder = new StringBuilder(128);
-               nameBuilder.append(PREFIX);
-
-               for (int x = 0; x < scopeComponents.length; x++) {
-                       // write keyX=
-                       nameBuilder.append(KEY_PREFIX);
-                       nameBuilder.append(x);
-                       nameBuilder.append("=");
-
-                       // write scope component
-                       
nameBuilder.append(replaceInvalidChars(scopeComponents[x]));
-                       nameBuilder.append(",");
-               }
-
-               // write the name
-               
nameBuilder.append("name=").append(replaceInvalidChars(metricName));
-
-               return nameBuilder.toString();
-       }
-       
-       /**
-        * Lightweight method to replace unsupported characters.
-        * If the string does not contain any unsupported characters, this 
method creates no
-        * new string (and in fact no new objects at all).
-        * 
-        * <p>Replacements:
-        * 
-        * <ul>
-        *     <li>{@code "} is removed</li>
-        *     <li>{@code space} is replaced by {@code _} (underscore)</li>
-        *     <li>{@code , = ; : ? ' *} are replaced by {@code -} (hyphen)</li>
-        * </ul>
-        */
-       static String replaceInvalidChars(String str) {
-               char[] chars = null;
-               final int strLen = str.length();
-               int pos = 0;
-               
-               for (int i = 0; i < strLen; i++) {
-                       final char c = str.charAt(i);
-                       switch (c) {
-                               case '"':
-                                       // remove character by not moving cursor
-                                       if (chars == null) {
-                                               chars = str.toCharArray();
-                                       }
-                                       break;
-
-                               case ' ':
-                                       if (chars == null) {
-                                               chars = str.toCharArray();
-                                       }
-                                       chars[pos++] = '_';
-                                       break;
-                               
-                               case ',':
-                               case '=':
-                               case ';':
-                               case ':':
-                               case '?':
-                               case '\'':
-                               case '*':
-                                       if (chars == null) {
-                                               chars = str.toCharArray();
-                                       }
-                                       chars[pos++] = '-';
-                                       break;
-
-                               default:
-                                       if (chars != null) {
-                                               chars[pos] = c;
-                                       }
-                                       pos++;
-                       }
-               }
-               
-               return chars == null ? str : new String(chars, 0, pos);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Interfaces and base classes for JMX beans 
-       // 
------------------------------------------------------------------------
-
-       public interface MetricMBean {}
-
-       private abstract static class AbstractBean implements MetricMBean {}
-
-       public interface JmxCounterMBean extends MetricMBean {
-               long getCount();
-       }
-
-       private static class JmxCounter extends AbstractBean implements 
JmxCounterMBean {
-               private Counter counter;
-
-               JmxCounter(Counter counter) {
-                       this.counter = counter;
-               }
-
-               @Override
-               public long getCount() {
-                       return counter.getCount();
-               }
-       }
-
-       public interface JmxGaugeMBean extends MetricMBean {
-               Object getValue();
-       }
-
-       private static class JmxGauge extends AbstractBean implements 
JmxGaugeMBean {
-
-               private final Gauge<?> gauge;
-
-               JmxGauge(Gauge<?> gauge) {
-                       this.gauge = gauge;
-               }
-
-               @Override
-               public Object getValue() {
-                       return gauge.getValue();
-               }
-       }
-
-       public interface JmxHistogramMBean extends MetricMBean {
-               long getCount();
-
-               double getMean();
-
-               double getStdDev();
-
-               long getMax();
-
-               long getMin();
-
-               double getMedian();
-
-               double get75thPercentile();
-
-               double get95thPercentile();
-
-               double get98thPercentile();
-
-               double get99thPercentile();
-
-               double get999thPercentile();
-       }
-
-       private static class JmxHistogram extends AbstractBean implements 
JmxHistogramMBean {
-
-               private final Histogram histogram;
-
-               JmxHistogram(Histogram histogram) {
-                       this.histogram = histogram;
-               }
-
-               @Override
-               public long getCount() {
-                       return histogram.getCount();
-               }
-
-               @Override
-               public double getMean() {
-                       return histogram.getStatistics().getMean();
-               }
-
-               @Override
-               public double getStdDev() {
-                       return histogram.getStatistics().getStdDev();
-               }
-
-               @Override
-               public long getMax() {
-                       return histogram.getStatistics().getMax();
-               }
-
-               @Override
-               public long getMin() {
-                       return histogram.getStatistics().getMin();
-               }
-
-               @Override
-               public double getMedian() {
-                       return histogram.getStatistics().getQuantile(0.5);
-               }
-
-               @Override
-               public double get75thPercentile() {
-                       return histogram.getStatistics().getQuantile(0.75);
-               }
-
-               @Override
-               public double get95thPercentile() {
-                       return histogram.getStatistics().getQuantile(0.95);
-               }
-
-               @Override
-               public double get98thPercentile() {
-                       return histogram.getStatistics().getQuantile(0.98);
-               }
-
-               @Override
-               public double get99thPercentile() {
-                       return histogram.getStatistics().getQuantile(0.99);
-               }
-
-               @Override
-               public double get999thPercentile() {
-                       return histogram.getStatistics().getQuantile(0.999);
-               }
-       }
-
-       /**
-        * JMX Server implementation that JMX clients can connect to.
-        *
-        * Heavily based on j256 simplejmx project
-        *
-        * 
https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
-        */
-       private static class JMXServer {
-               private Registry rmiRegistry;
-               private JMXConnectorServer connector;
-               private int port;
-
-               public void start(int port) throws IOException {
-                       if (rmiRegistry != null && connector != null) {
-                               LOG.debug("JMXServer is already running.");
-                               return;
-                       }
-                       startRmiRegistry(port);
-                       startJmxService(port);
-                       this.port = port;
-               }
-
-               /**
-                * Starts an RMI Registry that allows clients to lookup the JMX 
IP/port.
-                *
-                * @param port rmi port to use
-                * @throws IOException
-                */
-               private void startRmiRegistry(int port) throws IOException {
-                       rmiRegistry = LocateRegistry.createRegistry(port);
-               }
-
-               /**
-                * Starts a JMX connector that allows (un)registering MBeans 
with the MBean server and RMI invocations.
-                *
-                * @param port jmx port to use
-                * @throws IOException
-                */
-               private void startJmxService(int port) throws IOException {
-                       String serviceUrl = "service:jmx:rmi://localhost:" + 
port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
-                       JMXServiceURL url;
-                       try {
-                               url = new JMXServiceURL(serviceUrl);
-                       } catch (MalformedURLException e) {
-                               throw new IllegalArgumentException("Malformed 
service url created " + serviceUrl, e);
-                       }
-
-                       connector = 
JMXConnectorServerFactory.newJMXConnectorServer(url, null, 
ManagementFactory.getPlatformMBeanServer());
-
-                       connector.start();
-               }
-
-               public void stop() throws IOException {
-                       if (connector != null) {
-                               try {
-                                       connector.stop();
-                               } finally {
-                                       connector = null;
-                               }
-                       }
-                       if (rmiRegistry != null) {
-                               try {
-                                       
UnicastRemoteObject.unexportObject(rmiRegistry, true);
-                               } catch (NoSuchObjectException e) {
-                                       throw new IOException("Could not 
un-export our RMI registry", e);
-                               } finally {
-                                       rmiRegistry = null;
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
deleted file mode 100644
index 9458246..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.reporter;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-
-/**
- * Reporters are used to export {@link Metric Metrics} to an external backend.
- * 
- * <p>Reporters are instantiated via reflection and must be public, 
non-abstract, and have a
- * public no-argument constructor.
- */
-@PublicEvolving
-public interface MetricReporter {
-
-       // 
------------------------------------------------------------------------
-       //  life cycle
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Configures this reporter. Since reporters are instantiated 
generically and hence parameter-less,
-        * this method is the place where the reporters set their basic fields 
based on configuration values.
-        * 
-        * <p>This method is always called first on a newly instantiated 
reporter.
-        *
-        * @param config The configuration with all parameters.
-        */
-       void open(Configuration config);
-
-       /**
-        * Closes this reporter. Should be used to close channels, streams and 
release resources.
-        */
-       void close();
-
-       // 
------------------------------------------------------------------------
-       //  adding / removing metrics
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Called when a new {@link Metric} was added.
-        *
-        * @param metric      the metric that was added
-        * @param metricName  the name of the metric
-        * @param group       the group that contains the metric
-        */
-       void notifyOfAddedMetric(Metric metric, String metricName, 
AbstractMetricGroup group);
-
-       /**
-        * Called when a {@link Metric} was should be removed.
-        *
-        * @param metric      the metric that should be removed
-        * @param metricName  the name of the metric
-        * @param group       the group that contains the metric
-        */
-       void notifyOfRemovedMetric(Metric metric, String metricName, 
AbstractMetricGroup group);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
deleted file mode 100644
index cf1fc52..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.reporter;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * Interface for reporters that actively send out data periodically.
- */
-@PublicEvolving
-public interface Scheduled {
-
-       /**
-        * Report the current measurements. This method is called periodically 
by the
-        * metrics registry that uses the reoprter.
-        */
-       void report();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java 
b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
deleted file mode 100644
index 8dec4f6..0000000
--- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.metrics.groups.scope.ScopeFormats;
-import org.apache.flink.metrics.reporter.Scheduled;
-import org.apache.flink.metrics.util.TestReporter;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class MetricRegistryTest extends TestLogger {
-       
-       /**
-        * Verifies that the reporter class argument is correctly used to 
instantiate and open the reporter.
-        */
-       @Test
-       public void testReporterInstantiation() {
-               Configuration config = new Configuration();
-
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter1.class.getName());
-
-               new MetricRegistry(config);
-
-               Assert.assertTrue(TestReporter1.wasOpened);
-       }
-
-       protected static class TestReporter1 extends TestReporter {
-               public static boolean wasOpened = false;
-
-               @Override
-               public void open(Configuration config) {
-                       wasOpened = true;
-               }
-       }
-
-       /**
-        * Verifies that configured arguments are properly forwarded to the 
reporter.
-        */
-       @Test
-       public void testReporterArgumentForwarding() {
-               Configuration config = new Configuration();
-
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter2.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, 
"--arg1 hello --arg2 world");
-
-               new MetricRegistry(config);
-       }
-
-       protected static class TestReporter2 extends TestReporter {
-               @Override
-               public void open(Configuration config) {
-                       Assert.assertEquals("hello", config.getString("arg1", 
null));
-                       Assert.assertEquals("world", config.getString("arg2", 
null));
-               }
-       }
-
-       /**
-        * Verifies that reporters implementing the Scheduled interface are 
regularly called to report the metrics.
-        *
-        * @throws InterruptedException
-        */
-       @Test
-       public void testReporterScheduling() throws InterruptedException {
-               Configuration config = new Configuration();
-
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter3.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, "50 
MILLISECONDS");
-
-               new MetricRegistry(config);
-
-               long start = System.currentTimeMillis();
-               for (int x = 0; x < 10; x++) {
-                       Thread.sleep(100);
-                       int reportCount = TestReporter3.reportCount;
-                       long curT = System.currentTimeMillis();
-                       /**
-                        * Within a given time-frame T only T/50 reports may 
be triggered due to the interval between reports. 
-                        * This value however does not take the first 
triggered report into account (=> +1). 
-                        * Furthermore we have to account for the mis-alignment 
between reports being triggered and our time 
-                        * measurement (=> +1); for T=200 a total of 4-6 
reports may have been
-                        * triggered depending on whether the end of the 
interval for the first reports ends before
-                        * or after T=50.
-                        */
-                       long maxAllowedReports = (curT - start) / 50 + 2;
-                       Assert.assertTrue("Too many report were triggered.", 
maxAllowedReports >= reportCount);
-               }
-               Assert.assertTrue("No report was triggered.", 
TestReporter3.reportCount > 0);
-       }
-
-       protected static class TestReporter3 extends TestReporter implements 
Scheduled {
-               public static int reportCount = 0;
-
-               @Override
-               public void report() {
-                       reportCount++;
-               }
-       }
-
-       /**
-        * Verifies that reporters implementing the Listener interface are 
notified when Metrics are added or removed.
-        */
-       @Test
-       public void testListener() {
-               Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter6.class.getName());
-
-               MetricRegistry registry = new MetricRegistry(config);
-
-               TaskManagerMetricGroup root = new 
TaskManagerMetricGroup(registry, "host", "id");
-               root.counter("rootCounter");
-               root.close();
-
-               assertTrue(TestReporter6.addCalled);
-               assertTrue(TestReporter6.removeCalled);
-       }
-
-       protected static class TestReporter6 extends TestReporter {
-               public static boolean addCalled = false;
-               public static boolean removeCalled = false;
-
-               @Override
-               public void notifyOfAddedMetric(Metric metric, String 
metricName, AbstractMetricGroup group) {
-                       addCalled = true;
-                       assertTrue(metric instanceof Counter);
-                       assertEquals("rootCounter", metricName);
-               }
-
-               @Override
-               public void notifyOfRemovedMetric(Metric metric, String 
metricName, AbstractMetricGroup group) {
-                       removeCalled = true;
-                       Assert.assertTrue(metric instanceof Counter);
-                       Assert.assertEquals("rootCounter", metricName);
-               }
-       }
-
-       /**
-        * Verifies that the scope configuration is properly extracted.
-        */
-       @Test
-       public void testScopeConfig() {
-               Configuration config = new Configuration();
-
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A");
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
"B");
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"C");
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, 
"D");
-
-               ScopeFormats scopeConfig = 
MetricRegistry.createScopeConfig(config);
-
-               assertEquals("A", scopeConfig.getTaskManagerFormat().format());
-               assertEquals("B", 
scopeConfig.getTaskManagerJobFormat().format());
-               assertEquals("C", scopeConfig.getTaskFormat().format());
-               assertEquals("D", scopeConfig.getOperatorFormat().format());
-       }
-
-       @Test
-       public void testConfigurableDelimiter() {
-               Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B.C.D.E");
-
-               MetricRegistry registry = new MetricRegistry(config);
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "host", "id");
-               assertEquals("A_B_C_D_E_name", 
tmGroup.getMetricIdentifier("name"));
-
-               registry.shutdown();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java
deleted file mode 100644
index fb0af2e..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class JobManagerGroupTest {
-
-       // 
------------------------------------------------------------------------
-       //  adding and removing jobs
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void addAndRemoveJobs() {
-               final JobManagerMetricGroup group = new JobManagerMetricGroup(
-                       new MetricRegistry(new Configuration()), "localhost");
-
-               final JobID jid1 = new JobID();
-               final JobID jid2 = new JobID();
-
-               final String jobName1 = "testjob";
-               final String jobName2 = "anotherJob";
-
-               JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, 
jobName1);
-               JobManagerJobMetricGroup jmJobGroup12 = group.addJob(jid1, 
jobName1);
-               JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, 
jobName2);
-
-               assertEquals(jmJobGroup11, jmJobGroup12);
-
-               assertEquals(2, group.numRegisteredJobMetricGroups());
-
-               group.removeJob(jid1);
-
-               assertTrue(jmJobGroup11.isClosed());
-               assertEquals(1, group.numRegisteredJobMetricGroups());
-
-               group.removeJob(jid2);
-
-               assertTrue(jmJobGroup21.isClosed());
-               assertEquals(0, group.numRegisteredJobMetricGroups());
-       }
-
-       @Test
-       public void testCloseClosesAll() {
-               final JobManagerMetricGroup group = new JobManagerMetricGroup(
-                       new MetricRegistry(new Configuration()), "localhost");
-
-               final JobID jid1 = new JobID();
-               final JobID jid2 = new JobID();
-
-               final String jobName1 = "testjob";
-               final String jobName2 = "anotherJob";
-
-               JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, 
jobName1);
-               JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, 
jobName2);
-
-               group.close();
-
-               assertTrue(jmJobGroup11.isClosed());
-               assertTrue(jmJobGroup21.isClosed());
-       }
-
-       // 
------------------------------------------------------------------------
-       //  scope name tests
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void testGenerateScopeDefault() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
-
-               assertArrayEquals(new String[]{"localhost", "jobmanager"}, 
group.getScopeComponents());
-               assertEquals("localhost.jobmanager.name", 
group.getMetricIdentifier("name"));
-       }
-
-       @Test
-       public void testGenerateScopeCustom() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               JobManagerScopeFormat format = new 
JobManagerScopeFormat("constant.<host>.foo.<host>");
-               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, format, "host");
-
-               assertArrayEquals(new String[]{"constant", "host", "foo", 
"host"}, group.getScopeComponents());
-               assertEquals("constant.host.foo.host.name", 
group.getMetricIdentifier("name"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java
deleted file mode 100644
index dd2829c..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricRegistry;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class JobManagerJobGroupTest {
-
-       @Test
-       public void testGenerateScopeDefault() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
-               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, new JobID(), "myJobName");
-
-               assertArrayEquals(
-                               new String[] { "theHostName", "jobmanager", 
"myJobName"},
-                               jmGroup.getScopeComponents());
-
-               assertEquals(
-                               "theHostName.jobmanager.myJobName.name",
-                               jmGroup.getMetricIdentifier("name"));
-       }
-
-       @Test
-       public void testGenerateScopeCustom() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               JobManagerScopeFormat tmFormat = new 
JobManagerScopeFormat("abc");
-               JobManagerJobScopeFormat jmFormat = new 
JobManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
-
-               JobID jid = new JobID();
-
-               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
-               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jmFormat, jid, "myJobName");
-
-               assertArrayEquals(
-                               new String[] { "some-constant", "myJobName" },
-                               jmGroup.getScopeComponents());
-
-               assertEquals(
-                               "some-constant.myJobName.name",
-                               jmGroup.getMetricIdentifier("name"));
-       }
-
-       @Test
-       public void testGenerateScopeCustomWildcard() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               JobManagerScopeFormat tmFormat = new 
JobManagerScopeFormat("peter");
-               JobManagerJobScopeFormat jmFormat = new 
JobManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
-
-               JobID jid = new JobID();
-
-               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, tmFormat, "theHostName");
-               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jmFormat, jid, "myJobName");
-
-               assertArrayEquals(
-                               new String[] { "peter", "some-constant", 
jid.toString() },
-                               jmGroup.getScopeComponents());
-
-               assertEquals(
-                               "peter.some-constant." + jid + ".name",
-                               jmGroup.getMetricIdentifier("name"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
deleted file mode 100644
index 2a737df..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.HistogramStatistics;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.util.TestReporter;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class MetricGroupRegistrationTest {
-       /**
-        * Verifies that group methods instantiate the correct metric with the 
given name.
-        */
-       @Test
-       public void testMetricInstantiation() {
-               Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter1.class.getName());
-
-               MetricRegistry registry = new MetricRegistry(config);
-
-               MetricGroup root = new TaskManagerMetricGroup(registry, "host", 
"id");
-
-               Counter counter = root.counter("counter");
-               assertEquals(counter, TestReporter1.lastPassedMetric);
-               assertEquals("counter", TestReporter1.lastPassedName);
-
-               Gauge<Object> gauge = root.gauge("gauge", new Gauge<Object>() {
-                       @Override
-                       public Object getValue() {
-                               return null;
-                       }
-               });
-               
-               Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
-               assertEquals("gauge", TestReporter1.lastPassedName);
-
-               Histogram histogram = root.histogram("histogram", new 
Histogram() {
-                       @Override
-                       public void update(long value) {
-
-                       }
-
-                       @Override
-                       public long getCount() {
-                               return 0;
-                       }
-
-                       @Override
-                       public HistogramStatistics getStatistics() {
-                               return null;
-                       }
-               });
-
-               Assert.assertEquals(histogram, TestReporter1.lastPassedMetric);
-               assertEquals("histogram", TestReporter1.lastPassedName);
-               registry.shutdown();
-       }
-
-       public static class TestReporter1 extends TestReporter {
-               
-               public static Metric lastPassedMetric;
-               public static String lastPassedName;
-
-               @Override
-               public void notifyOfAddedMetric(Metric metric, String 
metricName, AbstractMetricGroup group) {
-                       lastPassedMetric = metric;
-                       lastPassedName = metricName;
-               }
-       }
-
-       /**
-        * Verifies that when attempting to create a group with the name of an 
existing one the existing one will be returned instead.
-        */
-       @Test
-       public void testDuplicateGroupName() {
-               Configuration config = new Configuration();
-
-               MetricRegistry registry = new MetricRegistry(config);
-
-               MetricGroup root = new TaskManagerMetricGroup(registry, "host", 
"id");
-
-               MetricGroup group1 = root.addGroup("group");
-               MetricGroup group2 = root.addGroup("group");
-               MetricGroup group3 = root.addGroup("group");
-               Assert.assertTrue(group1 == group2 && group2 == group3);
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
deleted file mode 100644
index 227db33..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.MetricRegistry;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class MetricGroupTest {
-       
-       private MetricRegistry registry;
-
-       private final MetricRegistry exceptionOnRegister = new 
ExceptionOnRegisterRegistry();
-
-       @Before
-       public void createRegistry() {
-               this.registry = new MetricRegistry(new Configuration());
-       }
-
-       @After
-       public void shutdownRegistry() {
-               this.registry.shutdown();
-               this.registry = null;
-       }
-
-       @Test
-       public void sameGroupOnNameCollision() {
-               GenericMetricGroup group = new GenericMetricGroup(
-               registry, new DummyAbstractMetricGroup(registry), "somegroup");
-
-               String groupName = "sometestname";
-               MetricGroup subgroup1 = group.addGroup(groupName);
-               MetricGroup subgroup2 = group.addGroup(groupName);
-               
-               assertNotNull(subgroup1);
-               assertNotNull(subgroup2);
-               assertTrue(subgroup1 == subgroup2);
-       }
-
-       @Test
-       public void closedGroupDoesNotRegisterMetrics() {
-               GenericMetricGroup group = new GenericMetricGroup(
-                               exceptionOnRegister, new 
DummyAbstractMetricGroup(exceptionOnRegister), "testgroup");
-               assertFalse(group.isClosed());
-
-               group.close();
-               assertTrue(group.isClosed());
-
-               // these will fail if the registration is propagated
-               group.counter("testcounter");
-               group.gauge("testgauge", new Gauge<Object>() {
-                       @Override
-                       public Object getValue() { return null; }
-               });
-       }
-       
-       @Test
-       public void closedGroupCreatesClosedGroups() {
-               GenericMetricGroup group = new 
GenericMetricGroup(exceptionOnRegister,
-                               new 
DummyAbstractMetricGroup(exceptionOnRegister), "testgroup");
-               assertFalse(group.isClosed());
-
-               group.close();
-               assertTrue(group.isClosed());
-               
-               AbstractMetricGroup subgroup = (AbstractMetricGroup) 
group.addGroup("test subgroup");
-               assertTrue(subgroup.isClosed());
-       }
-
-       @Test
-       public void tolerateMetricNameCollisions() {
-               final String name = "abctestname";
-               GenericMetricGroup group = new GenericMetricGroup(
-                               registry, new 
DummyAbstractMetricGroup(registry), "testgroup");
-               
-               assertNotNull(group.counter(name));
-               assertNotNull(group.counter(name));
-       }
-
-       @Test
-       public void tolerateMetricAndGroupNameCollisions() {
-               final String name = "abctestname";
-               GenericMetricGroup group = new GenericMetricGroup(
-                               registry, new 
DummyAbstractMetricGroup(registry), "testgroup");
-               
-               assertNotNull(group.addGroup(name));
-               assertNotNull(group.counter(name));
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       private static class ExceptionOnRegisterRegistry extends MetricRegistry 
{
-               
-               public ExceptionOnRegisterRegistry() {
-                       super(new Configuration());
-               }
-
-               @Override
-               public void register(Metric metric, String name, 
AbstractMetricGroup parent) {
-                       fail("Metric should never be registered");
-               }
-
-               @Override
-               public void unregister(Metric metric, String name, 
AbstractMetricGroup parent) {
-                       fail("Metric should never be un-registered");
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       private static class DummyAbstractMetricGroup extends 
AbstractMetricGroup {
-
-               public DummyAbstractMetricGroup(MetricRegistry registry) {
-                       super(registry, new String[0]);
-               }
-
-               @Override
-               protected void addMetric(String name, Metric metric) {}
-
-               @Override
-               public MetricGroup addGroup(String name) {
-                       return new DummyAbstractMetricGroup(registry);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
deleted file mode 100644
index 7ec3d58..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.util.AbstractID;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class OperatorGroupTest {
-
-       @Test
-       public void testGenerateScopeDefault() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
-               TaskMetricGroup taskGroup = new TaskMetricGroup(
-                               registry, jmGroup,  new AbstractID(),  new 
AbstractID(), "aTaskName", 11, 0);
-               OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, 
taskGroup, "myOpName");
-
-               assertArrayEquals(
-                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName", "myOpName", "11" },
-                               opGroup.getScopeComponents());
-
-               assertEquals(
-                               
"theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name",
-                               opGroup.getMetricIdentifier("name"));
-
-               registry.shutdown();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
deleted file mode 100644
index efaa433..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricRegistry;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
-import org.apache.flink.util.AbstractID;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class TaskManagerGroupTest {
-
-       // 
------------------------------------------------------------------------
-       //  adding and removing jobs
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void addAndRemoveJobs() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
-                               registry, "localhost", new 
AbstractID().toString());
-               
-               
-               final JobID jid1 = new JobID();
-               final JobID jid2 = new JobID();
-               
-               final String jobName1 = "testjob";
-               final String jobName2 = "anotherJob";
-               
-               final AbstractID vertex11 = new AbstractID();
-               final AbstractID vertex12 = new AbstractID();
-               final AbstractID vertex13 = new AbstractID();
-               final AbstractID vertex21 = new AbstractID();
-
-               final AbstractID execution11 = new AbstractID();
-               final AbstractID execution12 = new AbstractID();
-               final AbstractID execution13 = new AbstractID();
-               final AbstractID execution21 = new AbstractID();
-               
-               TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, 
vertex11, execution11, "test", 17, 0);
-               TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, 
vertex12, execution12, "test", 13, 1);
-               TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, 
vertex21, execution21, "test", 7, 2);
-               
-               assertEquals(2, group.numRegisteredJobMetricGroups());
-               assertFalse(tmGroup11.parent().isClosed());
-               assertFalse(tmGroup12.parent().isClosed());
-               assertFalse(tmGroup21.parent().isClosed());
-               
-               // close all for job 2 and one from job 1
-               tmGroup11.close();
-               tmGroup21.close();
-               assertTrue(tmGroup11.isClosed());
-               assertTrue(tmGroup21.isClosed());
-               
-               // job 2 should be removed, job should still be there
-               assertFalse(tmGroup11.parent().isClosed());
-               assertFalse(tmGroup12.parent().isClosed());
-               assertTrue(tmGroup21.parent().isClosed());
-               assertEquals(1, group.numRegisteredJobMetricGroups());
-               
-               // add one more to job one
-               TaskMetricGroup tmGroup13 = group.addTaskForJob(jid1, jobName1, 
vertex13, execution13, "test", 0, 0);
-               tmGroup12.close();
-               tmGroup13.close();
-
-               assertTrue(tmGroup11.parent().isClosed());
-               assertTrue(tmGroup12.parent().isClosed());
-               assertTrue(tmGroup13.parent().isClosed());
-               
-               assertEquals(0, group.numRegisteredJobMetricGroups());
-
-               registry.shutdown();
-       }
-
-       @Test
-       public void testCloseClosesAll() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
-                               registry, "localhost", new 
AbstractID().toString());
-
-
-               final JobID jid1 = new JobID();
-               final JobID jid2 = new JobID();
-
-               final String jobName1 = "testjob";
-               final String jobName2 = "anotherJob";
-
-               final AbstractID vertex11 = new AbstractID();
-               final AbstractID vertex12 = new AbstractID();
-               final AbstractID vertex21 = new AbstractID();
-
-               final AbstractID execution11 = new AbstractID();
-               final AbstractID execution12 = new AbstractID();
-               final AbstractID execution21 = new AbstractID();
-
-               TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, 
vertex11, execution11, "test", 17, 1);
-               TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, 
vertex12, execution12, "test", 13, 2);
-               TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, 
vertex21, execution21, "test", 7, 1);
-               
-               group.close();
-               
-               assertTrue(tmGroup11.isClosed());
-               assertTrue(tmGroup12.isClosed());
-               assertTrue(tmGroup21.isClosed());
-
-               registry.shutdown();
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  scope name tests
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void testGenerateScopeDefault() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, "localhost", "id");
-
-               assertArrayEquals(new String[] { "localhost", "taskmanager", 
"id" }, group.getScopeComponents());
-               assertEquals("localhost.taskmanager.id.name", 
group.getMetricIdentifier("name"));
-               registry.shutdown();
-       }
-
-       @Test
-       public void testGenerateScopeCustom() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               TaskManagerScopeFormat format = new 
TaskManagerScopeFormat("constant.<host>.foo.<host>");
-               TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, format, "host", "id");
-
-               assertArrayEquals(new String[] { "constant", "host", "foo", 
"host" }, group.getScopeComponents());
-               assertEquals("constant.host.foo.host.name", 
group.getMetricIdentifier("name"));
-               registry.shutdown();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java
deleted file mode 100644
index 117d5bb..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricRegistry;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class TaskManagerJobGroupTest {
-
-       @Test
-       public void testGenerateScopeDefault() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
-
-               assertArrayEquals(
-                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName"},
-                               jmGroup.getScopeComponents());
-
-               assertEquals(
-                               
"theHostName.taskmanager.test-tm-id.myJobName.name",
-                               jmGroup.getMetricIdentifier("name"));
-               registry.shutdown();
-       }
-
-       @Test
-       public void testGenerateScopeCustom() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("abc");
-               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
-
-               JobID jid = new JobID();
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
-
-               assertArrayEquals(
-                               new String[] { "some-constant", "myJobName" },
-                               jmGroup.getScopeComponents());
-
-               assertEquals(
-                               "some-constant.myJobName.name",
-                               jmGroup.getMetricIdentifier("name"));
-               registry.shutdown();
-       }
-
-       @Test
-       public void testGenerateScopeCustomWildcard() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("peter.<tm_id>");
-               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
-
-               JobID jid = new JobID();
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
-
-               assertArrayEquals(
-                               new String[] { "peter", "test-tm-id", 
"some-constant", jid.toString() },
-                               jmGroup.getScopeComponents());
-
-               assertEquals(
-                               "peter.test-tm-id.some-constant." + jid + 
".name",
-                               jmGroup.getMetricIdentifier("name"));
-               registry.shutdown();
-       }
-}

Reply via email to