[FLINK-9187][metrics] Add Prometheus PushGateway reporter

This closes #6184.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ee5dbf3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ee5dbf3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ee5dbf3

Branch: refs/heads/master
Commit: 5ee5dbf3dd9e5240ed13c8c3eaff6cca158010b3
Parents: 60df251
Author: lamber-ken <!@#123zxcQ>
Authored: Wed Jun 20 12:26:10 2018 +0800
Committer: zentol <ches...@apache.org>
Committed: Wed Jul 11 12:05:07 2018 +0200

----------------------------------------------------------------------
 ...eus_push_gateway_reporter_configuration.html |  36 +++
 docs/monitoring/metrics.md                      |  26 ++
 flink-docs/pom.xml                              |   5 +
 .../ConfigOptionsDocGenerator.java              |   1 +
 flink-metrics/flink-metrics-prometheus/pom.xml  |   6 +
 .../prometheus/AbstractPrometheusReporter.java  | 306 +++++++++++++++++++
 .../PrometheusPushGatewayReporter.java          |  91 ++++++
 .../PrometheusPushGatewayReporterOptions.java   |  53 ++++
 .../metrics/prometheus/PrometheusReporter.java  | 271 +---------------
 .../prometheus/PrometheusReporterTest.java      |   8 +-
 10 files changed, 532 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/docs/_includes/generated/prometheus_push_gateway_reporter_configuration.html
----------------------------------------------------------------------
diff --git 
a/docs/_includes/generated/prometheus_push_gateway_reporter_configuration.html 
b/docs/_includes/generated/prometheus_push_gateway_reporter_configuration.html
new file mode 100644
index 0000000..2d14b50
--- /dev/null
+++ 
b/docs/_includes/generated/prometheus_push_gateway_reporter_configuration.html
@@ -0,0 +1,36 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>deleteOnShutdown</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Specifies whether to delete metrics from the PushGateway on 
shutdown.</td>
+        </tr>
+        <tr>
+            <td><h5>host</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The PushGateway server host.</td>
+        </tr>
+        <tr>
+            <td><h5>jobName</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The job name under which metrics will be pushed</td>
+        </tr>
+        <tr>
+            <td><h5>port</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The PushGateway server port.</td>
+        </tr>
+        <tr>
+            <td><h5>randomJobNameSuffix</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Specifies whether a random suffix should be appended to the 
job name.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index a06e7f3..55f626e 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -700,6 +700,32 @@ Flink metric types are mapped to Prometheus metric types 
as follows:
 
 All Flink metrics variables (see [List of all 
Variables](#list-of-all-variables)) are exported to Prometheus as labels. 
 
+### PrometheusPushGateway 
(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
+
+In order to use this reporter you must copy 
`/opt/flink-metrics-prometheus-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Parameters:
+
+{% include generated/prometheus_push_gateway_reporter_configuration.html %}
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporter.promgateway.class: 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
+metrics.reporter.promgateway.host: localhost
+metrics.reporter.promgateway.port: 9091
+metrics.reporter.promgateway.jobName: myJob
+metrics.reporter.promgateway.randomJobNameSuffix: true
+metrics.reporter.promgateway.deleteOnShutdown: false
+
+{% endhighlight %}
+
+The PrometheusPushGatewayReporter pushes metrics to a 
[Pushgateway](https://github.com/prometheus/pushgateway), which can be scraped 
by Prometheus.
+
+Please see the [Prometheus 
documentation](https://prometheus.io/docs/practices/pushing/) for use-cases.
+
 ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
 
 In order to use this reporter you must copy 
`/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index dfe33be..2135dea 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -54,6 +54,11 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-prometheus</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
                        <!-- necessary for loading the web-submission extension 
-->
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
 
b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index aca722e..743f49f 100644
--- 
a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ 
b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -56,6 +56,7 @@ public class ConfigOptionsDocGenerator {
                new OptionsClassLocation("flink-yarn", 
"org.apache.flink.yarn.configuration"),
                new OptionsClassLocation("flink-mesos", 
"org.apache.flink.mesos.configuration"),
                new OptionsClassLocation("flink-mesos", 
"org.apache.flink.mesos.runtime.clusterframework"),
+               new 
OptionsClassLocation("flink-metrics/flink-metrics-prometheus", 
"org.apache.flink.metrics.prometheus"),
        };
 
        static final String DEFAULT_PATH_PREFIX = "src/main/java";

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/pom.xml 
b/flink-metrics/flink-metrics-prometheus/pom.xml
index b0cad84..9aad69c 100644
--- a/flink-metrics/flink-metrics-prometheus/pom.xml
+++ b/flink-metrics/flink-metrics-prometheus/pom.xml
@@ -73,6 +73,12 @@ under the License.
                        <version>${prometheus.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>io.prometheus</groupId>
+                       <artifactId>simpleclient_pushgateway</artifactId>
+                       <version>${prometheus.version}</version>
+               </dependency>
+
                <!-- test dependencies -->
 
                <dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
new file mode 100644
index 0000000..426cd4c
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+
+/**
+ * Base class for reporters that export Flink metrics to Prometheus.
+ */
+@PublicEvolving
+public abstract class AbstractPrometheusReporter implements MetricReporter {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
+       private static final CharacterFilter CHARACTER_FILTER = new 
CharacterFilter() {
+               @Override
+               public String filterCharacters(String input) {
+                       return replaceInvalidChars(input);
+               }
+       };
+
+       private static final char SCOPE_SEPARATOR = '_';
+       private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;
+
+       private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, 
Integer>> collectorsWithCountByMetricName = new HashMap<>();
+
+       @VisibleForTesting
+       static String replaceInvalidChars(final String input) {
+               // https://prometheus.io/docs/instrumenting/writing_exporters/
+               // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
+               return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+       }
+
+       @Override
+       public void close() {
+               CollectorRegistry.defaultRegistry.clear();
+       }
+
+       @Override
+       public void notifyOfAddedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
+
+               List<String> dimensionKeys = new LinkedList<>();
+               List<String> dimensionValues = new LinkedList<>();
+               for (final Map.Entry<String, String> dimension : 
group.getAllVariables().entrySet()) {
+                       final String key = dimension.getKey();
+                       
dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, 
key.length() - 1)));
+                       
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+               }
+
+               final String scopedMetricName = getScopedName(metricName, 
group);
+               final String helpString = metricName + " (scope: " + 
getLogicalScope(group) + ")";
+
+               final Collector collector;
+               Integer count = 0;
+
+               synchronized (this) {
+                       if 
(collectorsWithCountByMetricName.containsKey(scopedMetricName)) {
+                               final 
AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = 
collectorsWithCountByMetricName.get(scopedMetricName);
+                               collector = collectorWithCount.getKey();
+                               count = collectorWithCount.getValue();
+                       } else {
+                               collector = createCollector(metric, 
dimensionKeys, dimensionValues, scopedMetricName, helpString);
+                               try {
+                                       collector.register();
+                               } catch (Exception e) {
+                                       log.warn("There was a problem 
registering metric {}.", metricName, e);
+                               }
+                       }
+                       addMetric(metric, dimensionValues, collector);
+                       collectorsWithCountByMetricName.put(scopedMetricName, 
new AbstractMap.SimpleImmutableEntry<>(collector, count + 1));
+               }
+       }
+
+       private static String getScopedName(String metricName, MetricGroup 
group) {
+               return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR 
+ CHARACTER_FILTER.filterCharacters(metricName);
+       }
+
+       private Collector createCollector(Metric metric, List<String> 
dimensionKeys, List<String> dimensionValues, String scopedMetricName, String 
helpString) {
+               Collector collector;
+               if (metric instanceof Gauge || metric instanceof Counter || 
metric instanceof Meter) {
+                       collector = io.prometheus.client.Gauge
+                               .build()
+                               .name(scopedMetricName)
+                               .help(helpString)
+                               .labelNames(toArray(dimensionKeys))
+                               .create();
+               } else if (metric instanceof Histogram) {
+                       collector = new HistogramSummaryProxy((Histogram) 
metric, scopedMetricName, helpString, dimensionKeys, dimensionValues);
+               } else {
+                       log.warn("Cannot create collector for unknown metric 
type: {}. This indicates that the metric type is not supported by this 
reporter.",
+                               metric.getClass().getName());
+                       collector = null;
+               }
+               return collector;
+       }
+
+       private void addMetric(Metric metric, List<String> dimensionValues, 
Collector collector) {
+               if (metric instanceof Gauge) {
+                       ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
+               } else if (metric instanceof Counter) {
+                       ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
+               } else if (metric instanceof Meter) {
+                       ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
+               } else if (metric instanceof Histogram) {
+                       ((HistogramSummaryProxy) 
collector).addChild((Histogram) metric, dimensionValues);
+               } else {
+                       log.warn("Cannot add unknown metric type: {}. This 
indicates that the metric type is not supported by this reporter.",
+                               metric.getClass().getName());
+               }
+       }
+
+       private void removeMetric(Metric metric, List<String> dimensionValues, 
Collector collector) {
+               if (metric instanceof Gauge) {
+                       ((io.prometheus.client.Gauge) 
collector).remove(toArray(dimensionValues));
+               } else if (metric instanceof Counter) {
+                       ((io.prometheus.client.Gauge) 
collector).remove(toArray(dimensionValues));
+               } else if (metric instanceof Meter) {
+                       ((io.prometheus.client.Gauge) 
collector).remove(toArray(dimensionValues));
+               } else if (metric instanceof Histogram) {
+                       ((HistogramSummaryProxy) 
collector).remove(dimensionValues);
+               } else {
+                       log.warn("Cannot remove unknown metric type: {}. This 
indicates that the metric type is not supported by this reporter.",
+                               metric.getClass().getName());
+               }
+       }
+
+       /**
+        * Removes the given metric from its collector and unregisters the collector from the default
+        * registry once the last metric sharing it has been removed.
+        *
+        * @param metric the metric that was removed
+        * @param metricName the name of the metric
+        * @param group the group the metric belonged to
+        */
+       @Override
+       public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+
+               List<String> dimensionValues = new LinkedList<>();
+               for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
+                       dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+               }
+
+               final String scopedMetricName = getScopedName(metricName, group);
+               synchronized (this) {
+                       final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName);
+                       if (collectorWithCount == null) {
+                               // No collector is tracked under this name; without this guard the
+                               // dereferences below would fail with a NullPointerException.
+                               log.warn("Cannot remove metric {}: no collector is registered under that name.", scopedMetricName);
+                               return;
+                       }
+                       final Integer count = collectorWithCount.getValue();
+                       final Collector collector = collectorWithCount.getKey();
+
+                       removeMetric(metric, dimensionValues, collector);
+
+                       if (count == 1) {
+                               // last metric using this collector: drop it from the registry and from the map
+                               try {
+                                       CollectorRegistry.defaultRegistry.unregister(collector);
+                               } catch (Exception e) {
+                                       log.warn("There was a problem unregistering metric {}.", scopedMetricName, e);
+                               }
+                               collectorsWithCountByMetricName.remove(scopedMetricName);
+                       } else {
+                               collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count - 1));
+                       }
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private static String getLogicalScope(MetricGroup group) {
+               return ((FrontMetricGroup<AbstractMetricGroup<?>>) 
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+       }
+
+       /**
+        * Wraps a Flink gauge in a Prometheus gauge child that reads the Flink gauge's value on every
+        * call to {@code get()}.
+        *
+        * <p>Numbers are reported as doubles and booleans as 1 or 0; null values and values of any other
+        * type are reported as 0.
+        */
+       @VisibleForTesting
+       io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) {
+               return new io.prometheus.client.Gauge.Child() {
+                       @Override
+                       public double get() {
+                               final Object current = gauge.getValue();
+                               final double result;
+                               if (current == null) {
+                                       log.debug("Gauge {} is null-valued, defaulting to 0.", gauge);
+                                       result = 0;
+                               } else if (current instanceof Number) {
+                                       // covers Double as well as every other boxed numeric type
+                                       result = ((Number) current).doubleValue();
+                               } else if (current instanceof Boolean) {
+                                       result = ((Boolean) current) ? 1 : 0;
+                               } else {
+                                       log.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
+                                               gauge, current.getClass().getName());
+                                       result = 0;
+                               }
+                               return result;
+                       }
+               };
+       }
+
+       private static io.prometheus.client.Gauge.Child gaugeFrom(Counter 
counter) {
+               return new io.prometheus.client.Gauge.Child() {
+                       @Override
+                       public double get() {
+                               return (double) counter.getCount();
+                       }
+               };
+       }
+
+       private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) {
+               return new io.prometheus.client.Gauge.Child() {
+                       @Override
+                       public double get() {
+                               return meter.getRate();
+                       }
+               };
+       }
+
+       @VisibleForTesting
+       static class HistogramSummaryProxy extends Collector {
+               static final List<Double> QUANTILES = Arrays.asList(.5, .75, 
.95, .98, .99, .999);
+
+               private final String metricName;
+               private final String helpString;
+               private final List<String> labelNamesWithQuantile;
+
+               private final Map<List<String>, Histogram> 
histogramsByLabelValues = new HashMap<>();
+
+               HistogramSummaryProxy(final Histogram histogram, final String 
metricName, final String helpString, final List<String> labelNames, final 
List<String> labelValues) {
+                       this.metricName = metricName;
+                       this.helpString = helpString;
+                       this.labelNamesWithQuantile = addToList(labelNames, 
"quantile");
+                       histogramsByLabelValues.put(labelValues, histogram);
+               }
+
+               @Override
+               public List<MetricFamilySamples> collect() {
+                       // We cannot use SummaryMetricFamily because it is 
impossible to get a sum of all values (at least for Dropwizard histograms,
+                       // whose snapshot's values array only holds a sample of 
recent values).
+
+                       List<MetricFamilySamples.Sample> samples = new 
LinkedList<>();
+                       for (Map.Entry<List<String>, Histogram> 
labelValuesToHistogram : histogramsByLabelValues.entrySet()) {
+                               addSamples(labelValuesToHistogram.getKey(), 
labelValuesToHistogram.getValue(), samples);
+                       }
+                       return Collections.singletonList(new 
MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples));
+               }
+
+               void addChild(final Histogram histogram, final List<String> 
labelValues) {
+                       histogramsByLabelValues.put(labelValues, histogram);
+               }
+
+               void remove(final List<String> labelValues) {
+                       histogramsByLabelValues.remove(labelValues);
+               }
+
+               private void addSamples(final List<String> labelValues, final 
Histogram histogram, final List<MetricFamilySamples.Sample> samples) {
+                       samples.add(new MetricFamilySamples.Sample(metricName + 
"_count",
+                               labelNamesWithQuantile.subList(0, 
labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));
+                       for (final Double quantile : QUANTILES) {
+                               samples.add(new 
MetricFamilySamples.Sample(metricName, labelNamesWithQuantile,
+                                       addToList(labelValues, 
quantile.toString()),
+                                       
histogram.getStatistics().getQuantile(quantile)));
+                       }
+               }
+       }
+
+       private static List<String> addToList(List<String> list, String 
element) {
+               final List<String> result = new ArrayList<>(list);
+               result.add(element);
+               return result;
+       }
+
+       private static String[] toArray(List<String> list) {
+               return list.toArray(new String[list.size()]);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
new file mode 100644
index 0000000..164eaf3
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+
+import java.io.IOException;
+
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.DELETE_ON_SHUTDOWN;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.JOB_NAME;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PORT;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.RANDOM_JOB_NAME_SUFFIX;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus 
{@link PushGateway}.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter 
implements Scheduled {
+
+       private PushGateway pushGateway;
+       private String jobName;
+       private boolean deleteOnShutdown;
+
+       @Override
+       public void open(MetricConfig config) {
+               String host = config.getString(HOST.key(), HOST.defaultValue());
+               int port = config.getInteger(PORT.key(), PORT.defaultValue());
+               String configuredJobName = config.getString(JOB_NAME.key(), 
JOB_NAME.defaultValue());
+               boolean randomSuffix = 
config.getBoolean(RANDOM_JOB_NAME_SUFFIX.key(), 
RANDOM_JOB_NAME_SUFFIX.defaultValue());
+               deleteOnShutdown = config.getBoolean(DELETE_ON_SHUTDOWN.key(), 
DELETE_ON_SHUTDOWN.defaultValue());
+
+               if (host == null || host.isEmpty() || port < 1) {
+                       throw new IllegalArgumentException("Invalid host/port 
configuration. Host: " + host + " Port: " + port);
+               }
+
+               if (randomSuffix) {
+                       this.jobName = configuredJobName + new AbstractID();
+               } else {
+                       this.jobName = configuredJobName;
+               }
+
+               pushGateway = new PushGateway(host + ':' + port);
+               log.info("Configured PrometheusPushGatewayReporter with 
{host:{}, port:{}, jobName: {}, randomJobNameSuffix:{}, deleteOnShutdown:{}}", 
host, port, jobName, randomSuffix, deleteOnShutdown);
+       }
+
+       @Override
+       public void report() {
+               try {
+                       pushGateway.push(CollectorRegistry.defaultRegistry, 
jobName);
+               } catch (Exception e) {
+                       log.warn("Failed to push metrics to PushGateway with 
jobName {}.", jobName, e);
+               }
+       }
+
+       @Override
+       public void close() {
+               if (deleteOnShutdown && pushGateway != null) {
+                       try {
+                               pushGateway.delete(jobName);
+                       } catch (IOException e) {
+                               log.warn("Failed to delete metrics from 
PushGateway with jobName {}.", jobName, e);
+                       }
+               }
+               super.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
new file mode 100644
index 0000000..74fe7cb
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Config options for the {@link PrometheusPushGatewayReporter}.
+ */
+public class PrometheusPushGatewayReporterOptions { // Declares only static option constants (key, default, description); no methods.
+
+       public static final ConfigOption<String> HOST = ConfigOptions
+               .key("host")
+               .noDefaultValue() // no default is declared for the host
+               .withDescription("The PushGateway server host.");
+
+       public static final ConfigOption<Integer> PORT = ConfigOptions
+               .key("port")
+               .defaultValue(-1) // NOTE(review): -1 presumably marks "not configured" - confirm against the reporter's open()
+               .withDescription("The PushGateway server port.");
+
+       public static final ConfigOption<String> JOB_NAME = ConfigOptions
+               .key("jobName")
+               .defaultValue("") // empty string by default
+               .withDescription("The job name under which metrics will be 
pushed");
+
+       public static final ConfigOption<Boolean> RANDOM_JOB_NAME_SUFFIX = 
ConfigOptions
+               .key("randomJobNameSuffix")
+               .defaultValue(true) // enabled by default
+               .withDescription("Specifies whether a random suffix should be 
appended to the job name.");
+
+       public static final ConfigOption<Boolean> DELETE_ON_SHUTDOWN = 
ConfigOptions
+               .key("deleteOnShutdown")
+               .defaultValue(true) // enabled by default
+               .withDescription("Specifies whether to delete metrics from the 
PushGateway on shutdown.");
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
index ffa419c..190e120 100644
--- 
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
+++ 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
@@ -20,62 +20,28 @@ package org.apache.flink.metrics.prometheus;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.metrics.CharacterFilter;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Preconditions;
 
-import io.prometheus.client.Collector;
-import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.HTTPServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
 
 /**
  * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus.
  */
 @PublicEvolving
-public class PrometheusReporter implements MetricReporter {
-       private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusReporter.class);
+public class PrometheusReporter extends AbstractPrometheusReporter {
 
        static final String ARG_PORT = "port";
        private static final String DEFAULT_PORT = "9249";
 
-       private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
-       private static final CharacterFilter CHARACTER_FILTER = new 
CharacterFilter() {
-               @Override
-               public String filterCharacters(String input) {
-                       return replaceInvalidChars(input);
-               }
-       };
-
-       private static final char SCOPE_SEPARATOR = '_';
-       private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;
-
        private HTTPServer httpServer;
        private int port;
-       private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, 
Integer>> collectorsWithCountByMetricName = new HashMap<>();
 
        @VisibleForTesting
        int getPort() {
@@ -83,13 +49,6 @@ public class PrometheusReporter implements MetricReporter {
                return port;
        }
 
-       @VisibleForTesting
-       static String replaceInvalidChars(final String input) {
-               // https://prometheus.io/docs/instrumenting/writing_exporters/
-               // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
-               return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
-       }
-
        @Override
        public void open(MetricConfig config) {
                String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT);
@@ -101,10 +60,10 @@ public class PrometheusReporter implements MetricReporter {
                                // internally accesses 
CollectorRegistry.defaultRegistry
                                httpServer = new HTTPServer(port);
                                this.port = port;
-                               LOG.info("Started PrometheusReporter HTTP 
server on port {}.", port);
+                               log.info("Started PrometheusReporter HTTP 
server on port {}.", port);
                                break;
                        } catch (IOException ioe) { //assume port conflict
-                               LOG.debug("Could not start PrometheusReporter 
HTTP server on port {}.", port, ioe);
+                               log.debug("Could not start PrometheusReporter 
HTTP server on port {}.", port, ioe);
                        }
                }
                if (httpServer == null) {
@@ -117,230 +76,8 @@ public class PrometheusReporter implements MetricReporter {
                if (httpServer != null) {
                        httpServer.stop();
                }
-               CollectorRegistry.defaultRegistry.clear();
-       }
-
-       @Override
-       public void notifyOfAddedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
-
-               List<String> dimensionKeys = new LinkedList<>();
-               List<String> dimensionValues = new LinkedList<>();
-               for (final Map.Entry<String, String> dimension : 
group.getAllVariables().entrySet()) {
-                       final String key = dimension.getKey();
-                       
dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, 
key.length() - 1)));
-                       
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
-               }
-
-               final String scopedMetricName = getScopedName(metricName, 
group);
-               final String helpString = metricName + " (scope: " + 
getLogicalScope(group) + ")";
 
-               final Collector collector;
-               Integer count = 0;
-
-               synchronized (this) {
-                       if 
(collectorsWithCountByMetricName.containsKey(scopedMetricName)) {
-                               final 
AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = 
collectorsWithCountByMetricName.get(scopedMetricName);
-                               collector = collectorWithCount.getKey();
-                               count = collectorWithCount.getValue();
-                       } else {
-                               collector = createCollector(metric, 
dimensionKeys, dimensionValues, scopedMetricName, helpString);
-                               try {
-                                       collector.register();
-                               } catch (Exception e) {
-                                       LOG.warn("There was a problem 
registering metric {}.", metricName, e);
-                               }
-                       }
-                       addMetric(metric, dimensionValues, collector);
-                       collectorsWithCountByMetricName.put(scopedMetricName, 
new AbstractMap.SimpleImmutableEntry<>(collector, count + 1));
-               }
-       }
-
-       private static String getScopedName(String metricName, MetricGroup 
group) {
-               return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR 
+ CHARACTER_FILTER.filterCharacters(metricName);
+               super.close();
        }
 
-       private static Collector createCollector(Metric metric, List<String> 
dimensionKeys, List<String> dimensionValues, String scopedMetricName, String 
helpString) {
-               Collector collector;
-               if (metric instanceof Gauge || metric instanceof Counter || 
metric instanceof Meter) {
-                       collector = io.prometheus.client.Gauge
-                               .build()
-                               .name(scopedMetricName)
-                               .help(helpString)
-                               .labelNames(toArray(dimensionKeys))
-                               .create();
-               } else if (metric instanceof Histogram) {
-                       collector = new HistogramSummaryProxy((Histogram) 
metric, scopedMetricName, helpString, dimensionKeys, dimensionValues);
-               } else {
-                       LOG.warn("Cannot create collector for unknown metric 
type: {}. This indicates that the metric type is not supported by this 
reporter.",
-                               metric.getClass().getName());
-                       collector = null;
-               }
-               return collector;
-       }
-
-       private static void addMetric(Metric metric, List<String> 
dimensionValues, Collector collector) {
-               if (metric instanceof Gauge) {
-                       ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
-               } else if (metric instanceof Counter) {
-                       ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
-               } else if (metric instanceof Meter) {
-                       ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
-               } else if (metric instanceof Histogram) {
-                       ((HistogramSummaryProxy) 
collector).addChild((Histogram) metric, dimensionValues);
-               } else {
-                       LOG.warn("Cannot add unknown metric type: {}. This 
indicates that the metric type is not supported by this reporter.",
-                               metric.getClass().getName());
-               }
-       }
-
-       private static void removeMetric(Metric metric, List<String> 
dimensionValues, Collector collector) {
-               if (metric instanceof Gauge) {
-                       ((io.prometheus.client.Gauge) 
collector).remove(toArray(dimensionValues));
-               } else if (metric instanceof Counter) {
-                       ((io.prometheus.client.Gauge) 
collector).remove(toArray(dimensionValues));
-               } else if (metric instanceof Meter) {
-                       ((io.prometheus.client.Gauge) 
collector).remove(toArray(dimensionValues));
-               } else if (metric instanceof Histogram) {
-                       ((HistogramSummaryProxy) 
collector).remove(dimensionValues);
-               } else {
-                       LOG.warn("Cannot remove unknown metric type: {}. This 
indicates that the metric type is not supported by this reporter.",
-                               metric.getClass().getName());
-               }
-       }
-
-       @Override
-       public void notifyOfRemovedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
-
-               List<String> dimensionValues = new LinkedList<>();
-               for (final Map.Entry<String, String> dimension : 
group.getAllVariables().entrySet()) {
-                       
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
-               }
-
-               final String scopedMetricName = getScopedName(metricName, 
group);
-               synchronized (this) {
-                       final AbstractMap.SimpleImmutableEntry<Collector, 
Integer> collectorWithCount = 
collectorsWithCountByMetricName.get(scopedMetricName);
-                       final Integer count = collectorWithCount.getValue();
-                       final Collector collector = collectorWithCount.getKey();
-
-                       removeMetric(metric, dimensionValues, collector);
-
-                       if (count == 1) {
-                               try {
-                                       
CollectorRegistry.defaultRegistry.unregister(collector);
-                               } catch (Exception e) {
-                                       LOG.warn("There was a problem 
unregistering metric {}.", scopedMetricName, e);
-                               }
-                               
collectorsWithCountByMetricName.remove(scopedMetricName);
-                       } else {
-                               
collectorsWithCountByMetricName.put(scopedMetricName, new 
AbstractMap.SimpleImmutableEntry<>(collector, count - 1));
-                       }
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       private static String getLogicalScope(MetricGroup group) {
-               return ((FrontMetricGroup<AbstractMetricGroup<?>>) 
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
-       }
-
-       @VisibleForTesting
-       static io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) {
-               return new io.prometheus.client.Gauge.Child() {
-                       @Override
-                       public double get() {
-                               final Object value = gauge.getValue();
-                               if (value == null) {
-                                       LOG.debug("Gauge {} is null-valued, 
defaulting to 0.", gauge);
-                                       return 0;
-                               }
-                               if (value instanceof Double) {
-                                       return (double) value;
-                               }
-                               if (value instanceof Number) {
-                                       return ((Number) value).doubleValue();
-                               }
-                               if (value instanceof Boolean) {
-                                       return ((Boolean) value) ? 1 : 0;
-                               }
-                               LOG.debug("Invalid type for Gauge {}: {}, only 
number types and booleans are supported by this reporter.",
-                                       gauge, value.getClass().getName());
-                               return 0;
-                       }
-               };
-       }
-
-       private static io.prometheus.client.Gauge.Child gaugeFrom(Counter 
counter) {
-               return new io.prometheus.client.Gauge.Child() {
-                       @Override
-                       public double get() {
-                               return (double) counter.getCount();
-                       }
-               };
-       }
-
-       private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) {
-               return new io.prometheus.client.Gauge.Child() {
-                       @Override
-                       public double get() {
-                               return meter.getRate();
-                       }
-               };
-       }
-
-       @VisibleForTesting
-       static class HistogramSummaryProxy extends Collector {
-               static final List<Double> QUANTILES = Arrays.asList(.5, .75, 
.95, .98, .99, .999);
-
-               private final String metricName;
-               private final String helpString;
-               private final List<String> labelNamesWithQuantile;
-
-               private final Map<List<String>, Histogram> 
histogramsByLabelValues = new HashMap<>();
-
-               HistogramSummaryProxy(final Histogram histogram, final String 
metricName, final String helpString, final List<String> labelNames, final 
List<String> labelValues) {
-                       this.metricName = metricName;
-                       this.helpString = helpString;
-                       this.labelNamesWithQuantile = addToList(labelNames, 
"quantile");
-                       histogramsByLabelValues.put(labelValues, histogram);
-               }
-
-               @Override
-               public List<MetricFamilySamples> collect() {
-                       // We cannot use SummaryMetricFamily because it is 
impossible to get a sum of all values (at least for Dropwizard histograms,
-                       // whose snapshot's values array only holds a sample of 
recent values).
-
-                       List<MetricFamilySamples.Sample> samples = new 
LinkedList<>();
-                       for (Map.Entry<List<String>, Histogram> 
labelValuesToHistogram : histogramsByLabelValues.entrySet()) {
-                               addSamples(labelValuesToHistogram.getKey(), 
labelValuesToHistogram.getValue(), samples);
-                       }
-                       return Collections.singletonList(new 
MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples));
-               }
-
-               void addChild(final Histogram histogram, final List<String> 
labelValues) {
-                       histogramsByLabelValues.put(labelValues, histogram);
-               }
-
-               void remove(final List<String> labelValues) {
-                       histogramsByLabelValues.remove(labelValues);
-               }
-
-               private void addSamples(final List<String> labelValues, final 
Histogram histogram, final List<MetricFamilySamples.Sample> samples) {
-                       samples.add(new MetricFamilySamples.Sample(metricName + 
"_count",
-                               labelNamesWithQuantile.subList(0, 
labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));
-                       for (final Double quantile : QUANTILES) {
-                               samples.add(new 
MetricFamilySamples.Sample(metricName, labelNamesWithQuantile,
-                                       addToList(labelValues, 
quantile.toString()),
-                                       
histogram.getStatistics().getQuantile(quantile)));
-                       }
-               }
-       }
-
-       private static List<String> addToList(List<String> list, String 
element) {
-               final List<String> result = new ArrayList<>(list);
-               result.add(element);
-               return result;
-       }
-
-       private static String[] toArray(List<String> list) {
-               return list.toArray(new String[list.size()]);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 592c246..890227f 100644
--- 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -202,7 +202,7 @@ public class PrometheusReporterTest extends TestLogger {
 
        @Test
        public void doubleGaugeIsConvertedCorrectly() {
-               assertThat(PrometheusReporter.gaugeFrom(new Gauge<Double>() {
+               assertThat(reporter.gaugeFrom(new Gauge<Double>() {
                        @Override
                        public Double getValue() {
                                return 3.14;
@@ -212,7 +212,7 @@ public class PrometheusReporterTest extends TestLogger {
 
        @Test
        public void shortGaugeIsConvertedCorrectly() {
-               assertThat(PrometheusReporter.gaugeFrom(new Gauge<Short>() {
+               assertThat(reporter.gaugeFrom(new Gauge<Short>() {
                        @Override
                        public Short getValue() {
                                return 13;
@@ -222,7 +222,7 @@ public class PrometheusReporterTest extends TestLogger {
 
        @Test
        public void booleanGaugeIsConvertedCorrectly() {
-               assertThat(PrometheusReporter.gaugeFrom(new Gauge<Boolean>() {
+               assertThat(reporter.gaugeFrom(new Gauge<Boolean>() {
                        @Override
                        public Boolean getValue() {
                                return true;
@@ -235,7 +235,7 @@ public class PrometheusReporterTest extends TestLogger {
         */
        @Test
        public void stringGaugeCannotBeConverted() {
-               assertThat(PrometheusReporter.gaugeFrom(new Gauge<String>() {
+               assertThat(reporter.gaugeFrom(new Gauge<String>() {
                        @Override
                        public String getValue() {
                                return "I am not a number";

Reply via email to