Repository: flink
Updated Branches:
  refs/heads/master 9487fcbfb -> 19ff8db68


[FLINK-4074] Make metric reporters less blocking

This closes #2105


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56cdec7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56cdec7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56cdec7d

Branch: refs/heads/master
Commit: 56cdec7d81a53f9ea9578ca274c149f9758b5ffd
Parents: 9487fcb
Author: zentol <ches...@apache.org>
Authored: Wed Jun 15 12:23:41 2016 +0200
Committer: zentol <ches...@apache.org>
Committed: Mon Jun 27 13:08:16 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/metrics/MetricRegistry.java    | 39 ++++++++++++--------
 .../flink/metrics/statsd/StatsDReporter.java    |  9 +++++
 2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/56cdec7d/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index b3422e1..0abcdec 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -31,6 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -60,7 +62,7 @@ public class MetricRegistry {
        static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
        
        private final MetricReporter reporter;
-       private final java.util.Timer timer;
+       private final ScheduledExecutorService executor;
 
        private final ScopeFormats scopeFormats;
 
@@ -86,12 +88,11 @@ public class MetricRegistry {
                        // by default, create JMX metrics
                        LOG.info("No metrics reporter configured, exposing metrics via JMX");
                        this.reporter = new JMXReporter();
-                       this.timer = null;
+                       this.executor = null;
                }
                else {
                        MetricReporter reporter;
-                       java.util.Timer timer;
-                       
+                       ScheduledExecutorService executor = null;
                        try {
                                String configuredPeriod = config.getString(KEY_METRICS_REPORTER_INTERVAL, null);
                                TimeUnit timeunit = TimeUnit.SECONDS;
@@ -117,24 +118,20 @@ public class MetricRegistry {
                                reporter.open(reporterConfig);
 
                                if (reporter instanceof Scheduled) {
+                                       executor = Executors.newSingleThreadScheduledExecutor();
                                        LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
-                                       long millis = timeunit.toMillis(period);
                                        
-                                       timer = new java.util.Timer("Periodic Metrics Reporter", true);
-                                       timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
-                               }
-                               else {
-                                       timer = null;
+                                       executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
                                }
                        }
                        catch (Throwable t) {
                                reporter = new JMXReporter();
-                               timer = null;
+                               shutdownExecutor();
                                LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
                        }
 
                        this.reporter = reporter;
-                       this.timer = timer;
+                       this.executor = executor;
                }
        }
 
@@ -142,9 +139,6 @@ public class MetricRegistry {
         * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
         */
        public void shutdown() {
-               if (timer != null) {
-                       timer.cancel();
-               }
                if (reporter != null) {
                        try {
                                reporter.close();
@@ -152,6 +146,21 @@ public class MetricRegistry {
                                LOG.warn("Metrics reporter did not shut down cleanly", t);
                        }
                }
+               shutdownExecutor();
+       }
+       
+       private void shutdownExecutor() {
+               if (executor != null) {
+                       executor.shutdown();
+
+                       try {
+                               if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+                                       executor.shutdownNow();
+                               }
+                       } catch (InterruptedException e) {
+                               executor.shutdownNow();
+                       }
+               }
        }
 
        public ScopeFormats getScopeFormats() {

http://git-wip-us.apache.org/repos/asf/flink/blob/56cdec7d/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index ae57f55..087a265 100644
--- a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -53,6 +53,8 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 //     public static final String ARG_CONVERSION_RATE = "rateConversion";
 //     public static final String ARG_CONVERSION_DURATION = "durationConversion";
 
+       private boolean closed = false;
+
        private DatagramSocket socket;
        private InetSocketAddress address;
 
@@ -81,6 +83,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 
        @Override
        public void close() {
+               closed = true;
                if (socket != null && !socket.isClosed()) {
                        socket.close();
                }
@@ -95,10 +98,16 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
                // operator creation and shutdown
                try {
                        for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
+                               if (closed) {
+                                       return;
+                               }
                                reportGauge(entry.getValue(), entry.getKey());
                        }
 
                        for (Map.Entry<Counter, String> entry : counters.entrySet()) {
+                               if (closed) {
+                                       return;
+                               }
                                reportCounter(entry.getValue(), entry.getKey());
                        }
                }

Reply via email to