http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java deleted file mode 100644 index 5d8cc5f..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.SimpleCounter; - -/** - * A special {@link MetricGroup} that does not register any metrics at the metrics registry - * and any reporters. - */ -@Internal -public class UnregisteredMetricsGroup implements MetricGroup { - - @Override - public Counter counter(int name) { - return new SimpleCounter(); - } - - @Override - public Counter counter(String name) { - return new SimpleCounter(); - } - - @Override - public <C extends Counter> C counter(int name, C counter) { - return counter; - } - - @Override - public <C extends Counter> C counter(String name, C counter) { - return counter; - } - - @Override - public <T, G extends Gauge<T>> G gauge(int name, G gauge) { - return gauge; - } - - @Override - public <T, G extends Gauge<T>> G gauge(String name, G gauge) { - return gauge; - } - - @Override - public <H extends Histogram> H histogram(int name, H histogram) { - return histogram; - } - - @Override - public <H extends Histogram> H histogram(String name, H histogram) { - return histogram; - } - - @Override - public MetricGroup addGroup(int name) { - return addGroup(String.valueOf(name)); - } - - @Override - public MetricGroup addGroup(String name) { - return new UnregisteredMetricsGroup(); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java deleted file mode 100644 index 9297332..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups.scope; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.CharacterFilter; -import org.apache.flink.metrics.groups.JobManagerMetricGroup; -import org.apache.flink.metrics.groups.JobMetricGroup; -import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup; -import org.apache.flink.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.metrics.groups.TaskMetricGroup; -import org.apache.flink.util.AbstractID; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * This class represents the format after which the "scope" (or namespace) of the various - * component metric groups is built. Component metric groups - * ({@link org.apache.flink.metrics.groups.ComponentMetricGroup}), are for example - * "TaskManager", "Task", or "Operator". - * - * <p>User defined scope formats allow users to include or exclude - * certain identifiers from the scope. The scope for metrics belonging to the "Task" - * group could for example include the task attempt number (more fine grained identification), or - * exclude it (continuity of the namespace across failure and recovery). - */ -public abstract class ScopeFormat { - - private static CharacterFilter defaultFilter = new CharacterFilter() { - @Override - public String filterCharacters(String input) { - return input; - } - }; - - // ------------------------------------------------------------------------ - // Scope Format Special Characters - // ------------------------------------------------------------------------ - - /** - * If the scope format starts with this character, then the parent components scope - * format will be used as a prefix. - * - * <p>For example, if the {@link JobMetricGroup} format is {@code "*.<job_name>"}, and the - * {@link TaskManagerMetricGroup} format is {@code "<host>"}, then the job's metrics - * will have {@code "<host>.<job_name>"} as their scope. - */ - public static final String SCOPE_INHERIT_PARENT = "*"; - - public static final String SCOPE_SEPARATOR = "."; - - private static final String SCOPE_VARIABLE_PREFIX = "<"; - private static final String SCOPE_VARIABLE_SUFFIX = ">"; - - // ------------------------------------------------------------------------ - // Scope Variables - // ------------------------------------------------------------------------ - - public static final String SCOPE_ACTOR_HOST = asVariable("host"); - - // ----- Job Manager ---- - - /** The default scope format of the JobManager component: {@code "<host>.jobmanager"} */ - public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT = - concat(SCOPE_ACTOR_HOST, "jobmanager"); - - /** The default scope format of JobManager metrics: {@code "<host>.jobmanager"} */ - public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = DEFAULT_SCOPE_JOBMANAGER_COMPONENT; - - // ----- Task Manager ---- - - public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id"); - - /** The default scope format of the TaskManager component: {@code "<host>.taskmanager.<tm_id>"} */ - public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT = - concat(SCOPE_ACTOR_HOST, "taskmanager", SCOPE_TASKMANAGER_ID); - - /** The default scope format of TaskManager metrics: {@code "<host>.taskmanager.<tm_id>"} */ - public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = DEFAULT_SCOPE_TASKMANAGER_COMPONENT; - - // ----- Job ----- - - public static final String SCOPE_JOB_ID = asVariable("job_id"); - public static final String SCOPE_JOB_NAME = asVariable("job_name"); - - /** The default scope format for the job component: {@code "<job_name>"} */ - public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME; - - // ----- Job on Job Manager ---- - - /** The default scope format for all job metrics on a jobmanager: {@code "<host>.jobmanager.<job_name>"} */ - public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP = - concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT); - - // ----- Job on Task Manager ---- - - /** The default scope format for all job metrics on a taskmanager: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */ - public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP = - concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT); - - // ----- Task ---- - - public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id"); - public static final String SCOPE_TASK_NAME = asVariable("task_name"); - public static final String SCOPE_TASK_ATTEMPT_ID = asVariable("task_attempt_id"); - public static final String SCOPE_TASK_ATTEMPT_NUM = asVariable("task_attempt_num"); - public static final String SCOPE_TASK_SUBTASK_INDEX = asVariable("subtask_index"); - - /** Default scope of the task component: {@code "<task_name>.<subtask_index>"} */ - public static final String DEFAULT_SCOPE_TASK_COMPONENT = - concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX); - - /** The default scope format for all task metrics: - * {@code "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"} */ - public static final String DEFAULT_SCOPE_TASK_GROUP = - concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_TASK_COMPONENT); - - // ----- Operator ---- - - public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name"); - - /** The default scope added by the operator component: "<operator_name>.<subtask_index>" */ - public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT = - concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX); - - /** The default scope format for all operator metrics: - * {@code "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */ - public static final String DEFAULT_SCOPE_OPERATOR_GROUP = - concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_OPERATOR_COMPONENT); - - // ------------------------------------------------------------------------ - // Formatters form the individual component types - // ------------------------------------------------------------------------ - - /** - * The scope format for the {@link JobManagerMetricGroup}. - */ - public static class JobManagerScopeFormat extends ScopeFormat { - - public JobManagerScopeFormat(String format) { - super(format, null, new String[] { - SCOPE_ACTOR_HOST - }); - } - - public String[] formatScope(String hostname) { - final String[] template = copyTemplate(); - final String[] values = { hostname }; - return bindVariables(template, values); - } - } - - /** - * The scope format for the {@link TaskManagerMetricGroup}. - */ - public static class TaskManagerScopeFormat extends ScopeFormat { - - public TaskManagerScopeFormat(String format) { - super(format, null, new String[] { - SCOPE_ACTOR_HOST, - SCOPE_TASKMANAGER_ID - }); - } - - public String[] formatScope(String hostname, String taskManagerId) { - final String[] template = copyTemplate(); - final String[] values = { hostname, taskManagerId }; - return bindVariables(template, values); - } - } - - // ------------------------------------------------------------------------ - - /** - * The scope format for the {@link JobMetricGroup}. - */ - public static class JobManagerJobScopeFormat extends ScopeFormat { - - public JobManagerJobScopeFormat(String format, JobManagerScopeFormat parentFormat) { - super(format, parentFormat, new String[] { - SCOPE_ACTOR_HOST, - SCOPE_JOB_ID, - SCOPE_JOB_NAME - }); - } - - public String[] formatScope(JobManagerMetricGroup parent, JobID jid, String jobName) { - final String[] template = copyTemplate(); - final String[] values = { - parent.hostname(), - valueOrNull(jid), - valueOrNull(jobName) - }; - return bindVariables(template, values); - } - } - - /** - * The scope format for the {@link JobMetricGroup}. - */ - public static class TaskManagerJobScopeFormat extends ScopeFormat { - - public TaskManagerJobScopeFormat(String format, TaskManagerScopeFormat parentFormat) { - super(format, parentFormat, new String[] { - SCOPE_ACTOR_HOST, - SCOPE_TASKMANAGER_ID, - SCOPE_JOB_ID, - SCOPE_JOB_NAME - }); - } - - public String[] formatScope(TaskManagerMetricGroup parent, JobID jid, String jobName) { - final String[] template = copyTemplate(); - final String[] values = { - parent.hostname(), - parent.taskManagerId(), - valueOrNull(jid), - valueOrNull(jobName) - }; - return bindVariables(template, values); - } - } - - // ------------------------------------------------------------------------ - - /** - * The scope format for the {@link TaskMetricGroup}. - */ - public static class TaskScopeFormat extends ScopeFormat { - - public TaskScopeFormat(String format, TaskManagerJobScopeFormat parentFormat) { - super(format, parentFormat, new String[] { - SCOPE_ACTOR_HOST, - SCOPE_TASKMANAGER_ID, - SCOPE_JOB_ID, - SCOPE_JOB_NAME, - SCOPE_TASK_VERTEX_ID, - SCOPE_TASK_ATTEMPT_ID, - SCOPE_TASK_NAME, - SCOPE_TASK_SUBTASK_INDEX, - SCOPE_TASK_ATTEMPT_NUM - }); - } - - public String[] formatScope( - TaskManagerJobMetricGroup parent, - AbstractID vertexId, AbstractID attemptId, - String taskName, int subtask, int attemptNumber) { - - final String[] template = copyTemplate(); - final String[] values = { - parent.parent().hostname(), - parent.parent().taskManagerId(), - valueOrNull(parent.jobId()), - valueOrNull(parent.jobName()), - valueOrNull(vertexId), - valueOrNull(attemptId), - valueOrNull(taskName), - String.valueOf(subtask), - String.valueOf(attemptNumber) - }; - return bindVariables(template, values); - } - } - - // ------------------------------------------------------------------------ - - /** - * The scope format for the {@link org.apache.flink.metrics.groups.OperatorMetricGroup}. - */ - public static class OperatorScopeFormat extends ScopeFormat { - - public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) { - super(format, parentFormat, new String[] { - SCOPE_ACTOR_HOST, - SCOPE_TASKMANAGER_ID, - SCOPE_JOB_ID, - SCOPE_JOB_NAME, - SCOPE_TASK_VERTEX_ID, - SCOPE_TASK_ATTEMPT_ID, - SCOPE_TASK_NAME, - SCOPE_TASK_SUBTASK_INDEX, - SCOPE_TASK_ATTEMPT_NUM, - SCOPE_OPERATOR_NAME - }); - } - - public String[] formatScope(TaskMetricGroup parent, String operatorName) { - - final String[] template = copyTemplate(); - final String[] values = { - parent.parent().parent().hostname(), - parent.parent().parent().taskManagerId(), - valueOrNull(parent.parent().jobId()), - valueOrNull(parent.parent().jobName()), - valueOrNull(parent.vertexId()), - valueOrNull(parent.executionId()), - valueOrNull(parent.taskName()), - String.valueOf(parent.subtaskIndex()), - String.valueOf(parent.attemptNumber()), - valueOrNull(operatorName) - }; - return bindVariables(template, values); - } - } - - // ------------------------------------------------------------------------ - // Scope Format Base - // ------------------------------------------------------------------------ - - /** The scope format */ - private final String format; - - /** The format, split into components */ - private final String[] template; - - private final int[] templatePos; - - private final int[] valuePos; - - // ------------------------------------------------------------------------ - - protected ScopeFormat(String format, ScopeFormat parent, String[] variables) { - checkNotNull(format, "format is null"); - - final String[] rawComponents = format.split("\\" + SCOPE_SEPARATOR); - - // compute the template array - final boolean parentAsPrefix = rawComponents.length > 0 && rawComponents[0].equals(SCOPE_INHERIT_PARENT); - if (parentAsPrefix) { - if (parent == null) { - throw new IllegalArgumentException("Component scope format requires parent prefix (starts with '" - + SCOPE_INHERIT_PARENT + "'), but this component has no parent (is root component)."); - } - - this.format = format.length() > 2 ? format.substring(2) : "<empty>"; - - String[] parentTemplate = parent.template; - int parentLen = parentTemplate.length; - - this.template = new String[parentLen + rawComponents.length - 1]; - System.arraycopy(parentTemplate, 0, this.template, 0, parentLen); - System.arraycopy(rawComponents, 1, this.template, parentLen, rawComponents.length - 1); - } - else { - this.format = format.isEmpty() ? "<empty>" : format; - this.template = rawComponents; - } - - // --- compute the replacement matrix --- - // a bit of clumsy Java collections code ;-) - - HashMap<String, Integer> varToValuePos = arrayToMap(variables); - List<Integer> templatePos = new ArrayList<>(); - List<Integer> valuePos = new ArrayList<>(); - - for (int i = 0; i < template.length; i++) { - final String component = template[i]; - - // check if that is a variable - if (component != null && component.length() >= 3 && - component.charAt(0) == '<' && component.charAt(component.length() - 1) == '>') { - - // this is a variable - Integer replacementPos = varToValuePos.get(component); - if (replacementPos != null) { - templatePos.add(i); - valuePos.add(replacementPos); - } - } - } - - this.templatePos = integerListToArray(templatePos); - this.valuePos = integerListToArray(valuePos); - } - - // ------------------------------------------------------------------------ - - public String format() { - return format; - } - - protected final String[] copyTemplate() { - String[] copy = new String[template.length]; - System.arraycopy(template, 0, copy, 0, template.length); - return copy; - } - - protected final String[] bindVariables(String[] template, String[] values) { - final int len = templatePos.length; - for (int i = 0; i < len; i++) { - template[templatePos[i]] = values[valuePos[i]]; - } - return template; - } - - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "ScopeFormat '" + format + '\''; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - /** - * Formats the given string to resemble a scope variable. - * - * @param scope The string to format - * @return The formatted string - */ - public static String asVariable(String scope) { - return SCOPE_VARIABLE_PREFIX + scope + SCOPE_VARIABLE_SUFFIX; - } - - public static String concat(String... components) { - return concat(defaultFilter, '.', components); - } - - public static String concat(CharacterFilter filter, String... components) { - return concat(filter, '.', components); - } - - public static String concat(Character delimiter, String... components) { - return concat(defaultFilter, delimiter, components); - } - - /** - * Concatenates the given component names separated by the delimiter character. Additionally - * the character filter is applied to all component names. - * - * @param filter Character filter to be applied to the component names - * @param delimiter Delimiter to separate component names - * @param components Array of component names - * @return The concatenated component name - */ - public static String concat(CharacterFilter filter, Character delimiter, String... components) { - StringBuilder sb = new StringBuilder(); - sb.append(filter.filterCharacters(components[0])); - for (int x = 1; x < components.length; x++) { - sb.append(delimiter); - sb.append(filter.filterCharacters(components[x])); - } - return sb.toString(); - } - - static String valueOrNull(Object value) { - return (value == null || (value instanceof String && ((String) value).isEmpty())) ? - "null" : value.toString(); - } - - static HashMap<String, Integer> arrayToMap(String[] array) { - HashMap<String, Integer> map = new HashMap<>(array.length); - for (int i = 0; i < array.length; i++) { - map.put(array[i], i); - } - return map; - } - - private static int[] integerListToArray(List<Integer> list) { - int[] array = new int[list.size()]; - int pos = 0; - for (Integer i : list) { - array[pos++] = i; - } - return array; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java deleted file mode 100644 index 978e761..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups.scope; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A container for component scope formats. - */ -@Internal -public class ScopeFormats { - - private final JobManagerScopeFormat jobManagerFormat; - private final JobManagerJobScopeFormat jobManagerJobFormat; - private final TaskManagerScopeFormat taskManagerFormat; - private final TaskManagerJobScopeFormat taskManagerJobFormat; - private final TaskScopeFormat taskFormat; - private final OperatorScopeFormat operatorFormat; - - // ------------------------------------------------------------------------ - - /** - * Creates all default scope formats. - */ - public ScopeFormats() { - this.jobManagerFormat = new JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT); - - this.jobManagerJobFormat = new JobManagerJobScopeFormat( - ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, this.jobManagerFormat); - - this.taskManagerFormat = new TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT); - - this.taskManagerJobFormat = new TaskManagerJobScopeFormat( - ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat); - - this.taskFormat = new TaskScopeFormat( - ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, this.taskManagerJobFormat); - - this.operatorFormat = new OperatorScopeFormat( - ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, this.taskFormat); - } - - /** - * Creates all scope formats, based on the given scope format strings. - */ - public ScopeFormats( - String jobManagerFormat, - String jobManagerJobFormat, - String taskManagerFormat, - String taskManagerJobFormat, - String taskFormat, - String operatorFormat) - { - this.jobManagerFormat = new JobManagerScopeFormat(jobManagerFormat); - this.jobManagerJobFormat = new JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat); - this.taskManagerFormat = new TaskManagerScopeFormat(taskManagerFormat); - this.taskManagerJobFormat = new TaskManagerJobScopeFormat(taskManagerJobFormat, this.taskManagerFormat); - this.taskFormat = new TaskScopeFormat(taskFormat, this.taskManagerJobFormat); - this.operatorFormat = new OperatorScopeFormat(operatorFormat, this.taskFormat); - } - - /** - * Creates a {@code ScopeFormats} with the given scope formats. - */ - public ScopeFormats( - JobManagerScopeFormat jobManagerFormat, - JobManagerJobScopeFormat jobManagerJobFormat, - TaskManagerScopeFormat taskManagerFormat, - TaskManagerJobScopeFormat taskManagerJobFormat, - TaskScopeFormat taskFormat, - OperatorScopeFormat operatorFormat) - { - this.jobManagerFormat = checkNotNull(jobManagerFormat); - this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat); - this.taskManagerFormat = checkNotNull(taskManagerFormat); - this.taskManagerJobFormat = checkNotNull(taskManagerJobFormat); - this.taskFormat = checkNotNull(taskFormat); - this.operatorFormat = checkNotNull(operatorFormat); - } - - // ------------------------------------------------------------------------ - - public JobManagerScopeFormat getJobManagerFormat() { - return this.jobManagerFormat; - } - - public TaskManagerScopeFormat getTaskManagerFormat() { - return this.taskManagerFormat; - } - - public TaskManagerJobScopeFormat getTaskManagerJobFormat() { - return this.taskManagerJobFormat; - } - - public JobManagerJobScopeFormat getJobManagerJobFormat() { - return this.jobManagerJobFormat; - } - - public TaskScopeFormat getTaskFormat() { - return this.taskFormat; - } - - public OperatorScopeFormat getOperatorFormat() { - return this.operatorFormat; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java deleted file mode 100644 index f96901b..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.reporter; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.metrics.CharacterFilter; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.groups.AbstractMetricGroup; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; - -/** - * Base interface for custom metric reporters. - */ -@PublicEvolving -public abstract class AbstractReporter implements MetricReporter, CharacterFilter { - protected final Logger log = LoggerFactory.getLogger(getClass()); - - protected final Map<Gauge<?>, String> gauges = new HashMap<>(); - protected final Map<Counter, String> counters = new HashMap<>(); - protected final Map<Histogram, String> histograms = new HashMap<>(); - - @Override - public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { - final String name = group.getMetricIdentifier(metricName, this); - - synchronized (this) { - if (metric instanceof Counter) { - counters.put((Counter) metric, name); - } else if (metric instanceof Gauge) { - gauges.put((Gauge<?>) metric, name); - } else if (metric instanceof Histogram) { - histograms.put((Histogram) metric, name); - } else { - log.warn("Cannot add unknown metric type {}. This indicates that the reporter " + - "does not support this metric type.", metric.getClass().getName()); - } - } - } - - @Override - public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) { - synchronized (this) { - if (metric instanceof Counter) { - counters.remove(metric); - } else if (metric instanceof Gauge) { - gauges.remove(metric); - } else if (metric instanceof Histogram) { - histograms.remove(metric); - } else { - log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " + - "does not support this metric type.", metric.getClass().getName()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java deleted file mode 100644 index d84cd5e..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java +++ /dev/null @@ -1,491 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.reporter; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.groups.AbstractMetricGroup; -import org.apache.flink.util.NetUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.management.InstanceAlreadyExistsException; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; -import javax.management.remote.JMXConnectorServer; -import javax.management.remote.JMXConnectorServerFactory; -import javax.management.remote.JMXServiceURL; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.net.MalformedURLException; -import java.rmi.NoSuchObjectException; -import java.rmi.registry.LocateRegistry; -import java.rmi.registry.Registry; -import java.rmi.server.UnicastRemoteObject; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -/** - * {@link MetricReporter} that exports {@link Metric Metrics} via JMX. - * - * Largely based on the JmxReporter class of the dropwizard metrics library - * https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java - */ -@Internal -public class JMXReporter implements MetricReporter { - - private static final String PREFIX = "org.apache.flink.metrics:"; - private static final String KEY_PREFIX = "key"; - - public static final String ARG_PORT = "port"; - - private static final Logger LOG = LoggerFactory.getLogger(JMXReporter.class); - - // ------------------------------------------------------------------------ - - /** The server where the management beans are registered and deregistered */ - private final MBeanServer mBeanServer; - - /** The names under which the registered metrics have been added to the MBeanServer */ - private final Map<Metric, ObjectName> registeredMetrics; - - /** The server to which JMX clients connect to. ALlows for better control over port usage. */ - private JMXServer jmxServer; - - /** - * Creates a new JMXReporter - */ - public JMXReporter() { - this.mBeanServer = ManagementFactory.getPlatformMBeanServer(); - this.registeredMetrics = new HashMap<>(); - } - - // ------------------------------------------------------------------------ - // life cycle - // ------------------------------------------------------------------------ - - @Override - public void open(Configuration config) { - String portsConfig = config.getString(ARG_PORT, null); - - if (portsConfig != null) { - Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig); - - JMXServer server = new JMXServer(); - while (ports.hasNext()) { - int port = ports.next(); - try { - server.start(port); - LOG.info("Started JMX server on port " + port + "."); - // only set our field if the server was actually started - jmxServer = server; - break; - } catch (IOException ioe) { //assume port conflict - LOG.debug("Could not start JMX server on port " + port + ".", ioe); - try { - server.stop(); - } catch (Exception e) { - LOG.debug("Could not stop JMX server.", e); - } - } - } - if (jmxServer == null) { - throw new RuntimeException("Could not start JMX server on any configured port. Ports: " + portsConfig); - } - } - } - - @Override - public void close() { - if (jmxServer != null) { - try { - jmxServer.stop(); - } catch (IOException e) { - LOG.error("Failed to stop JMX server.", e); - } - } - } - - public int getPort() { - if (jmxServer == null) { - throw new NullPointerException("No server was opened. Did you specify a port?"); - } - return jmxServer.port; - } - - // ------------------------------------------------------------------------ - // adding / removing metrics - // ------------------------------------------------------------------------ - - @Override - public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { - final String name = generateJmxName(metricName, group.getScopeComponents()); - - AbstractBean jmxMetric; - ObjectName jmxName; - try { - jmxName = new ObjectName(name); - } catch (MalformedObjectNameException e) { - LOG.error("Metric name did not conform to JMX ObjectName rules: " + name, e); - return; - } - - if (metric instanceof Gauge) { - jmxMetric = new JmxGauge((Gauge<?>) metric); - } else if (metric instanceof Counter) { - jmxMetric = new JmxCounter((Counter) metric); - } else if (metric instanceof Histogram) { - jmxMetric = new JmxHistogram((Histogram) metric); - } else { - LOG.error("Cannot add unknown metric type: {}. This indicates that the metric type " + - "is not supported by this reporter.", metric.getClass().getName()); - return; - } - - try { - synchronized (this) { - mBeanServer.registerMBean(jmxMetric, jmxName); - registeredMetrics.put(metric, jmxName); - } - } catch (NotCompliantMBeanException e) { - // implementation error on our side - LOG.error("Metric did not comply with JMX MBean naming rules.", e); - } catch (InstanceAlreadyExistsException e) { - LOG.debug("A metric with the name " + jmxName + " was already registered.", e); - LOG.error("A metric with the name " + jmxName + " was already registered."); - } catch (Throwable t) { - LOG.error("Failed to register metric", t); - } - } - - @Override - public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) { - try { - synchronized (this) { - final ObjectName jmxName = registeredMetrics.remove(metric); - - // remove the metric if it is known. if it is not known, ignore the request - if (jmxName != null) { - mBeanServer.unregisterMBean(jmxName); - } - } - } catch (InstanceNotFoundException e) { - // alright then - } catch (Throwable t) { - // never propagate exceptions - the metrics reporter should not affect the stability - // of the running system - LOG.error("Un-registering metric failed", t); - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - static String generateJmxName(String metricName, String[] scopeComponents) { - final StringBuilder nameBuilder = new StringBuilder(128); - nameBuilder.append(PREFIX); - - for (int x = 0; x < scopeComponents.length; x++) { - // write keyX= - nameBuilder.append(KEY_PREFIX); - nameBuilder.append(x); - nameBuilder.append("="); - - // write scope component - nameBuilder.append(replaceInvalidChars(scopeComponents[x])); - nameBuilder.append(","); - } - - // write the name - nameBuilder.append("name=").append(replaceInvalidChars(metricName)); - - return nameBuilder.toString(); - } - - /** - * Lightweight method to replace unsupported characters. - * If the string does not contain any unsupported characters, this method creates no - * new string (and in fact no new objects at all). - * - * <p>Replacements: - * - * <ul> - * <li>{@code "} is removed</li> - * <li>{@code space} is replaced by {@code _} (underscore)</li> - * <li>{@code , = ; : ? ' *} are replaced by {@code -} (hyphen)</li> - * </ul> - */ - static String replaceInvalidChars(String str) { - char[] chars = null; - final int strLen = str.length(); - int pos = 0; - - for (int i = 0; i < strLen; i++) { - final char c = str.charAt(i); - switch (c) { - case '"': - // remove character by not moving cursor - if (chars == null) { - chars = str.toCharArray(); - } - break; - - case ' ': - if (chars == null) { - chars = str.toCharArray(); - } - chars[pos++] = '_'; - break; - - case ',': - case '=': - case ';': - case ':': - case '?': - case '\'': - case '*': - if (chars == null) { - chars = str.toCharArray(); - } - chars[pos++] = '-'; - break; - - default: - if (chars != null) { - chars[pos] = c; - } - pos++; - } - } - - return chars == null ? str : new String(chars, 0, pos); - } - - // ------------------------------------------------------------------------ - // Interfaces and base classes for JMX beans - // ------------------------------------------------------------------------ - - public interface MetricMBean {} - - private abstract static class AbstractBean implements MetricMBean {} - - public interface JmxCounterMBean extends MetricMBean { - long getCount(); - } - - private static class JmxCounter extends AbstractBean implements JmxCounterMBean { - private Counter counter; - - JmxCounter(Counter counter) { - this.counter = counter; - } - - @Override - public long getCount() { - return counter.getCount(); - } - } - - public interface JmxGaugeMBean extends MetricMBean { - Object getValue(); - } - - private static class JmxGauge extends AbstractBean implements JmxGaugeMBean { - - private final Gauge<?> gauge; - - JmxGauge(Gauge<?> gauge) { - this.gauge = gauge; - } - - @Override - public Object getValue() { - return gauge.getValue(); - } - } - - public interface JmxHistogramMBean extends MetricMBean { - long getCount(); - - double getMean(); - - double getStdDev(); - - long getMax(); - - long getMin(); - - double getMedian(); - - double get75thPercentile(); - - double get95thPercentile(); - - double get98thPercentile(); - - double get99thPercentile(); - - double get999thPercentile(); - } - - private static class JmxHistogram extends AbstractBean implements JmxHistogramMBean { - - private final Histogram histogram; - - JmxHistogram(Histogram histogram) { - this.histogram = histogram; - } - - @Override - public long getCount() { - return histogram.getCount(); - } - - @Override - public double getMean() { - return histogram.getStatistics().getMean(); - } - - @Override - public double getStdDev() { - return histogram.getStatistics().getStdDev(); - } - - @Override - public long getMax() { - return histogram.getStatistics().getMax(); - } - - @Override - public long getMin() { - return histogram.getStatistics().getMin(); - } - - @Override - public double getMedian() { - return histogram.getStatistics().getQuantile(0.5); - } - - @Override - public double get75thPercentile() { - return histogram.getStatistics().getQuantile(0.75); - } - - @Override - public double get95thPercentile() { - return histogram.getStatistics().getQuantile(0.95); - } - - @Override - public double get98thPercentile() { - return histogram.getStatistics().getQuantile(0.98); - } - - @Override - public double get99thPercentile() { - return histogram.getStatistics().getQuantile(0.99); - } - - @Override - public double get999thPercentile() { - return histogram.getStatistics().getQuantile(0.999); - } - } - - /** - * JMX Server implementation that JMX clients can connect to. - * - * Heavily based on j256 simplejmx project - * - * https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java - */ - private static class JMXServer { - private Registry rmiRegistry; - private JMXConnectorServer connector; - private int port; - - public void start(int port) throws IOException { - if (rmiRegistry != null && connector != null) { - LOG.debug("JMXServer is already running."); - return; - } - startRmiRegistry(port); - startJmxService(port); - this.port = port; - } - - /** - * Starts an RMI Registry that allows clients to lookup the JMX IP/port. - * - * @param port rmi port to use - * @throws IOException - */ - private void startRmiRegistry(int port) throws IOException { - rmiRegistry = LocateRegistry.createRegistry(port); - } - - /** - * Starts a JMX connector that allows (un)registering MBeans with the MBean server and RMI invocations. - * - * @param port jmx port to use - * @throws IOException - */ - private void startJmxService(int port) throws IOException { - String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi"; - JMXServiceURL url; - try { - url = new JMXServiceURL(serviceUrl); - } catch (MalformedURLException e) { - throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e); - } - - connector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer()); - - connector.start(); - } - - public void stop() throws IOException { - if (connector != null) { - try { - connector.stop(); - } finally { - connector = null; - } - } - if (rmiRegistry != null) { - try { - UnicastRemoteObject.unexportObject(rmiRegistry, true); - } catch (NoSuchObjectException e) { - throw new IOException("Could not un-export our RMI registry", e); - } finally { - rmiRegistry = null; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java deleted file mode 100644 index 9458246..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.reporter; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.groups.AbstractMetricGroup; - -/** - * Reporters are used to export {@link Metric Metrics} to an external backend. - * - * <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a - * public no-argument constructor. - */ -@PublicEvolving -public interface MetricReporter { - - // ------------------------------------------------------------------------ - // life cycle - // ------------------------------------------------------------------------ - - /** - * Configures this reporter. Since reporters are instantiated generically and hence parameter-less, - * this method is the place where the reporters set their basic fields based on configuration values. - * - * <p>This method is always called first on a newly instantiated reporter. - * - * @param config The configuration with all parameters. - */ - void open(Configuration config); - - /** - * Closes this reporter. Should be used to close channels, streams and release resources. - */ - void close(); - - // ------------------------------------------------------------------------ - // adding / removing metrics - // ------------------------------------------------------------------------ - - /** - * Called when a new {@link Metric} was added. - * - * @param metric the metric that was added - * @param metricName the name of the metric - * @param group the group that contains the metric - */ - void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group); - - /** - * Called when a {@link Metric} was should be removed. - * - * @param metric the metric that should be removed - * @param metricName the name of the metric - * @param group the group that contains the metric - */ - void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group); -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java deleted file mode 100644 index cf1fc52..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.reporter; - -import org.apache.flink.annotation.PublicEvolving; - -/** - * Interface for reporters that actively send out data periodically. - */ -@PublicEvolving -public interface Scheduled { - - /** - * Report the current measurements. This method is called periodically by the - * metrics registry that uses the reoprter. - */ - void report(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java deleted file mode 100644 index 8dec4f6..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.groups.AbstractMetricGroup; -import org.apache.flink.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.metrics.groups.scope.ScopeFormats; -import org.apache.flink.metrics.reporter.Scheduled; -import org.apache.flink.metrics.util.TestReporter; - -import org.apache.flink.util.TestLogger; -import org.junit.Assert; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class MetricRegistryTest extends TestLogger { - - /** - * Verifies that the reporter class argument is correctly used to instantiate and open the reporter. - */ - @Test - public void testReporterInstantiation() { - Configuration config = new Configuration(); - - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter1.class.getName()); - - new MetricRegistry(config); - - Assert.assertTrue(TestReporter1.wasOpened); - } - - protected static class TestReporter1 extends TestReporter { - public static boolean wasOpened = false; - - @Override - public void open(Configuration config) { - wasOpened = true; - } - } - - /** - * Verifies that configured arguments are properly forwarded to the reporter. - */ - @Test - public void testReporterArgumentForwarding() { - Configuration config = new Configuration(); - - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter2.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--arg1 hello --arg2 world"); - - new MetricRegistry(config); - } - - protected static class TestReporter2 extends TestReporter { - @Override - public void open(Configuration config) { - Assert.assertEquals("hello", config.getString("arg1", null)); - Assert.assertEquals("world", config.getString("arg2", null)); - } - } - - /** - * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics. - * - * @throws InterruptedException - */ - @Test - public void testReporterScheduling() throws InterruptedException { - Configuration config = new Configuration(); - - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter3.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, "50 MILLISECONDS"); - - new MetricRegistry(config); - - long start = System.currentTimeMillis(); - for (int x = 0; x < 10; x++) { - Thread.sleep(100); - int reportCount = TestReporter3.reportCount; - long curT = System.currentTimeMillis(); - /** - * Within a given time-frame T only T/500 reports may be triggered due to the interval between reports. - * This value however does not not take the first triggered report into account (=> +1). - * Furthermore we have to account for the mis-alignment between reports being triggered and our time - * measurement (=> +1); for T=200 a total of 4-6 reports may have been - * triggered depending on whether the end of the interval for the first reports ends before - * or after T=50. - */ - long maxAllowedReports = (curT - start) / 50 + 2; - Assert.assertTrue("Too many report were triggered.", maxAllowedReports >= reportCount); - } - Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0); - } - - protected static class TestReporter3 extends TestReporter implements Scheduled { - public static int reportCount = 0; - - @Override - public void report() { - reportCount++; - } - } - - /** - * Verifies that reporters implementing the Listener interface are notified when Metrics are added or removed. - */ - @Test - public void testListener() { - Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter6.class.getName()); - - MetricRegistry registry = new MetricRegistry(config); - - TaskManagerMetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); - root.counter("rootCounter"); - root.close(); - - assertTrue(TestReporter6.addCalled); - assertTrue(TestReporter6.removeCalled); - } - - protected static class TestReporter6 extends TestReporter { - public static boolean addCalled = false; - public static boolean removeCalled = false; - - @Override - public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { - addCalled = true; - assertTrue(metric instanceof Counter); - assertEquals("rootCounter", metricName); - } - - @Override - public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) { - removeCalled = true; - Assert.assertTrue(metric instanceof Counter); - Assert.assertEquals("rootCounter", metricName); - } - } - - /** - * Verifies that the scope configuration is properly extracted. - */ - @Test - public void testScopeConfig() { - Configuration config = new Configuration(); - - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A"); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "B"); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "C"); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, "D"); - - ScopeFormats scopeConfig = MetricRegistry.createScopeConfig(config); - - assertEquals("A", scopeConfig.getTaskManagerFormat().format()); - assertEquals("B", scopeConfig.getTaskManagerJobFormat().format()); - assertEquals("C", scopeConfig.getTaskFormat().format()); - assertEquals("D", scopeConfig.getOperatorFormat().format()); - } - - @Test - public void testConfigurableDelimiter() { - Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_"); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D.E"); - - MetricRegistry registry = new MetricRegistry(config); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id"); - assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name")); - - registry.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java deleted file mode 100644 index fb0af2e..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.groups; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat; -import org.junit.Test; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class JobManagerGroupTest { - - // ------------------------------------------------------------------------ - // adding and removing jobs - // ------------------------------------------------------------------------ - - @Test - public void addAndRemoveJobs() { - final JobManagerMetricGroup group = new JobManagerMetricGroup( - new MetricRegistry(new Configuration()), "localhost"); - - final JobID jid1 = new JobID(); - final JobID jid2 = new JobID(); - - final String jobName1 = "testjob"; - final String jobName2 = "anotherJob"; - - JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, jobName1); - JobManagerJobMetricGroup jmJobGroup12 = group.addJob(jid1, jobName1); - JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, jobName2); - - assertEquals(jmJobGroup11, jmJobGroup12); - - assertEquals(2, group.numRegisteredJobMetricGroups()); - - group.removeJob(jid1); - - assertTrue(jmJobGroup11.isClosed()); - assertEquals(1, group.numRegisteredJobMetricGroups()); - - group.removeJob(jid2); - - assertTrue(jmJobGroup21.isClosed()); - assertEquals(0, group.numRegisteredJobMetricGroups()); - } - - @Test - public void testCloseClosesAll() { - final JobManagerMetricGroup group = new JobManagerMetricGroup( - new MetricRegistry(new Configuration()), "localhost"); - - final JobID jid1 = new JobID(); - final JobID jid2 = new JobID(); - - final String jobName1 = "testjob"; - final String jobName2 = "anotherJob"; - - JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, jobName1); - JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, jobName2); - - group.close(); - - assertTrue(jmJobGroup11.isClosed()); - assertTrue(jmJobGroup21.isClosed()); - } - - // ------------------------------------------------------------------------ - // scope name tests - // ------------------------------------------------------------------------ - - @Test - public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); - - assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents()); - assertEquals("localhost.jobmanager.name", group.getMetricIdentifier("name")); - } - - @Test - public void testGenerateScopeCustom() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - JobManagerScopeFormat format = new JobManagerScopeFormat("constant.<host>.foo.<host>"); - JobManagerMetricGroup group = new JobManagerMetricGroup(registry, format, "host"); - - assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents()); - assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java deleted file mode 100644 index dd2829c..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat; -import org.junit.Test; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -public class JobManagerJobGroupTest { - - @Test - public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); - JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); - - assertArrayEquals( - new String[] { "theHostName", "jobmanager", "myJobName"}, - jmGroup.getScopeComponents()); - - assertEquals( - "theHostName.jobmanager.myJobName.name", - jmGroup.getMetricIdentifier("name")); - } - - @Test - public void testGenerateScopeCustom() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("abc"); - JobManagerJobScopeFormat jmFormat = new JobManagerJobScopeFormat("some-constant.<job_name>", tmFormat); - - JobID jid = new JobID(); - - JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); - JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); - - assertArrayEquals( - new String[] { "some-constant", "myJobName" }, - jmGroup.getScopeComponents()); - - assertEquals( - "some-constant.myJobName.name", - jmGroup.getMetricIdentifier("name")); - } - - @Test - public void testGenerateScopeCustomWildcard() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("peter"); - JobManagerJobScopeFormat jmFormat = new JobManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat); - - JobID jid = new JobID(); - - JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, tmFormat, "theHostName"); - JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); - - assertArrayEquals( - new String[] { "peter", "some-constant", jid.toString() }, - jmGroup.getScopeComponents()); - - assertEquals( - "peter.some-constant." + jid + ".name", - jmGroup.getMetricIdentifier("name")); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java deleted file mode 100644 index 2a737df..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.groups; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.HistogramStatistics; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.util.TestReporter; - -import org.junit.Assert; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class MetricGroupRegistrationTest { - /** - * Verifies that group methods instantiate the correct metric with the given name. - */ - @Test - public void testMetricInstantiation() { - Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter1.class.getName()); - - MetricRegistry registry = new MetricRegistry(config); - - MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); - - Counter counter = root.counter("counter"); - assertEquals(counter, TestReporter1.lastPassedMetric); - assertEquals("counter", TestReporter1.lastPassedName); - - Gauge<Object> gauge = root.gauge("gauge", new Gauge<Object>() { - @Override - public Object getValue() { - return null; - } - }); - - Assert.assertEquals(gauge, TestReporter1.lastPassedMetric); - assertEquals("gauge", TestReporter1.lastPassedName); - - Histogram histogram = root.histogram("histogram", new Histogram() { - @Override - public void update(long value) { - - } - - @Override - public long getCount() { - return 0; - } - - @Override - public HistogramStatistics getStatistics() { - return null; - } - }); - - Assert.assertEquals(histogram, TestReporter1.lastPassedMetric); - assertEquals("histogram", TestReporter1.lastPassedName); - registry.shutdown(); - } - - public static class TestReporter1 extends TestReporter { - - public static Metric lastPassedMetric; - public static String lastPassedName; - - @Override - public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { - lastPassedMetric = metric; - lastPassedName = metricName; - } - } - - /** - * Verifies that when attempting to create a group with the name of an existing one the existing one will be returned instead. - */ - @Test - public void testDuplicateGroupName() { - Configuration config = new Configuration(); - - MetricRegistry registry = new MetricRegistry(config); - - MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); - - MetricGroup group1 = root.addGroup("group"); - MetricGroup group2 = root.addGroup("group"); - MetricGroup group3 = root.addGroup("group"); - Assert.assertTrue(group1 == group2 && group2 == group3); - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java deleted file mode 100644 index 227db33..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.MetricRegistry; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class MetricGroupTest { - - private MetricRegistry registry; - - private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry(); - - @Before - public void createRegistry() { - this.registry = new MetricRegistry(new Configuration()); - } - - @After - public void shutdownRegistry() { - this.registry.shutdown(); - this.registry = null; - } - - @Test - public void sameGroupOnNameCollision() { - GenericMetricGroup group = new GenericMetricGroup( - registry, new DummyAbstractMetricGroup(registry), "somegroup"); - - String groupName = "sometestname"; - MetricGroup subgroup1 = group.addGroup(groupName); - MetricGroup subgroup2 = group.addGroup(groupName); - - assertNotNull(subgroup1); - assertNotNull(subgroup2); - assertTrue(subgroup1 == subgroup2); - } - - @Test - public void closedGroupDoesNotRegisterMetrics() { - GenericMetricGroup group = new GenericMetricGroup( - exceptionOnRegister, new DummyAbstractMetricGroup(exceptionOnRegister), "testgroup"); - assertFalse(group.isClosed()); - - group.close(); - assertTrue(group.isClosed()); - - // these will fail is the registration is propagated - group.counter("testcounter"); - group.gauge("testgauge", new Gauge<Object>() { - @Override - public Object getValue() { return null; } - }); - } - - @Test - public void closedGroupCreatesClosedGroups() { - GenericMetricGroup group = new GenericMetricGroup(exceptionOnRegister, - new DummyAbstractMetricGroup(exceptionOnRegister), "testgroup"); - assertFalse(group.isClosed()); - - group.close(); - assertTrue(group.isClosed()); - - AbstractMetricGroup subgroup = (AbstractMetricGroup) group.addGroup("test subgroup"); - assertTrue(subgroup.isClosed()); - } - - @Test - public void tolerateMetricNameCollisions() { - final String name = "abctestname"; - GenericMetricGroup group = new GenericMetricGroup( - registry, new DummyAbstractMetricGroup(registry), "testgroup"); - - assertNotNull(group.counter(name)); - assertNotNull(group.counter(name)); - } - - @Test - public void tolerateMetricAndGroupNameCollisions() { - final String name = "abctestname"; - GenericMetricGroup group = new GenericMetricGroup( - registry, new DummyAbstractMetricGroup(registry), "testgroup"); - - assertNotNull(group.addGroup(name)); - assertNotNull(group.counter(name)); - } - - // ------------------------------------------------------------------------ - - private static class ExceptionOnRegisterRegistry extends MetricRegistry { - - public ExceptionOnRegisterRegistry() { - super(new Configuration()); - } - - @Override - public void register(Metric metric, String name, AbstractMetricGroup parent) { - fail("Metric should never be registered"); - } - - @Override - public void unregister(Metric metric, String name, AbstractMetricGroup parent) { - fail("Metric should never be un-registered"); - } - } - - // ------------------------------------------------------------------------ - - private static class DummyAbstractMetricGroup extends AbstractMetricGroup { - - public DummyAbstractMetricGroup(MetricRegistry registry) { - super(registry, new String[0]); - } - - @Override - protected void addMetric(String name, Metric metric) {} - - @Override - public MetricGroup addGroup(String name) { - return new DummyAbstractMetricGroup(registry); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java deleted file mode 100644 index 7ec3d58..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.util.AbstractID; - -import org.junit.Test; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -public class OperatorGroupTest { - - @Test - public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); - TaskMetricGroup taskGroup = new TaskMetricGroup( - registry, jmGroup, new AbstractID(), new AbstractID(), "aTaskName", 11, 0); - OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, "myOpName"); - - assertArrayEquals( - new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", "myOpName", "11" }, - opGroup.getScopeComponents()); - - assertEquals( - "theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name", - opGroup.getMetricIdentifier("name")); - - registry.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java deleted file mode 100644 index efaa433..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; -import org.apache.flink.util.AbstractID; - -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TaskManagerGroupTest { - - // ------------------------------------------------------------------------ - // adding and removing jobs - // ------------------------------------------------------------------------ - - @Test - public void addAndRemoveJobs() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - final TaskManagerMetricGroup group = new TaskManagerMetricGroup( - registry, "localhost", new AbstractID().toString()); - - - final JobID jid1 = new JobID(); - final JobID jid2 = new JobID(); - - final String jobName1 = "testjob"; - final String jobName2 = "anotherJob"; - - final AbstractID vertex11 = new AbstractID(); - final AbstractID vertex12 = new AbstractID(); - final AbstractID vertex13 = new AbstractID(); - final AbstractID vertex21 = new AbstractID(); - - final AbstractID execution11 = new AbstractID(); - final AbstractID execution12 = new AbstractID(); - final AbstractID execution13 = new AbstractID(); - final AbstractID execution21 = new AbstractID(); - - TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, vertex11, execution11, "test", 17, 0); - TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, vertex12, execution12, "test", 13, 1); - TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, vertex21, execution21, "test", 7, 2); - - assertEquals(2, group.numRegisteredJobMetricGroups()); - assertFalse(tmGroup11.parent().isClosed()); - assertFalse(tmGroup12.parent().isClosed()); - assertFalse(tmGroup21.parent().isClosed()); - - // close all for job 2 and one from job 1 - tmGroup11.close(); - tmGroup21.close(); - assertTrue(tmGroup11.isClosed()); - assertTrue(tmGroup21.isClosed()); - - // job 2 should be removed, job should still be there - assertFalse(tmGroup11.parent().isClosed()); - assertFalse(tmGroup12.parent().isClosed()); - assertTrue(tmGroup21.parent().isClosed()); - assertEquals(1, group.numRegisteredJobMetricGroups()); - - // add one more to job one - TaskMetricGroup tmGroup13 = group.addTaskForJob(jid1, jobName1, vertex13, execution13, "test", 0, 0); - tmGroup12.close(); - tmGroup13.close(); - - assertTrue(tmGroup11.parent().isClosed()); - assertTrue(tmGroup12.parent().isClosed()); - assertTrue(tmGroup13.parent().isClosed()); - - assertEquals(0, group.numRegisteredJobMetricGroups()); - - registry.shutdown(); - } - - @Test - public void testCloseClosesAll() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - final TaskManagerMetricGroup group = new TaskManagerMetricGroup( - registry, "localhost", new AbstractID().toString()); - - - final JobID jid1 = new JobID(); - final JobID jid2 = new JobID(); - - final String jobName1 = "testjob"; - final String jobName2 = "anotherJob"; - - final AbstractID vertex11 = new AbstractID(); - final AbstractID vertex12 = new AbstractID(); - final AbstractID vertex21 = new AbstractID(); - - final AbstractID execution11 = new AbstractID(); - final AbstractID execution12 = new AbstractID(); - final AbstractID execution21 = new AbstractID(); - - TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, vertex11, execution11, "test", 17, 1); - TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, vertex12, execution12, "test", 13, 2); - TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, vertex21, execution21, "test", 7, 1); - - group.close(); - - assertTrue(tmGroup11.isClosed()); - assertTrue(tmGroup12.isClosed()); - assertTrue(tmGroup21.isClosed()); - - registry.shutdown(); - } - - // ------------------------------------------------------------------------ - // scope name tests - // ------------------------------------------------------------------------ - - @Test - public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id"); - - assertArrayEquals(new String[] { "localhost", "taskmanager", "id" }, group.getScopeComponents()); - assertEquals("localhost.taskmanager.id.name", group.getMetricIdentifier("name")); - registry.shutdown(); - } - - @Test - public void testGenerateScopeCustom() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - TaskManagerScopeFormat format = new TaskManagerScopeFormat("constant.<host>.foo.<host>"); - TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, format, "host", "id"); - - assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents()); - assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); - registry.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java deleted file mode 100644 index 117d5bb..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; - -import org.junit.Test; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -public class TaskManagerJobGroupTest { - - @Test - public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); - - assertArrayEquals( - new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName"}, - jmGroup.getScopeComponents()); - - assertEquals( - "theHostName.taskmanager.test-tm-id.myJobName.name", - jmGroup.getMetricIdentifier("name")); - registry.shutdown(); - } - - @Test - public void testGenerateScopeCustom() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc"); - TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat); - - JobID jid = new JobID(); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); - - assertArrayEquals( - new String[] { "some-constant", "myJobName" }, - jmGroup.getScopeComponents()); - - assertEquals( - "some-constant.myJobName.name", - jmGroup.getMetricIdentifier("name")); - registry.shutdown(); - } - - @Test - public void testGenerateScopeCustomWildcard() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("peter.<tm_id>"); - TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat); - - JobID jid = new JobID(); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); - - assertArrayEquals( - new String[] { "peter", "test-tm-id", "some-constant", jid.toString() }, - jmGroup.getScopeComponents()); - - assertEquals( - "peter.test-tm-id.some-constant." + jid + ".name", - jmGroup.getMetricIdentifier("name")); - registry.shutdown(); - } -}