Repository: flink Updated Branches: refs/heads/master 90658c838 -> 790a654c5
[FLINK-4143][metrics] Configurable delimiter This closes #2219 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/790a654c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/790a654c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/790a654c Branch: refs/heads/master Commit: 790a654c5e08e0e54f3e02499be4dd8c4006227a Parents: 90658c8 Author: zentol <ches...@apache.org> Authored: Mon Jul 11 12:22:18 2016 +0200 Committer: zentol <ches...@apache.org> Committed: Wed Jul 13 11:43:43 2016 +0200 ---------------------------------------------------------------------- docs/apis/metrics.md | 2 ++ .../flink/configuration/ConfigConstants.java | 3 +++ .../org/apache/flink/metrics/MetricRegistry.java | 15 +++++++++++++++ .../metrics/groups/AbstractMetricGroup.java | 19 +++++++++---------- .../flink/metrics/groups/scope/ScopeFormat.java | 6 +++++- .../flink/metrics/reporter/AbstractReporter.java | 2 +- .../apache/flink/metrics/MetricRegistryTest.java | 17 ++++++++++++++++- .../metrics/groups/JobManagerGroupTest.java | 4 ++-- .../metrics/groups/JobManagerJobGroupTest.java | 12 ++++++------ .../flink/metrics/groups/OperatorGroupTest.java | 4 ++-- .../flink/metrics/groups/TaskGroupTest.java | 12 ++++++------ .../metrics/groups/TaskManagerGroupTest.java | 4 ++-- .../metrics/groups/TaskManagerJobGroupTest.java | 12 ++++++------ .../dropwizard/ScheduledDropwizardReporter.java | 2 +- .../DropwizardFlinkHistogramWrapperTest.java | 2 +- .../flink/metrics/statsd/StatsDReporterTest.java | 2 +- 16 files changed, 78 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/docs/apis/metrics.md ---------------------------------------------------------------------- diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md index 3758cc2..329e445 100644 --- a/docs/apis/metrics.md +++ b/docs/apis/metrics.md @@ -162,6 
+162,8 @@ public class MyMapper extends RichMapFunction<Long, Integer> { Every metric is assigned an identifier under which it will be reported that is based on 3 components: the user-provided name when registering the metric, an optional user-defined scope and a system-provided scope. For example, if `A.B` is the system scope, `C.D` the user scope and `E` the name, then the identifier for the metric will be `A.B.C.D.E`. +You can configure which delimiter to use for the identifier (default: `.`) by setting the `metrics.scope.delimiter` key in `conf/flink-conf.yaml`. + ### User Scope You can define a user scope by calling either `MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`. http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 588423a..2f24cda 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -661,6 +661,9 @@ public final class ConfigConstants { /** The interval between reports. */ public static final String METRICS_REPORTER_INTERVAL = "metrics.reporter.interval"; + /** The delimiter used to assemble the metric identifier. */ + public static final String METRICS_SCOPE_DELIMITER = "metrics.scope.delimiter"; + /** The scope format string that is applied to all metrics scoped to a JobManager. 
*/ public static final String METRICS_SCOPE_NAMING_JM = "metrics.scope.jm"; http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java index f1e44df..a9d7324 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java @@ -49,6 +49,8 @@ public class MetricRegistry { private final ScopeFormats scopeFormats; + private final char delimiter; + /** * Creates a new MetricRegistry and starts the configured reporter. */ @@ -64,6 +66,15 @@ public class MetricRegistry { } this.scopeFormats = scopeFormats; + char delim; + try { + delim = config.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0); + } catch (Exception e) { + LOG.warn("Failed to parse delimiter, using default delimiter.", e); + delim = '.'; + } + this.delimiter = delim; + // second, instantiate any custom configured reporters final String className = config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null); @@ -118,6 +129,10 @@ public class MetricRegistry { } } + public char getDelimiter() { + return this.delimiter; + } + private static JMXReporter startJmxReporter(Configuration config) { JMXReporter reporter = null; try { http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java index 112957e..23487f8 100644 --- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java @@ -87,24 +87,23 @@ public abstract class AbstractMetricGroup implements MetricGroup { /** * Gets the scope as an array of the scope components, for example * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]} - * - * @see #getScopeString() */ public String[] getScopeComponents() { return scopeComponents; } /** - * Gets the scope as a single delimited string, for example - * {@code "host-7.taskmanager-2.window_word_count.my-mapper"} - * - * @see #getScopeComponents() - */ - public String getScopeString() { + * Returns the fully qualified metric name, for example + * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} + * + * @param metricName metric name + * @return fully qualified metric name + */ + public String getMetricIdentifier(String metricName) { if (scopeString == null) { - scopeString = ScopeFormat.concat(scopeComponents); + scopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents); } - return scopeString; + return scopeString + registry.getDelimiter() + metricName; } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java index b73cf51..9471a07 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java @@ -428,10 +428,14 @@ public abstract class ScopeFormat { } public static String concat(String... 
components) { + return concat('.', components); + } + + public static String concat(Character delimiter, String... components) { StringBuilder sb = new StringBuilder(); sb.append(components[0]); for (int x = 1; x < components.length; x++) { - sb.append(SCOPE_SEPARATOR); + sb.append(delimiter); sb.append(components[x]); } return sb.toString(); http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java index b377d73..a37b29a 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java @@ -43,7 +43,7 @@ public abstract class AbstractReporter implements MetricReporter { @Override public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { - final String name = replaceInvalidChars(group.getScopeString() + '.' 
+ metricName); + final String name = replaceInvalidChars(group.getMetricIdentifier(metricName)); synchronized (this) { if (metric instanceof Counter) { http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java index 4e0f4d0..f78fb0c 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java @@ -30,7 +30,8 @@ import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class MetricRegistryTest extends TestLogger { @@ -176,4 +177,18 @@ public class MetricRegistryTest extends TestLogger { assertEquals("C", scopeConfig.getTaskFormat().format()); assertEquals("D", scopeConfig.getOperatorFormat().format()); } + + @Test + public void testConfigurableDelimiter() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_"); + config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D.E"); + + MetricRegistry registry = new MetricRegistry(config); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id"); + assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name")); + + registry.shutdown(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java index 8853f20..fb0af2e 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java @@ -93,7 +93,7 @@ public class JobManagerGroupTest { JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents()); - assertEquals("localhost.jobmanager", group.getScopeString()); + assertEquals("localhost.jobmanager.name", group.getMetricIdentifier("name")); } @Test @@ -103,6 +103,6 @@ public class JobManagerGroupTest { JobManagerMetricGroup group = new JobManagerMetricGroup(registry, format, "host"); assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents()); - assertEquals("constant.host.foo.host", group.getScopeString()); + assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java index 3833cb8..dd2829c 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java @@ -42,8 +42,8 @@ public class JobManagerJobGroupTest { jmGroup.getScopeComponents()); assertEquals( - "theHostName.jobmanager.myJobName", - jmGroup.getScopeString()); + "theHostName.jobmanager.myJobName.name", + jmGroup.getMetricIdentifier("name")); } @Test @@ -63,8 +63,8 @@ public class JobManagerJobGroupTest { 
jmGroup.getScopeComponents()); assertEquals( - "some-constant.myJobName", - jmGroup.getScopeString()); + "some-constant.myJobName.name", + jmGroup.getMetricIdentifier("name")); } @Test @@ -84,7 +84,7 @@ public class JobManagerJobGroupTest { jmGroup.getScopeComponents()); assertEquals( - "peter.some-constant." + jid, - jmGroup.getScopeString()); + "peter.some-constant." + jid + ".name", + jmGroup.getMetricIdentifier("name")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java index 9641632..7ec3d58 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java @@ -45,8 +45,8 @@ public class OperatorGroupTest { opGroup.getScopeComponents()); assertEquals( - "theHostName.taskmanager.test-tm-id.myJobName.myOpName.11", - opGroup.getScopeString()); + "theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name", + opGroup.getMetricIdentifier("name")); registry.shutdown(); } http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java index 357852a..5cc6aa1 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java @@ -68,8 +68,8 @@ public class TaskGroupTest { taskGroup.getScopeComponents()); assertEquals( - 
"theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13", - taskGroup.getScopeString()); + "theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13.name", + taskGroup.getMetricIdentifier("name")); registry.shutdown(); } @@ -95,8 +95,8 @@ public class TaskGroupTest { taskGroup.getScopeComponents()); assertEquals( - String.format("test-tm-id.%s.%s.%s", jid, vertexId, executionId), - taskGroup.getScopeString()); + String.format("test-tm-id.%s.%s.%s.name", jid, vertexId, executionId), + taskGroup.getMetricIdentifier("name")); registry.shutdown(); } @@ -124,8 +124,8 @@ public class TaskGroupTest { taskGroup.getScopeComponents()); assertEquals( - "theHostName.taskmanager.test-tm-id.myJobName." + executionId + ".13", - taskGroup.getScopeString()); + "theHostName.taskmanager.test-tm-id.myJobName." + executionId + ".13.name", + taskGroup.getMetricIdentifier("name")); registry.shutdown(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java index 9866b1b..efaa433 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java @@ -137,7 +137,7 @@ public class TaskManagerGroupTest { TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id"); assertArrayEquals(new String[] { "localhost", "taskmanager", "id" }, group.getScopeComponents()); - assertEquals("localhost.taskmanager.id", group.getScopeString()); + assertEquals("localhost.taskmanager.id.name", group.getMetricIdentifier("name")); registry.shutdown(); } @@ -148,7 +148,7 @@ public class TaskManagerGroupTest { TaskManagerMetricGroup 
group = new TaskManagerMetricGroup(registry, format, "host", "id"); assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents()); - assertEquals("constant.host.foo.host", group.getScopeString()); + assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); registry.shutdown(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java index 5cec70b..117d5bb 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java @@ -43,8 +43,8 @@ public class TaskManagerJobGroupTest { jmGroup.getScopeComponents()); assertEquals( - "theHostName.taskmanager.test-tm-id.myJobName", - jmGroup.getScopeString()); + "theHostName.taskmanager.test-tm-id.myJobName.name", + jmGroup.getMetricIdentifier("name")); registry.shutdown(); } @@ -65,8 +65,8 @@ public class TaskManagerJobGroupTest { jmGroup.getScopeComponents()); assertEquals( - "some-constant.myJobName", - jmGroup.getScopeString()); + "some-constant.myJobName.name", + jmGroup.getMetricIdentifier("name")); registry.shutdown(); } @@ -87,8 +87,8 @@ public class TaskManagerJobGroupTest { jmGroup.getScopeComponents()); assertEquals( - "peter.test-tm-id.some-constant." + jid, - jmGroup.getScopeString()); + "peter.test-tm-id.some-constant." 
+ jid + ".name", + jmGroup.getMetricIdentifier("name")); registry.shutdown(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java index 062bbd8..f886130 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java @@ -93,7 +93,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch @Override public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { - final String fullName = group.getScopeString() + '.' 
+ metricName; + final String fullName = group.getMetricIdentifier(metricName); synchronized (this) { if (metric instanceof Counter) { http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java index 8ae0186..c1913d7 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java @@ -109,7 +109,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger { metricGroup.histogram(histogramMetricName, histogramWrapper); - String fullMetricName = metricGroup.getScopeString() + "." 
+ histogramMetricName; + String fullMetricName = metricGroup.getMetricIdentifier(histogramMetricName); Field f = registry.getClass().getDeclaredField("reporter"); f.setAccessible(true); http://git-wip-us.apache.org/repos/asf/flink/blob/790a654c/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index 4d29e90..df2725a 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -94,7 +94,7 @@ public class StatsDReporterTest extends TestLogger { Set<String> lines = receiver.getLines(); - String prefix = metricGroup.getScopeString() + "." + histogramName; + String prefix = metricGroup.getMetricIdentifier(histogramName); Set<String> expectedLines = new HashSet<>();