Repository: flink Updated Branches: refs/heads/master 9487fcbfb -> 19ff8db68
[FLINK-4074] Make metric reporters less blocking This closes #2105 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56cdec7d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56cdec7d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56cdec7d Branch: refs/heads/master Commit: 56cdec7d81a53f9ea9578ca274c149f9758b5ffd Parents: 9487fcb Author: zentol <ches...@apache.org> Authored: Wed Jun 15 12:23:41 2016 +0200 Committer: zentol <ches...@apache.org> Committed: Mon Jun 27 13:08:16 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/metrics/MetricRegistry.java | 39 ++++++++++++-------- .../flink/metrics/statsd/StatsDReporter.java | 9 +++++ 2 files changed, 33 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/56cdec7d/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java index b3422e1..0abcdec 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java @@ -31,6 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** @@ -60,7 +62,7 @@ public class MetricRegistry { static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class); private final MetricReporter reporter; - private final java.util.Timer timer; + private final ScheduledExecutorService executor; private final ScopeFormats scopeFormats; @@ -86,12 +88,11 @@ public class MetricRegistry { // by default, create JMX metrics LOG.info("No metrics reporter configured, exposing metrics via JMX"); this.reporter = new JMXReporter(); - this.timer = null; + this.executor = null; } else { MetricReporter reporter; - java.util.Timer timer; - + ScheduledExecutorService executor = null; try { String configuredPeriod = config.getString(KEY_METRICS_REPORTER_INTERVAL, null); TimeUnit timeunit = TimeUnit.SECONDS; @@ -117,24 +118,20 @@ public class MetricRegistry { reporter.open(reporterConfig); if (reporter instanceof Scheduled) { + executor = Executors.newSingleThreadScheduledExecutor(); LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name()); - long millis = timeunit.toMillis(period); - timer = new java.util.Timer("Periodic Metrics Reporter", true); - timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis); - } - else { - timer = null; + executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit); } } catch (Throwable t) { reporter = new JMXReporter(); - timer = null; + shutdownExecutor(); LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t); } this.reporter = reporter; - this.timer = timer; + this.executor = executor; } } @@ -142,9 +139,6 @@ public class MetricRegistry { * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}. */ public void shutdown() { - if (timer != null) { - timer.cancel(); - } if (reporter != null) { try { reporter.close(); @@ -152,6 +146,21 @@ public class MetricRegistry { LOG.warn("Metrics reporter did not shut down cleanly", t); } } + shutdownExecutor(); + } + + private void shutdownExecutor() { + if (executor != null) { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + } + } } public ScopeFormats getScopeFormats() { http://git-wip-us.apache.org/repos/asf/flink/blob/56cdec7d/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java index ae57f55..087a265 100644 --- a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java +++ b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java @@ -53,6 +53,8 @@ public class StatsDReporter extends AbstractReporter implements Scheduled { // public static final String ARG_CONVERSION_RATE = "rateConversion"; // public static final String ARG_CONVERSION_DURATION = "durationConversion"; + private boolean closed = false; + private DatagramSocket socket; private InetSocketAddress address; @@ -81,6 +83,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled { @Override public void close() { + closed = true; if (socket != null && !socket.isClosed()) { socket.close(); } @@ -95,10 +98,16 @@ public class StatsDReporter extends AbstractReporter implements Scheduled { // operator creation and shutdown try { for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) { + if (closed) { + return; + } reportGauge(entry.getValue(), entry.getKey()); } for (Map.Entry<Counter, String> entry : counters.entrySet()) { + if (closed) { + return; + } reportCounter(entry.getValue(), entry.getKey()); } }