[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-07-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3833


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-06-01 Thread mbode
Github user mbode commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r119661118
  
--- Diff: flink-metrics/flink-metrics-prometheus/pom.xml ---
@@ -0,0 +1,129 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-metrics</artifactId>
+       <version>1.3-SNAPSHOT</version>
--- End diff --

Done 😃 




[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-06-01 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r119595452
  
--- Diff: flink-metrics/flink-metrics-prometheus/pom.xml ---
@@ -0,0 +1,129 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-metrics</artifactId>
+       <version>1.3-SNAPSHOT</version>
--- End diff --

I guess that's outdated by now. Sorry :(




[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116377143
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -133,61 +136,58 @@ public void notifyOfRemovedMetric(final Metric 
metric, final String metricName,
collectorsByMetricName.remove(metricName);
}
 
+   @SuppressWarnings("unchecked")
+   private static String getLogicalScope(MetricGroup group) {
+   return ((FrontMetricGroup) 
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+   }
+
private Collector createGauge(final Gauge gauge, final String name, 
final String identifier, final List labelNames, final List 
labelValues) {
-   return io.prometheus.client.Gauge
-   .build()
-   .name(name)
-   .help(identifier)
-   .labelNames(toArray(labelNames, String.class))
-   .create()
-   .setChild(new io.prometheus.client.Gauge.Child() {
-   @Override
-   public double get() {
-   final Object value = gauge.getValue();
-   if (value instanceof Double) {
-   return (double) value;
-   }
-   if (value instanceof Number) {
-   return ((Number) 
value).doubleValue();
-   } else if (value instanceof Boolean) {
-   return ((Boolean) value) ? 1 : 
0;
-   } else {
-   log.debug("Invalid type for 
Gauge {}: {}, only number types and booleans are supported by this reporter.",
-   gauge, 
value.getClass().getName());
-   return 0;
-   }
+   return newGauge(name, identifier, labelNames, labelValues, new 
io.prometheus.client.Gauge.Child() {
+   @Override
+   public double get() {
+   final Object value = gauge.getValue();
+   if (value instanceof Double) {
+   return (double) value;
+   }
+   if (value instanceof Number) {
+   return ((Number) value).doubleValue();
+   } else if (value instanceof Boolean) {
+   return ((Boolean) value) ? 1 : 0;
+   } else {
+   LOG.debug("Invalid type for Gauge {}: 
{}, only number types and booleans are supported by this reporter.",
+   gauge, 
value.getClass().getName());
+   return 0;
}
-   }, toArray(labelValues, String.class));
+   }
+   });
}
 
private static Collector createGauge(final Counter counter, final 
String name, final String identifier, final List labelNames, final 
List labelValues) {
-   return io.prometheus.client.Gauge
-   .build()
-   .name(name)
-   .help(identifier)
-   .labelNames(toArray(labelNames, String.class))
-   .create()
-   .setChild(new io.prometheus.client.Gauge.Child() {
-   @Override
-   public double get() {
-   return (double) counter.getCount();
-   }
-   }, toArray(labelValues, String.class));
+   return newGauge(name, identifier, labelNames, labelValues, new 
io.prometheus.client.Gauge.Child() {
+   @Override
+   public double get() {
+   return (double) counter.getCount();
+   }
+   });
+   }
+
+   private Collector createGauge(final Meter meter, final String name, 
final String identifier, final List labelNames, final List 
labelValues) {
+   return newGauge(name + "_rate", identifier, labelNames, 
labelValues, new io.prometheus.client.Gauge.Child() {
--- End diff --

Meter metrics should contain "rate" in their name already, so we don't have 
to append "_rate" here.
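
To illustrate the naming point, here is a minimal sketch. `MeterNames` and the example metric name are hypothetical and not part of the PR; the sketch only contrasts appending `"_rate"` (as in the diff above) with using the name unchanged (as suggested), for a name that already ends in "rate".

```java
// Hypothetical helper, not part of the PR: contrasts the two naming choices.
final class MeterNames {
    private MeterNames() {}

    // Naming as in the quoted diff: always append "_rate".
    static String withSuffix(String name) {
        return name + "_rate";
    }

    // Naming as suggested in the review: use the metric name unchanged.
    static String unchanged(String name) {
        return name;
    }

    public static void main(String[] args) {
        String name = "taskmanager_bytes_in_rate"; // assumed example name
        System.out.println(withSuffix(name)); // prints taskmanager_bytes_in_rate_rate
        System.out.println(unchanged(name));  // prints taskmanager_bytes_in_rate
    }
}
```

If meter names really do carry "rate" already, the suffix only produces a doubled `_rate_rate` ending in the exported metric name.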



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116376926
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = 
LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER   = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return replaceInvalidChars(input);
+   }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+   // https://prometheus.io/docs/instrumenting/writing_exporters/
+   // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
+   return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+   log.info("Using port {}.", port);
+   prometheusEndpoint = new PrometheusEndpoint(port);
+   try {
+   prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, 
true);
+   } catch (IOException e) {
+   log.error("Could not start PrometheusEndpoint on port " 
+ port, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   prometheusEndpoint.stop();
+   CollectorRegistry.defaultRegistry.clear();
+   }
+
+   @Override
+   public void notifyOfAddedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
+   final String scope = 
((FrontMetricGroup) 
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+   List dimensionKeys = new 

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-14 Thread mbode
Github user mbode commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116376242
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = 
LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER   = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return replaceInvalidChars(input);
+   }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+   // https://prometheus.io/docs/instrumenting/writing_exporters/
+   // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
+   return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+   log.info("Using port {}.", port);
+   prometheusEndpoint = new PrometheusEndpoint(port);
+   try {
+   prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, 
true);
+   } catch (IOException e) {
+   log.error("Could not start PrometheusEndpoint on port " 
+ port, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   prometheusEndpoint.stop();
+   CollectorRegistry.defaultRegistry.clear();
+   }
+
+   @Override
+   public void notifyOfAddedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
+   final String scope = 
((FrontMetricGroup) 
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+   List dimensionKeys = new 

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-14 Thread mbode
Github user mbode commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116375209
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = 
LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER   = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return replaceInvalidChars(input);
+   }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+   // https://prometheus.io/docs/instrumenting/writing_exporters/
+   // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
+   return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+   log.info("Using port {}.", port);
+   prometheusEndpoint = new PrometheusEndpoint(port);
+   try {
+   prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, 
true);
+   } catch (IOException e) {
+   log.error("Could not start PrometheusEndpoint on port " 
+ port, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   prometheusEndpoint.stop();
--- End diff --

I don't think so; `NanoHTTPD.stop()` seems to catch everything.
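
The reasoning can be sketched with a stand-in class. `QuietServer` below is hypothetical and is not NanoHTTPD's source; it only shows the pattern being described: when `stop()` handles exceptions internally, a caller such as `close()` needs no try/catch of its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical stand-in for an embedded HTTP server; NOT NanoHTTPD's code.
final class QuietServer {
    private ServerSocket socket;

    // Binds to the given port; port 0 lets the OS pick a free one.
    // Unlike stop(), starting can fail and reports that to the caller.
    void start(int port) {
        try {
            socket = new ServerSocket(port);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Catches every exception, so callers never see one from stop().
    void stop() {
        try {
            if (socket != null) {
                socket.close();
            }
        } catch (Exception e) {
            System.err.println("Could not stop server: " + e);
        }
    }

    boolean isStopped() {
        return socket == null || socket.isClosed();
    }

    public static void main(String[] args) {
        QuietServer server = new QuietServer();
        server.start(0);
        server.stop();
        server.stop(); // a second call is harmless as well
        System.out.println(server.isStopped()); // prints true
    }
}
```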



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116368672
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.mashape.unirest.http.HttpResponse;
+import com.mashape.unirest.http.Unirest;
+import com.mashape.unirest.http.exceptions.UnirestException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.util.TestMeter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+
+import static 
org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT;
+import static 
org.apache.flink.runtime.metrics.scope.ScopeFormat.SCOPE_SEPARATOR;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+public class PrometheusReporterTest extends TestLogger {
+   private static final int NON_DEFAULT_PORT = 9429;
+
+   private static final String HOST_NAME = "hostname";
+   private static final String TASK_MANAGER = "tm";
+
+   private static final String HELP_PREFIX = "# HELP ";
+   private static final String TYPE_PREFIX = "# TYPE ";
+   private static final String DIMENSIONS = "host=\"" + HOST_NAME + 
"\",tm_id=\"" + TASK_MANAGER + "\"";
+   private static final String DEFAULT_LABELS = "{" + DIMENSIONS + ",}";
+   private static final String SCOPE_PREFIX   = "taskmanager_";
+
+   @Rule
+   public ExpectedException thrown = ExpectedException.none();
+
+   private MetricRegistry registry = prepareMetricRegistry();
--- End diff --

could be final.
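
A minimal sketch of why this works, using hypothetical stand-ins (`FixtureSketch`, `prepareRegistry`, and the string values are not from the test class): a field that is assigned exactly once at its declaration can be declared `final`, and other `final` fields can still be derived from it.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical fixture; the names are stand-ins, not the real test class.
final class FixtureSketch {
    // Assigned once at declaration, so `final` compiles without other changes.
    private final List<String> registry = prepareRegistry();

    // Field initializers run in declaration order, so `registry` is already set here.
    private final String reporter = registry.get(0);

    private static List<String> prepareRegistry() {
        return Collections.singletonList("reporter-0");
    }

    String reporter() {
        return reporter;
    }

    public static void main(String[] args) {
        System.out.println(new FixtureSketch().reporter()); // prints reporter-0
    }
}
```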




[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116368588
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = 
LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER   = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return replaceInvalidChars(input);
+   }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+   // https://prometheus.io/docs/instrumenting/writing_exporters/
+   // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
+   return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+   log.info("Using port {}.", port);
+   prometheusEndpoint = new PrometheusEndpoint(port);
+   try {
+   prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, 
true);
+   } catch (IOException e) {
+   log.error("Could not start PrometheusEndpoint on port " 
+ port, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   prometheusEndpoint.stop();
+   CollectorRegistry.defaultRegistry.clear();
+   }
+
+   @Override
+   public void notifyOfAddedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
+   final String scope = 
((FrontMetricGroup) 
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+   List dimensionKeys = new 

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116368667
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.mashape.unirest.http.HttpResponse;
+import com.mashape.unirest.http.Unirest;
+import com.mashape.unirest.http.exceptions.UnirestException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.util.TestMeter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+
+import static 
org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT;
+import static 
org.apache.flink.runtime.metrics.scope.ScopeFormat.SCOPE_SEPARATOR;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+public class PrometheusReporterTest extends TestLogger {
+   private static final int NON_DEFAULT_PORT = 9429;
+
+   private static final String HOST_NAME = "hostname";
+   private static final String TASK_MANAGER = "tm";
+
+   private static final String HELP_PREFIX = "# HELP ";
+   private static final String TYPE_PREFIX = "# TYPE ";
+   private static final String DIMENSIONS = "host=\"" + HOST_NAME + 
"\",tm_id=\"" + TASK_MANAGER + "\"";
+   private static final String DEFAULT_LABELS = "{" + DIMENSIONS + ",}";
+   private static final String SCOPE_PREFIX   = "taskmanager_";
+
+   @Rule
+   public ExpectedException thrown = ExpectedException.none();
+
+   private MetricRegistry registry = prepareMetricRegistry();
+
+   private final MetricReporter reporter = registry.getReporters().get(0);
+
+   @Test
+   public void counterIsReportedAsPrometheusGauge() throws 
UnirestException {
+   //Prometheus counters may not decrease
+   Counter testCounter = new SimpleCounter();
+   testCounter.inc(7);
+
+   String counterName = "testCounter";
+   String gaugeName = SCOPE_PREFIX + counterName;
+
+   assertThat(addMetricAndPollResponse(testCounter, counterName),
+   containsString(HELP_PREFIX + gaugeName + " " + 
getFullMetricName(counterName) + "\n" +
+  TYPE_PREFIX + gaugeName + " 
gauge" + "\n" +
+  gaugeName + DEFAULT_LABELS + 
" 7.0"));
+   }
+
+   @Test
+   public void gaugeIsReportedAsPrometheusGauge() throws UnirestException {
+   Gauge testGauge = new Gauge() {
+   @Override
+   public Integer getValue() {
+   return 1;
+   }
+   };
+
+   String gaugeName = "testGauge";
+   String 

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116368532
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
+       @Override
+       public String filterCharacters(String input) {
+           return replaceInvalidChars(input);
+       }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
--- End diff --

Could be final.




[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116368522
  
--- Diff: flink-metrics/flink-metrics-prometheus/pom.xml ---
@@ -0,0 +1,128 @@
+
+
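+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-metrics</artifactId>
+       <version>1.3-SNAPSHOT</version>
+       <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-metrics-prometheus</artifactId>
+   <name>flink-metrics-prometheus</name>
+
+   <dependencies>
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-annotations</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-runtime_2.10</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-metrics-core</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>io.prometheus</groupId>
+           <artifactId>simpleclient</artifactId>
+           <version>${prometheus.version}</version>
+       </dependency>
+
+       <dependency>
+           <groupId>io.prometheus</groupId>
+           <artifactId>simpleclient_servlet</artifactId>
+           <version>${prometheus.version}</version>
+       </dependency>
+
+       <dependency>
+           <groupId>com.nanohttpd</groupId>
+           <artifactId>nanohttpd</artifactId>
+           <version>2.2.0</version>
+       </dependency>
+
+       <dependency>
+           <groupId>com.google.guava</groupId>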
--- End diff --

We try to avoid guava unless absolutely necessary, and virtually all 
modules that do depend on it shade it away.
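
As a rough illustration of this point: two of the Guava helpers imported in the diff, `ImmutableList` and `Iterables.toArray`, have plain-JDK counterparts. The sketch below is not code from the pull request; the class `JdkOnlyHelpers` and its method names are hypothetical and exist only to show the JDK calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper class; not part of the pull request.
public class JdkOnlyHelpers {

    // JDK stand-in for ImmutableList.copyOf(values): defensive copy wrapped in a read-only view.
    public static List<String> immutableCopy(List<String> values) {
        return Collections.unmodifiableList(new ArrayList<>(values));
    }

    // JDK stand-in for Iterables.toArray(values, String.class).
    public static String[] toStringArray(List<String> values) {
        return values.toArray(new String[0]);
    }

    public static void main(String[] args) {
        List<String> labels = immutableCopy(Arrays.asList("host", "tm_id"));
        System.out.println(Arrays.toString(toStringArray(labels))); // prints [host, tm_id]
    }
}
```

With replacements along these lines the module might not need the Guava dependency, or the shading that would otherwise come with it.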




[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116368638
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
+       @Override
+       public String filterCharacters(String input) {
+           return replaceInvalidChars(input);
+       }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+       // https://prometheus.io/docs/instrumenting/writing_exporters/
+       // Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
+       return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+       int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+       log.info("Using port {}.", port);
+       prometheusEndpoint = new PrometheusEndpoint(port);
+       try {
+           prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
+       } catch (IOException e) {
+           log.error("Could not start PrometheusEndpoint on port " + port, e);
+       }
+   }
+
+   @Override
+   public void close() {
+       prometheusEndpoint.stop();
+       CollectorRegistry.defaultRegistry.clear();
+   }
+
+   @Override
+   public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+       final String scope = ((FrontMetricGroup) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+       List dimensionKeys = new 

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116368696
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
+       @Override
+       public String filterCharacters(String input) {
+           return replaceInvalidChars(input);
+       }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+       // https://prometheus.io/docs/instrumenting/writing_exporters/
+       // Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
+       return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+       int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+       log.info("Using port {}.", port);
+       prometheusEndpoint = new PrometheusEndpoint(port);
+       try {
+           prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
+       } catch (IOException e) {
+           log.error("Could not start PrometheusEndpoint on port " + port, e);
+       }
+   }
+
+   @Override
+   public void close() {
+       prometheusEndpoint.stop();
+       CollectorRegistry.defaultRegistry.clear();
+   }
+
+   @Override
+   public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+       final String scope = ((FrontMetricGroup) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+       List dimensionKeys = new 

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116368510
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
--- End diff --

By convention, this should be upper-case.




[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116368563
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
+       @Override
+       public String filterCharacters(String input) {
+           return replaceInvalidChars(input);
+       }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+       // https://prometheus.io/docs/instrumenting/writing_exporters/
+       // Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
+       return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+       int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+       log.info("Using port {}.", port);
+       prometheusEndpoint = new PrometheusEndpoint(port);
+       try {
+           prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
+       } catch (IOException e) {
+           log.error("Could not start PrometheusEndpoint on port " + port, e);
+       }
+   }
+
+   @Override
+   public void close() {
+       prometheusEndpoint.stop();
--- End diff --

Can this throw an exception?
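
The thread does not settle whether `stop()` can throw. If it can, one defensive pattern is to run the registry cleanup in a `finally` block so that it happens either way. A minimal sketch, assuming hypothetical `Endpoint` and `Registry` interfaces as stand-ins for the real `PrometheusEndpoint` and `CollectorRegistry`:

```java
// Hypothetical stand-ins; not the types used in the pull request.
public class SafeCloseSketch {

    interface Endpoint {
        void stop();
    }

    interface Registry {
        void clear();
    }

    // Stops the endpoint if one was created and always clears the registry,
    // even when stop() throws an unchecked exception (which then propagates).
    static void close(Endpoint endpoint, Registry registry) {
        try {
            if (endpoint != null) {
                endpoint.stop();
            }
        } finally {
            registry.clear();
        }
    }

    public static void main(String[] args) {
        final boolean[] cleared = {false};
        Registry registry = new Registry() {
            @Override
            public void clear() {
                cleared[0] = true;
            }
        };
        Endpoint failing = new Endpoint() {
            @Override
            public void stop() {
                throw new IllegalStateException("stop failed");
            }
        };
        try {
            close(failing, registry);
        } catch (IllegalStateException e) {
            System.out.println("stop() threw: " + e.getMessage());
        }
        System.out.println("registry cleared: " + cleared[0]);
    }
}
```

The null check also covers the case where `close()` is called on a reporter whose endpoint was never created.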




[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116368530
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
--- End diff --

The indentation here is inconsistent with the rest of the code base.




[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116368561
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
+       @Override
+       public String filterCharacters(String input) {
+           return replaceInvalidChars(input);
+       }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+       // https://prometheus.io/docs/instrumenting/writing_exporters/
+       // Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
+       return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+       int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+       log.info("Using port {}.", port);
+       prometheusEndpoint = new PrometheusEndpoint(port);
+       try {
+           prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
+       } catch (IOException e) {
+           log.error("Could not start PrometheusEndpoint on port " + port, e);
--- End diff --

It would be good to throw an exception here despite the current interface. 
Otherwise the reporter will still be notified of added/removed metrics, even 
though there's no benefit to it.
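
A minimal sketch of the fail-fast behaviour suggested here, assuming an unchecked `RuntimeException` is acceptable (the comment does not name an exception type). The class `FailFastOpenSketch` is hypothetical, and a plain `java.net.ServerSocket` stands in for the `PrometheusEndpoint`:

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical sketch; a plain ServerSocket stands in for the PrometheusEndpoint.
public class FailFastOpenSketch {

    private ServerSocket socket;

    // Binds the port or throws, so the caller never keeps a reporter that cannot serve metrics.
    public void open(int port) {
        try {
            socket = new ServerSocket(port);
        } catch (IOException e) {
            // Rethrow instead of only logging, wrapping the checked exception.
            throw new RuntimeException("Could not start endpoint on port " + port, e);
        }
    }

    public int port() {
        return socket.getLocalPort();
    }

    public void close() {
        try {
            if (socket != null) {
                socket.close();
            }
        } catch (IOException e) {
            throw new RuntimeException("Could not close endpoint", e);
        }
    }

    public static void main(String[] args) {
        FailFastOpenSketch first = new FailFastOpenSketch();
        first.open(0); // port 0 lets the OS pick a free port
        FailFastOpenSketch second = new FailFastOpenSketch();
        try {
            second.open(first.port()); // already bound by `first`
        } catch (RuntimeException e) {
            System.out.println("open failed: " + e.getMessage());
        } finally {
            first.close();
        }
    }
}
```

Wrapping the `IOException` keeps the `open(MetricConfig)` signature free of checked exceptions while still letting the caller see that startup failed.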



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-06 Thread mbode
GitHub user mbode opened a pull request:

https://github.com/apache/flink/pull/3833

[FLINK-6221] Add PrometheusReporter



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mbode/flink PrometheusReporter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3833.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3833


commit 9c1889abcd5591d89dde3d5b032b6c54d4d518ba
Author: Maximilian Bode 
Date:   2017-05-06T00:49:42Z

[FLINK-6221] Add PrometheusReporter



