STORM-2909 port new metrics to 2.x branch

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/21da5dff
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/21da5dff
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/21da5dff

Branch: refs/heads/master
Commit: 21da5dff6c6b12f8350096969a7082f68e2f802c
Parents: a8d84d6
Author: Aaron Gresch <agre...@yahoo-inc.com>
Authored: Thu Apr 5 14:42:03 2018 -0500
Committer: Aaron Gresch <agre...@yahoo-inc.com>
Committed: Mon Apr 9 10:24:26 2018 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |  23 +++
 .../org/apache/storm/perf/JCQueuePerfTest.java  |  18 +-
 .../apache/storm/starter/AnchoredWordCount.java | 139 ++++++++++++++
 pom.xml                                         |  10 ++
 storm-client/pom.xml                            |  13 ++
 .../src/jvm/org/apache/storm/Config.java        | 102 +++++++----
 .../src/jvm/org/apache/storm/daemon/Task.java   |  20 ++-
 .../daemon/metrics/ClientMetricsUtils.java      |  52 ++++++
 .../daemon/worker/BackPressureTracker.java      |  12 +-
 .../org/apache/storm/daemon/worker/Worker.java  |  18 +-
 .../apache/storm/daemon/worker/WorkerState.java |  29 +--
 .../storm/daemon/worker/WorkerTransfer.java     |  15 +-
 .../jvm/org/apache/storm/executor/Executor.java |  19 +-
 .../executor/bolt/BoltOutputCollectorImpl.java  |   7 +-
 .../storm/executor/spout/SpoutExecutor.java     |  43 +++--
 .../org/apache/storm/metrics2/JcMetrics.java    |  45 +++++
 .../org/apache/storm/metrics2/SimpleGauge.java  |  38 ++++
 .../storm/metrics2/StormMetricRegistry.java     | 180 +++++++++++++++++++
 .../org/apache/storm/metrics2/TaskMetrics.java  |  85 +++++++++
 .../storm/metrics2/filters/RegexFilter.java     |  48 +++++
 .../metrics2/filters/StormMetricsFilter.java    |  33 ++++
 .../reporters/ConsoleStormReporter.java         |  69 +++++++
 .../metrics2/reporters/CsvStormReporter.java    |  97 ++++++++++
 .../reporters/GangliaStormReporter.java         | 132 ++++++++++++++
 .../reporters/GraphiteStormReporter.java        | 102 +++++++++++
 .../metrics2/reporters/JmxStormReporter.java    |  92 ++++++++++
 .../reporters/ScheduledStormReporter.java       |  86 +++++++++
 .../storm/metrics2/reporters/StormReporter.java |  35 ++++
 .../apache/storm/stats/BoltExecutorStats.java   |  32 +---
 .../jvm/org/apache/storm/stats/CommonStats.java |  23 ++-
 .../apache/storm/stats/SpoutExecutorStats.java  |  26 +--
 .../org/apache/storm/task/TopologyContext.java  |  74 +++++---
 .../src/jvm/org/apache/storm/utils/JCQueue.java |  58 ++++--
 .../storm/validation/ConfigValidation.java      | 167 +++++++++++------
 .../storm/utils/JCQueueBackpressureTest.java    |   2 +-
 .../jvm/org/apache/storm/utils/JCQueueTest.java |   4 +-
 .../java/org/apache/storm/DaemonConfig.java     |  41 ++---
 .../storm/daemon/metrics/MetricsUtils.java      |  39 +---
 .../reporters/ConsolePreparableReporter.java    |  16 +-
 .../reporters/CsvPreparableReporter.java        |  17 +-
 .../reporters/JmxPreparableReporter.java        |  12 +-
 .../storm/nimbus/DefaultTopologyValidator.java  |  38 +++-
 .../storm/nimbus/StrictTopologyValidator.java   |  67 +++++++
 43 files changed, 1830 insertions(+), 348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8dcb2d9..c985c12 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -388,3 +388,26 @@ worker.metrics:
 
 # The number of buckets for running statistics
 num.stat.buckets: 20
+
+# Metrics v2 configuration (optional)
+#storm.metrics.reporters:
+#  # Graphite Reporter
+#  - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter"
+#    daemons:
+#        - "supervisor"
+#        - "nimbus"
+#        - "worker"
+#    report.period: 60
+#    report.period.units: "SECONDS"
+#    graphite.host: "localhost"
+#    graphite.port: 2003
+#
+#  # Console Reporter
+#  - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
+#    daemons:
+#        - "worker"
+#    report.period: 10
+#    report.period.units: "SECONDS"
+#    filter:
+#        class: "org.apache.storm.metrics2.filters.RegexFilter"
+#        expression: ".*my_component.*emitted.*"

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java
index 17c676f..700ce4e 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java
@@ -46,8 +46,8 @@ public class JCQueuePerfTest {
 
     private static void ackingProducerSimulation() {
         WaitStrategyPark ws = new WaitStrategyPark(100);
-        JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws);
-        JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws);
+        JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test",1000, 1000);
+        JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test",1000, 1000);
 
         final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
         final Acker acker = new Acker(ackQ, spoutQ);
@@ -57,8 +57,8 @@ public class JCQueuePerfTest {
 
     private static void producerFwdConsumer(int prodBatchSz) {
         WaitStrategyPark ws = new WaitStrategyPark(100);
-        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws);
-        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws);
+        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000);
+        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000);
 
         final Producer prod = new Producer(q1);
         final Forwarder fwd = new Forwarder(q1, q2);
@@ -69,7 +69,7 @@ public class JCQueuePerfTest {
 
 
     private static void oneProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100));
+        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",1000, 1000);
 
         final Producer prod1 = new Producer(q1);
         final Consumer cons1 = new Consumer(q1);
@@ -78,7 +78,7 @@ public class JCQueuePerfTest {
     }
 
     private static void twoProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100));
+        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",1000, 1000);
 
         final Producer prod1 = new Producer(q1);
         final Producer prod2 = new Producer(q1);
@@ -88,7 +88,7 @@ public class JCQueuePerfTest {
     }
 
     private static void threeProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100));
+        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",1000, 1000);
 
         final Producer prod1 = new Producer(q1);
         final Producer prod2 = new Producer(q1);
@@ -101,8 +101,8 @@ public class JCQueuePerfTest {
 
     private static void oneProducer2Consumers(int prodBatchSz) {
         WaitStrategyPark ws = new WaitStrategyPark(100);
-        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws);
-        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws);
+        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000);
+        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000);
 
         final Producer2 prod1 = new Producer2(q1, q2);
         final Consumer cons1 = new Consumer(q1);
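
The updated calls above reflect a wider JCQueue constructor: after the wait strategy, every call site in this commit now passes a topology id, a component id, a task id, and a worker port, apparently so the metrics v2 registry can tag the queue's gauges (the perf test just passes placeholder values). A hedged reading of the new signature, inferred from the call sites in this diff (the parameter names are guesses):

    // JCQueue(name, size, overflowLimit, producerBatchSize, waitStrategy,
    //         topologyId, componentId, taskId, port) -- names assumed.
    JCQueue q = new JCQueue("spoutQ", 1024, 0, 100, new WaitStrategyPark(100),
            "test",   // topology id, tags the queue's metrics
            "test",   // component id
            1000,     // task id (placeholder in the perf test)
            1000);    // worker port (placeholder in the perf test)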

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
new file mode 100644
index 0000000..cb45024
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.starter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.storm.Config;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.ConfigurableTopology;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+public class AnchoredWordCount extends ConfigurableTopology {
+
+    public static class RandomSentenceSpout extends BaseRichSpout {
+        SpoutOutputCollector collector;
+        Random random;
+
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.random = new Random();
+        }
+
+        @Override
+        public void nextTuple() {
+            Utils.sleep(10);
+            String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
+                    sentence("four score and seven years ago"),
+                    sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")};
+            final String sentence = sentences[random.nextInt(sentences.length)];
+
+            this.collector.emit(new Values(sentence), UUID.randomUUID());
+        }
+
+        protected String sentence(String input) {
+            return input;
+        }
+
+        @Override
+        public void ack(Object id) {
+        }
+
+        @Override
+        public void fail(Object id) {
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
+
+
+    public static class SplitSentence extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word: sentence.split("\\s+")) {
+                collector.emit(new Values(word, 1));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null) {
+                count = 0;
+            }
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    protected int run(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("spout", new RandomSentenceSpout(), 4);
+
+        builder.setBolt("split", new SplitSentence(), 
4).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", 
new Fields("word"));
+
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(3);
+
+        String topologyName = "word-count";
+
+        conf.setNumWorkers(3);
+
+        if (args != null && args.length > 0) {
+            topologyName = args[0];
+        }
+        return submit(topologyName, conf, builder);
+    }
+}
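
Note this hunk shows no main method; like the other ConfigurableTopology examples in storm-starter, the class is presumably launched through the base class. A hypothetical entry point, assuming the usual ConfigurableTopology.start helper:

    // Hypothetical launcher, not part of this diff.
    public static void main(String[] args) throws Exception {
        ConfigurableTopology.start(new AnchoredWordCount(), args);
    }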

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d3204b2..77c7738 100644
--- a/pom.xml
+++ b/pom.xml
@@ -956,6 +956,16 @@
                 <version>${metrics.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-ganglia</artifactId>
+                <version>${metrics.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-graphite</artifactId>
+                <version>${metrics.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>metrics-clojure</groupId>
                 <artifactId>metrics-clojure</artifactId>
                 <version>${metrics-clojure.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 2e073a8..1311d2a 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -184,6 +184,19 @@
             <artifactId>curator-client</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-ganglia</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-graphite</artifactId>
+        </dependency>
+
         <!-- end of transitive dependency management -->
 
         <!-- test -->

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index b275985..3ebc493 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -18,20 +18,19 @@
 
 package org.apache.storm;
 
+import com.esotericsoftware.kryo.Serializer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.storm.metric.IEventLogger;
 import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.serialization.IKryoDecorator;
 import org.apache.storm.serialization.IKryoFactory;
 import org.apache.storm.validation.ConfigValidation;
-import org.apache.storm.validation.ConfigValidationAnnotations.*;
 import org.apache.storm.validation.ConfigValidation.*;
-import com.esotericsoftware.kryo.Serializer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.storm.validation.ConfigValidationAnnotations.*;
 
 /**
  * Topology configs are specified as a plain old map. This class provides a
@@ -148,7 +147,7 @@ public class Config extends HashMap<String, Object> {
     * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     @isStringList
-    public static final String TOPOLOGY_READONLY_USERS="topology.readonly.users";
+    public static final String TOPOLOGY_READONLY_USERS = "topology.readonly.users";
 
     /**
     * A list of readonly groups that are allowed to interact with the topology.  To use this set
@@ -171,7 +170,7 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_DEBUG = "topology.debug";
 
     /**
-     * User defined version of this topology
+     * User defined version of this topology.
      */
     @isString
     public static final String TOPOLOGY_VERSION = "topology.version";
@@ -186,7 +185,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * The serializer for communication between shell components and non-JVM
-     * processes
+     * processes.
      */
     @isString
     public static final String TOPOLOGY_MULTILANG_SERIALIZER = "topology.multilang.serializer";
@@ -268,14 +267,16 @@ public class Config extends HashMap<String, Object> {
     * to allocate slots on machines with enough available memory.  A default value will be set for this config if user does not override
      */
     @isPositiveNumber(includeZero = true)
-    public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB = "topology.metrics.consumer.resources.onheap.memory.mb";
+    public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB =
+            "topology.metrics.consumer.resources.onheap.memory.mb";
 
     /**
     * The maximum amount of memory an instance of a metrics consumer will take off heap. This enables the scheduler
     * to allocate slots on machines with enough available memory.  A default value will be set for this config if user does not override
      */
     @isPositiveNumber(includeZero = true)
-    public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB = "topology.metrics.consumer.resources.offheap.memory.mb";
+    public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB =
+            "topology.metrics.consumer.resources.offheap.memory.mb";
 
     /**
     * The config indicates the percentage of cpu for a core an instance(executor) of a metrics consumer will use.
@@ -311,13 +312,13 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_STATE_CHECKPOINT_INTERVAL = "topology.state.checkpoint.interval.ms";
 
     /**
-     * A per topology config that specifies the maximum amount of memory a worker can use for that specific topology
+     * A per topology config that specifies the maximum amount of memory a worker can use for that specific topology.
      */
     @isPositiveNumber
     public static final String TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB = "topology.worker.max.heap.size.mb";
 
     /**
-     * The strategy to use when scheduling a topology with Resource Aware Scheduler
+     * The strategy to use when scheduling a topology with Resource Aware Scheduler.
      */
     @NotNull
     @isString
@@ -388,7 +389,7 @@ public class Config extends HashMap<String, Object> {
     * Note that EventLoggerBolt takes care of all the implementations of IEventLogger, hence registering
     * many implementations (especially they're implemented as 'blocking' manner) would slow down overall topology.
      */
-    @isListEntryCustom(entryValidatorClasses={EventLoggerRegistryValidator.class})
+    @isListEntryCustom(entryValidatorClasses = {EventLoggerRegistryValidator.class})
     public static final String TOPOLOGY_EVENT_LOGGER_REGISTER = "topology.event.logger.register";
 
     /**
@@ -452,10 +453,10 @@ public class Config extends HashMap<String, Object> {
      * rather than throw an error.
      */
     @isBoolean
-    public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations";
+    public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS = "topology.skip.missing.kryo.registrations";
 
     /**
-     * List of classes to register during state serialization
+     * List of classes to register during state serialization.
      */
     @isStringList
     public static final String TOPOLOGY_STATE_KRYO_REGISTER = "topology.state.kryo.register";
@@ -466,7 +467,7 @@ public class Config extends HashMap<String, Object> {
     * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable.
      */
 
-    @isListEntryCustom(entryValidatorClasses={MetricRegistryValidator.class})
+    @isListEntryCustom(entryValidatorClasses = {MetricRegistryValidator.class})
     public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
 
     /**
@@ -477,13 +478,13 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS = "topology.serialized.message.size.metrics";
 
     /**
-     * A map of metric name to class name implementing IMetric that will be created once per worker JVM
+     * A map of metric name to class name implementing IMetric that will be created once per worker JVM.
      */
     @isMapEntryType(keyType = String.class, valueType = String.class)
     public static final String TOPOLOGY_WORKER_METRICS = "topology.worker.metrics";
 
     /**
-     * A map of metric name to class name implementing IMetric that will be created once per worker JVM
+     * A map of metric name to class name implementing IMetric that will be created once per worker JVM.
      */
     @isMapEntryType(keyType = String.class, valueType = String.class)
     public static final String WORKER_METRICS = "worker.metrics";
@@ -494,7 +495,7 @@ public class Config extends HashMap<String, Object> {
      */
     @isInteger
     @isPositiveNumber
-    public static final String TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism";
+    public static final String TOPOLOGY_MAX_TASK_PARALLELISM = "topology.max.task.parallelism";
 
     /**
     * The maximum number of tuples that can be pending on a spout task at any given time.
@@ -506,14 +507,14 @@ public class Config extends HashMap<String, Object> {
      */
     @isInteger
     @isPositiveNumber
-    public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
+    public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";
 
     /**
      * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for.
      */
     @isInteger
     @isPositiveNumber(includeZero = true)
-    public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms";
+    public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS = "topology.sleep.spout.wait.strategy.time.ms";
 
     /**
     * The maximum amount of time a component gives a source of state to synchronize before it requests
@@ -522,49 +523,49 @@ public class Config extends HashMap<String, Object> {
     @isInteger
     @isPositiveNumber
     @NotNull
-    public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs";
+    public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS = "topology.state.synchronization.timeout.secs";
 
     /**
      * The percentage of tuples to sample to produce stats for a task.
      */
     @isPositiveNumber
-    public static final String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
+    public static final String TOPOLOGY_STATS_SAMPLE_RATE = "topology.stats.sample.rate";
 
     /**
      * The time period that builtin metrics data in bucketed into.
      */
     @isInteger
-    public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
+    public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS = "topology.builtin.metrics.bucket.size.secs";
 
     /**
      * Whether or not to use Java serialization in a topology.
      */
     @isBoolean
-    public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION="topology.fall.back.on.java.serialization";
+    public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION = "topology.fall.back.on.java.serialization";
 
     /**
     * Topology-specific options for the worker child process. This is used in addition to WORKER_CHILDOPTS.
      */
     @isStringOrStringList
-    public static final String TOPOLOGY_WORKER_CHILDOPTS="topology.worker.childopts";
+    public static final String TOPOLOGY_WORKER_CHILDOPTS = "topology.worker.childopts";
 
     /**
     * Topology-specific options GC for the worker child process. This overrides WORKER_GC_CHILDOPTS.
      */
     @isStringOrStringList
-    public static final String TOPOLOGY_WORKER_GC_CHILDOPTS="topology.worker.gc.childopts";
+    public static final String TOPOLOGY_WORKER_GC_CHILDOPTS = "topology.worker.gc.childopts";
 
     /**
      * Topology-specific options for the logwriter process of a worker.
      */
     @isStringOrStringList
-    public static final String TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS="topology.worker.logwriter.childopts";
+    public static final String TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS = "topology.worker.logwriter.childopts";
 
     /**
     * Topology-specific classpath for the worker child process. This is combined to the usual classpath.
      */
     @isStringOrStringList
-    public static final String TOPOLOGY_CLASSPATH="topology.classpath";
+    public static final String TOPOLOGY_CLASSPATH = "topology.classpath";
 
     /**
     * Topology-specific classpath for the worker child process. This will be *prepended* to
@@ -573,14 +574,14 @@ public class Config extends HashMap<String, Object> {
     * classpaths, set the storm.topology.classpath.beginning.enabled config to true.
      */
     @isStringOrStringList
-    public static final String TOPOLOGY_CLASSPATH_BEGINNING="topology.classpath.beginning";
+    public static final String TOPOLOGY_CLASSPATH_BEGINNING = "topology.classpath.beginning";
 
     /**
      * Topology-specific environment variables for the worker child process.
      * This is added to the existing environment (that of the supervisor)
      */
     @isMapEntryType(keyType = String.class, valueType = String.class)
-    public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
+    public static final String TOPOLOGY_ENVIRONMENT = "topology.environment";
 
     /*
     * Bolt-specific configuration for windowed bolts to specify the window length as a count of number of tuples
@@ -651,7 +652,7 @@ public class Config extends HashMap<String, Object> {
      * topology in Zookeeper.
      */
     @isString
-    public static final String TOPOLOGY_TRANSACTIONAL_ID="topology.transactional.id";
+    public static final String TOPOLOGY_TRANSACTIONAL_ID = "topology.transactional.id";
 
     /**
     * A list of task hooks that are automatically added to every spout and bolt in the topology. An example
@@ -659,21 +660,21 @@ public class Config extends HashMap<String, Object> {
     * monitoring system. These hooks are instantiated using the zero-arg constructor.
      */
     @isStringList
-    public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
+    public static final String TOPOLOGY_AUTO_TASK_HOOKS = "topology.auto.task.hooks";
 
     /**
      * The size of the receive queue for each executor.
      */
     @isPositiveNumber
     @isInteger
-    public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size";
+    public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE = "topology.executor.receive.buffer.size";
 
     /**
      * The size of the transfer queue for each worker.
      */
     @isPositiveNumber
     @isInteger
-    public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
+    public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE = "topology.transfer.buffer.size";
 
     /**
      * The size of the transfer queue for each worker.
@@ -1240,6 +1241,9 @@ public class Config extends HashMap<String, Object> {
     @isString
     public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
 
+    @isListEntryCustom(entryValidatorClasses={MetricReportersValidator.class})
+    public static final String STORM_METRICS_REPORTERS = "storm.metrics.reporters";
+
     /**
      * What blobstore implementation the storm client should use.
      */
@@ -1672,7 +1676,8 @@ public class Config extends HashMap<String, Object> {
     /**
      * Impersonation user ACL config entries.
      */
-    @isMapEntryCustom(keyValidatorClasses = {ConfigValidation.StringValidator.class}, valueValidatorClasses = {ConfigValidation.ImpersonationAclUserEntryValidator.class})
+    @isMapEntryCustom(keyValidatorClasses = {ConfigValidation.StringValidator.class},
+            valueValidatorClasses = {ConfigValidation.ImpersonationAclUserEntryValidator.class})
     public static final String NIMBUS_IMPERSONATION_ACL = "nimbus.impersonation.acl";
 
     /**
@@ -1913,6 +1918,27 @@ public class Config extends HashMap<String, Object> {
     @isPositiveNumber
     public static final String WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS = "worker.blob.update.poll.interval.secs";
 
+    /**
+     * A specify Locale for daemon metrics reporter plugin.
+     * Use the specified IETF BCP 47 language tag string for a Locale.
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE = "storm.daemon.metrics.reporter.plugin.locale";
+
+    /**
+     * A specify rate-unit in TimeUnit to specify reporting frequency for daemon metrics reporter plugin.
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT = "storm.daemon.metrics.reporter.plugin.rate.unit";
+
+    /**
+     * A specify duration-unit in TimeUnit to specify reporting window for daemon metrics reporter plugin.
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT = "storm.daemon.metrics.reporter.plugin.duration.unit";
+
+
+
     public static void setClasspath(Map<String, Object> conf, String cp) {
         conf.put(Config.TOPOLOGY_CLASSPATH, cp);
     }
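
The three new storm.daemon.metrics.reporter.plugin.* settings are plain strings; per ClientMetricsUtils below, the two unit values must be java.util.concurrent.TimeUnit constant names and the locale an IETF BCP 47 language tag. A hedged sketch of setting them programmatically:

    // Values are parsed with TimeUnit.valueOf(...) and Locale.forLanguageTag(...).
    Map<String, Object> conf = new HashMap<>();
    conf.put(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE, "en-US");
    conf.put(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT, "SECONDS");
    conf.put(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT, "MILLISECONDS");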

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index ce9d0e4..4bc0e25 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -29,7 +29,6 @@ import java.util.Queue;
 import java.util.Random;
 import java.util.function.BooleanSupplier;
 
-
 import org.apache.storm.Config;
 import org.apache.storm.Thrift;
 import org.apache.storm.daemon.worker.WorkerState;
@@ -46,6 +45,7 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
 import org.apache.storm.hooks.ITaskHook;
 import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.metrics2.TaskMetrics;
 import org.apache.storm.spout.ShellSpout;
 import org.apache.storm.stats.CommonStats;
 import org.apache.storm.task.ShellBolt;
@@ -78,6 +78,7 @@ public class Task {
     private Map<String, Map<String, LoadAwareCustomStreamGrouping>> 
streamComponentToGrouper;
     private HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> 
streamToGroupers;
     private boolean debug;
+    private final TaskMetrics taskMetrics;
 
     public Task(Executor executor, Integer taskId) throws IOException {
         this.taskId = taskId;
@@ -95,6 +96,7 @@ public class Task {
         this.taskObject = mkTaskObject();
         this.debug = topoConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) topoConf.get(Config.TOPOLOGY_DEBUG);
         this.addTaskHooks();
+        this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, this.taskId);
     }
 
     public List<Integer> getOutgoingTasks(Integer outTaskId, String stream, 
List<Object> values) {
@@ -116,9 +118,9 @@ public class Task {
 
         try {
             if (emitSampler.getAsBoolean()) {
-                executorStats.emittedTuple(stream);
+                executorStats.emittedTuple(stream, this.taskMetrics.getEmitted(stream));
                 if (null != outTaskId) {
-                    executorStats.transferredTuples(stream, 1);
+                    executorStats.transferredTuples(stream, 1, this.taskMetrics.getTransferred(stream));
                 }
             }
         } catch (Exception e) {
@@ -140,7 +142,7 @@ public class Task {
 
         ArrayList<LoadAwareCustomStreamGrouping> groupers = 
streamToGroupers.get(stream);
         if (null != groupers)  {
-            for (int i=0; i<groupers.size(); ++i) {
+            for (int i = 0; i < groupers.size(); ++i) {
                 LoadAwareCustomStreamGrouping grouper = groupers.get(i);
                 if (grouper == GrouperFactory.DIRECT) {
                     throw new IllegalArgumentException("Cannot do regular emit to direct stream");
@@ -157,8 +159,8 @@ public class Task {
         }
         try {
             if (emitSampler.getAsBoolean()) {
-                executorStats.emittedTuple(stream);
-                executorStats.transferredTuples(stream, outTasks.size());
+                executorStats.emittedTuple(stream, this.taskMetrics.getEmitted(stream));
+                executorStats.transferredTuples(stream, outTasks.size(), this.taskMetrics.getTransferred(stream));
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -186,6 +188,9 @@ public class Task {
         return taskObject;
     }
 
+    public TaskMetrics getTaskMetrics() {
+        return taskMetrics;
+    }
 
     // Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument
     public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
@@ -284,7 +289,8 @@ public class Task {
         }
     }
 
-    private static HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> getGroupersPerStream(Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper) {
+    private static HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> getGroupersPerStream(
+            Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper) {
         HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> result = new HashMap<>(streamComponentToGrouper.size());
 
         for (Entry<String, Map<String, LoadAwareCustomStreamGrouping>> entry : streamComponentToGrouper.entrySet()) {
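
The Task changes above thread a metrics v2 counter into each sampled stats call, so the classic executor stats and the new per-stream counters are updated together. The shape of the pattern, assuming (consistent with the metrics-core dependency added in storm-client/pom.xml) that TaskMetrics.getEmitted returns a Dropwizard Counter:

    // Illustration of the pattern, not code from the commit.
    com.codahale.metrics.Counter emitted = taskMetrics.getEmitted(stream);
    executorStats.emittedTuple(stream, emitted);  // v1 stats update; presumably also bumps the v2 counter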

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java
new file mode 100644
index 0000000..3fbdb7e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.metrics;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.Config;
+import org.apache.storm.utils.ObjectReader;
+
+public class ClientMetricsUtils {
+
+    public static TimeUnit getMetricsRateUnit(Map<String, Object> topoConf) {
+        return getTimeUnitForCofig(topoConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT);
+    }
+
+    public static TimeUnit getMetricsDurationUnit(Map<String, Object> topoConf) {
+        return getTimeUnitForCofig(topoConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT);
+    }
+
+    public static Locale getMetricsReporterLocale(Map<String, Object> topoConf) {
+        String languageTag = ObjectReader.getString(topoConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE), null);
+        if (languageTag != null) {
+            return Locale.forLanguageTag(languageTag);
+        }
+        return null;
+    }
+
+    private static TimeUnit getTimeUnitForCofig(Map<String, Object> topoConf, String configName) {
+        String rateUnitString = ObjectReader.getString(topoConf.get(configName), null);
+        if (rateUnitString != null) {
+            return TimeUnit.valueOf(rateUnitString);
+        }
+        return null;
+    }
+}
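
A self-contained usage sketch for the new utility, exercising only the methods shown above:

    import java.util.HashMap;
    import java.util.Locale;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import org.apache.storm.Config;
    import org.apache.storm.daemon.metrics.ClientMetricsUtils;

    public class ClientMetricsUtilsExample {
        public static void main(String[] args) {
            Map<String, Object> conf = new HashMap<>();
            conf.put(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT, "SECONDS");
            conf.put(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE, "en-US");

            TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(conf);         // SECONDS
            TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(conf); // null: not set
            Locale locale = ClientMetricsUtils.getMetricsReporterLocale(conf);       // en-US

            System.out.println(rateUnit + " / " + durationUnit + " / " + locale);
        }
    }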

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index 9b31c1a..6f4be8e 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -19,11 +19,12 @@
 package org.apache.storm.daemon.worker;
 
 import java.util.ArrayList;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.storm.Constants;
 import org.apache.storm.messaging.netty.BackPressureStatus;
 import org.apache.storm.utils.JCQueue;
 import org.slf4j.Logger;
@@ -37,7 +38,8 @@ import static org.apache.storm.Constants.SYSTEM_TASK_ID;
 *   ConcurrentHashMap does not allow storing null values, so we use the special value NONE instead.
  */
 public class BackPressureTracker {
-    private static final JCQueue NONE =  new JCQueue ("NoneQ", 2, 0, 1, null) { };
+    private static final JCQueue NONE =  new JCQueue("NoneQ", 2, 0, 1, null,
+            "none", Constants.SYSTEM_COMPONENT_ID, -1, 0) { };
 
     static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
 
@@ -47,7 +49,7 @@ public class BackPressureTracker {
     public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
         this.workerId = workerId;
         for (Integer taskId : allLocalTasks) {
-            if(taskId != SYSTEM_TASK_ID) {
+            if (taskId != SYSTEM_TASK_ID) {
                 tasks.put(taskId, NONE);  // all tasks are considered to be not under BP initially
             }
         }
@@ -58,7 +60,7 @@ public class BackPressureTracker {
     }
 
     /***
-     * Record BP for a task
+     * Record BP for a task.
      * This is called by transferLocalBatch() on NettyWorker thread
      * @return true if an update was recorded, false if taskId is already under BP
      */
@@ -70,7 +72,7 @@ public class BackPressureTracker {
     public boolean refreshBpTaskList() {
         boolean changed = false;
         LOG.debug("Running Back Pressure status change check");
-        for ( Entry<Integer, JCQueue> entry : tasks.entrySet()) {
+        for (Entry<Integer, JCQueue> entry : tasks.entrySet()) {
             if (entry.getValue() != NONE  &&  entry.getValue().isEmptyOverflow()) {
                 recordNoBackPressure(entry.getKey());
                 changed = true;

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 65915d0..9867cb4 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -63,6 +63,7 @@ import org.apache.storm.generated.LogConfig;
 import org.apache.storm.generated.SupervisorWorkerHeartbeat;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
+import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.security.auth.AuthUtils;
 import org.apache.storm.security.auth.IAutoCredentials;
 import org.apache.storm.stats.StatsUtil;
@@ -73,11 +74,9 @@ import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.SupervisorClient;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
-import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
 
 public class Worker implements Shutdownable, DaemonCommon {
@@ -149,6 +148,8 @@ public class Worker implements Shutdownable, DaemonCommon {
         IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
         IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
 
+        StormMetricRegistry.start(conf, DaemonType.WORKER);
+
         Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
         Map<String, String> initCreds = new HashMap<>();
         if (initialCredentials != null) {
@@ -284,7 +285,8 @@ public class Worker implements Shutdownable, DaemonCommon {
         final Integer xferBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
         final Long flushIntervalMillis = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
         if ((producerBatchSize == 1 && xferBatchSize == 1) || flushIntervalMillis == 0) {
-            LOG.info("Flush Tuple generation disabled. producerBatchSize={}, 
xferBatchSize={}, flushIntervalMillis={}", producerBatchSize, xferBatchSize, 
flushIntervalMillis);
+            LOG.info("Flush Tuple generation disabled. producerBatchSize={}, 
xferBatchSize={}, flushIntervalMillis={}",
+                    producerBatchSize, xferBatchSize, flushIntervalMillis);
             return;
         }
 
@@ -309,8 +311,8 @@ public class Worker implements Shutdownable, DaemonCommon {
             return;
         }
         final Long bpCheckIntervalMs = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS));
-        workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs
-            , bpCheckIntervalMs, () -> workerState.refreshBackPressureStatus());
+        workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs,
+                bpCheckIntervalMs, () -> workerState.refreshBackPressureStatus());
         LOG.info("BackPressure status change checking will be performed every 
{} millis", bpCheckIntervalMs);
     }
 
@@ -416,12 +418,12 @@ public class Worker implements Shutdownable, DaemonCommon {
         //In distributed mode, send heartbeat directly to master if local supervisor goes down.
         SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(),
                 lsWorkerHeartbeat.get_executors(), lsWorkerHeartbeat.get_time_secs());
-        try (SupervisorClient client = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)){
+        try (SupervisorClient client = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)) {
             client.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
         } catch (Exception tr1) {
             //If any error/exception thrown, report directly to nimbus.
             LOG.warn("Exception when send heartbeat to local supervisor", 
tr1.getMessage());
-            try (NimbusClient nimbusClient = 
NimbusClient.getConfiguredClient(conf)){
+            try (NimbusClient nimbusClient = 
NimbusClient.getConfiguredClient(conf)) {
                 
nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
             } catch (Exception tr2) {
                 //if any error/exception thrown, just ignore.
@@ -473,6 +475,8 @@ public class Worker implements Shutdownable, DaemonCommon {
             workerState.backPressureCheckTimer.close();
             workerState.closeResources();
 
+            StormMetricRegistry.stop();
+
             LOG.info("Trigger any worker shutdown hooks");
             workerState.runWorkerShutdownHooks();
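
Taken together, the two Worker hunks bracket the worker's lifetime with the metrics v2 registry. The lifecycle pattern, using only the calls shown above:

    // Boot: start reporters configured under storm.metrics.reporters.
    StormMetricRegistry.start(conf, DaemonType.WORKER);
    try {
        // ... worker runs ...
    } finally {
        // Shutdown: presumably flushes and closes the reporters.
        StormMetricRegistry.stop();
    }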
 

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 8d8c62f..fc94086 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -45,7 +45,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.StormTimer;
-import org.apache.storm.messaging.netty.BackPressureStatus;
 import org.apache.storm.cluster.IStateStorage;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.cluster.VersionedData;
@@ -69,9 +68,9 @@ import org.apache.storm.messaging.DeserializingConnectionCallback;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.messaging.TransportFactory;
-import org.apache.storm.security.auth.IAutoCredentials;
-
+import org.apache.storm.messaging.netty.BackPressureStatus;
 import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.security.auth.IAutoCredentials;
 import org.apache.storm.serialization.ITupleSerializer;
 import org.apache.storm.serialization.KryoTupleSerializer;
 import org.apache.storm.task.WorkerTopologyContext;
@@ -79,10 +78,10 @@ import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.JCQueue;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.SupervisorClient;
 import org.apache.storm.utils.ThriftTopologyUtils;
+import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.Utils.SmartThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -217,7 +216,9 @@ public class WorkerState {
     // local executors and localTaskIds running in this worker
     final Set<List<Long>> localExecutors;
     final ArrayList<Integer> localTaskIds;
-    final Map<Integer, JCQueue> localReceiveQueues = new HashMap<>(); // 
[taskId]-> JCQueue :  initialized after local executors are initialized
+
+    // [taskId]-> JCQueue :  initialized after local executors are initialized
+    final Map<Integer, JCQueue> localReceiveQueues = new HashMap<>();
 
     final Map<String, Object> topologyConf;
     final StormTopology topology;
@@ -418,9 +419,8 @@ public class WorkerState {
     public void refreshStormActive(Runnable callback) {
         StormBase base = stormClusterState.stormBase(topologyId, callback);
         isTopologyActive.set(
-            (null != base) &&
-                (base.get_status() == TopologyStatus.ACTIVE) &&
-                (isWorkerActive.get()));
+            (null != base)
+                    && (base.get_status() == TopologyStatus.ACTIVE) && 
(isWorkerActive.get()));
         if (null != base) {
             Map<String, DebugOptions> debugOptionsMap = new 
HashMap<>(base.get_component_debug());
             for (DebugOptions debugOptions : debugOptionsMap.values()) {
@@ -559,7 +559,8 @@ public class WorkerState {
     private void dropMessage(AddressedTuple tuple, JCQueue queue) {
         ++dropCount;
         queue.recordMsgDrop();
-        LOG.warn("Dropping message as overflow threshold has reached for Q = 
{}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", 
queue.getName(), queue.getOverflowCount(), dropCount, tuple);
+        LOG.warn("Dropping message as overflow threshold has reached for Q = 
{}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}",
+                queue.getName(), queue.getOverflowCount(), dropCount, tuple);
     }
 
     public void checkSerialize(KryoTupleSerializer serializer, AddressedTuple tuple) {
@@ -643,7 +644,7 @@ public class WorkerState {
     private Assignment getLocalAssignment(Map<String, Object> conf, IStormClusterState stormClusterState, String topologyId) {
         if (!ConfigUtils.isLocalMode(conf)) {
             try (SupervisorClient supervisorClient = SupervisorClient.getConfiguredClient(conf, Utils.hostname(),
-                    supervisorPort)){
+                    supervisorPort)) {
                 Assignment assignment = supervisorClient.getClient().getLocalAssignmentForStorm(topologyId);
                 return assignment;
             } catch (Throwable tr1) {
@@ -661,15 +662,17 @@ public class WorkerState {
         Integer overflowLimit = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT));
 
         if (recvBatchSize > recvQueueSize / 2) {
-            throw new IllegalArgumentException(Config.TOPOLOGY_PRODUCER_BATCH_SIZE + ":" + recvBatchSize +
-                " is greater than half of " + Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE + ":" + recvQueueSize);
+            throw new IllegalArgumentException(Config.TOPOLOGY_PRODUCER_BATCH_SIZE + ":" + recvBatchSize
+                    + " is greater than half of " + Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE + ":" + recvQueueSize);
         }
 
         IWaitStrategy backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf);
         Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>();
         for (List<Long> executor : executors) {
+            int port = this.getPort();
             receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(),
-                recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy));
+                recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
+                    this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, this.getPort()));
 
         }
         return receiveQueueMap;

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
index e57123c..8923bf5 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
@@ -18,7 +18,12 @@
 
 package org.apache.storm.daemon.worker;
 
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.storm.Config;
+import org.apache.storm.Constants;
 import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.serialization.ITupleSerializer;
@@ -31,11 +36,6 @@ import org.apache.storm.utils.Utils.SmartThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 // Transfers messages destined to other workers
 class WorkerTransfer implements JCQueue.Consumer {
     static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class);
@@ -53,7 +53,7 @@ class WorkerTransfer implements JCQueue.Consumer {
         this.workerState = workerState;
         this.backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf);
         this.drainer = new TransferDrainer();
-        this.remoteBackPressureStatus = new AtomicBoolean[maxTaskIdInTopo+1];
+        this.remoteBackPressureStatus = new AtomicBoolean[maxTaskIdInTopo + 1];
         for (int i = 0; i < remoteBackPressureStatus.length; i++) {
             remoteBackPressureStatus[i] = new AtomicBoolean(false);
         }
@@ -65,7 +65,8 @@ class WorkerTransfer implements JCQueue.Consumer {
                 + Config.TOPOLOGY_TRANSFER_BUFFER_SIZE + ":" + xferQueueSz);
         }
 
-        this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy);
+        this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
+                workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, workerState.getPort());
     }
 
     public JCQueue getTransferQueue() {

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index f811b82..962e019 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.UnknownHostException;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,15 +32,15 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.Callable;
 import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 
-
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
 import org.apache.storm.cluster.ClusterStateContext;
 import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.DaemonType;
@@ -66,11 +65,8 @@ import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
 import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IMetricsConsumer;
-import org.apache.storm.stats.BoltExecutorStats;
 import org.apache.storm.stats.CommonStats;
-import org.apache.storm.stats.SpoutExecutorStats;
 import org.apache.storm.stats.StatsUtil;
-import org.apache.storm.StormTimer;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.Fields;
@@ -78,9 +74,9 @@ import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.JCQueue;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
 import org.jctools.queues.MpscChunkedArrayQueue;
 import org.json.simple.JSONValue;
 import org.json.simple.parser.ParseException;
@@ -254,7 +250,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     }
 
     /**
-     * separated from mkExecutor in order to replace executor transfer in executor data for testing
+     * separated from mkExecutor in order to replace executor transfer in executor data for testing.
      */
     public ExecutorShutdown execute() throws Exception {
         LOG.info("Loading executor tasks " + componentId + ":" + executorId);
@@ -400,7 +396,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     }
 
     /**
-     * Returns map of stream id to component id to grouper
+     * Returns map of stream id to component id to grouper.
      */
     private Map<String, Map<String, LoadAwareCustomStreamGrouping>> outboundComponents(
         WorkerTopologyContext workerTopologyContext, String componentId, Map<String, Object> topoConf) {
@@ -438,7 +434,8 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     // ============================ getter methods =================================
     // =============================================================================
 
-    private Map<String, Object> normalizedComponentConf(Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
+    private Map<String, Object> normalizedComponentConf(
+            Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
         List<String> keysToRemove = retrieveAllConfigKeys();
         keysToRemove.remove(Config.TOPOLOGY_DEBUG);
         keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
@@ -503,7 +500,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
         return stormId;
     }
 
-    abstract public CommonStats getStats();
+    public abstract CommonStats getStats();
 
     public String getType() {
         return type;

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
index facbb75..f522ec6 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -114,7 +114,8 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
             } else {
                 msgId = MessageId.makeUnanchored();
             }
-            TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId);
+            TupleImpl tupleExt = new TupleImpl(
+                    executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId);
             xsfer.tryTransfer(new AddressedTuple(t, tupleExt), executor.getPendingEmits());
         }
         if (isEventLoggers) {
@@ -145,7 +146,7 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
             boltAckInfo.applyOn(task.getUserContext());
         }
         if (delta >= 0) {
-            executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
+            executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta, task.getTaskMetrics().getAcked(input.getSourceStreamId()));
         }
     }
 
@@ -166,7 +167,7 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
         BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
         boltFailInfo.applyOn(task.getUserContext());
         if (delta >= 0) {
-            executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
+            executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta, task.getTaskMetrics().getFailed(input.getSourceStreamId()));
         }
     }
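
Both hunks above follow the same pattern: the per-stream metrics2 Counter is resolved through the task's TaskMetrics (see TaskMetrics.java further down) and handed to the existing stats call, which now updates the legacy stats and the new counter together. Caller-side shape, where task, input, executor and delta are the locals of the surrounding methods:

    // The Counter is created lazily on first use and cached per stream.
    Counter acked = task.getTaskMetrics().getAcked(input.getSourceStreamId());
    executor.getStats().boltAckedTuple(
            input.getSourceComponent(), input.getSourceStreamId(), delta, acked);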
 

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index e204150..5f16ccb 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -15,11 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.executor.spout;
 
 import com.google.common.collect.ImmutableMap;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
@@ -46,20 +51,15 @@ import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.JCQueue;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.MutableLong;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
 import org.apache.storm.utils.RotatingMap;
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public class SpoutExecutor extends Executor {
 
     private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
@@ -92,7 +92,8 @@ public class SpoutExecutor extends Executor {
         this.emittedCount = new MutableLong(0);
         this.emptyEmitStreak = new MutableLong(0);
         this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
-        this.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()),ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
+        this.stats = new SpoutExecutorStats(
+                ConfigUtils.samplingRate(this.getTopoConf()),ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
         this.builtInMetrics = new BuiltinSpoutMetrics(stats);
     }
 
@@ -108,13 +109,14 @@ public class SpoutExecutor extends Executor {
             Utils.sleep(100);
         }
 
-        LOG.info("Opening spout {}:{}", componentId, taskIds );
+        LOG.info("Opening spout {}:{}", componentId, taskIds);
         this.idToTask = idToTask;
         this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
         this.spouts = new ArrayList<>();
         for (Task task : idToTask) {
-            if(task!=null)
+            if (task != null) {
                 this.spouts.add((ISpout) task.getTaskObject());
+            }
         }
         this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
             @Override
@@ -130,9 +132,9 @@ public class SpoutExecutor extends Executor {
         this.spoutThrottlingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
         this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
         this.outputCollectors = new ArrayList<>();
-        for (int i=0; i<idToTask.size(); ++i) {
+        for (int i = 0; i < idToTask.size(); ++i) {
             Task taskData = idToTask.get(i);
-            if (taskData==null) {
+            if (taskData == null) {
                 continue;
             }
             ISpout spoutObject = (ISpout) taskData.getTaskObject();
@@ -203,12 +205,14 @@ public class SpoutExecutor extends Executor {
                     }
                 }
                 if (reachedMaxSpoutPending) {
-                    if (rmspCount == 0)
+                    if (rmspCount == 0) {
                         LOG.debug("Reached max spout pending");
+                    }
                     rmspCount++;
                 } else {
-                    if (rmspCount > 0)
+                    if (rmspCount > 0) {
                         LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
+                    }
                     rmspCount = 0;
                 }
 
@@ -216,7 +220,7 @@ public class SpoutExecutor extends Executor {
                     // continue without idling
                     return 0L;
                 }
-                if ( !pendingEmits.isEmpty() ) { // then facing backpressure
+                if (!pendingEmits.isEmpty()) { // then facing backpressure
                     backPressureWaitStrategy();
                     return 0L;
                 }
@@ -344,8 +348,9 @@ public class SpoutExecutor extends Executor {
             if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
                 new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
             }
-            if (hasAckers && timeDelta!=null) {
-                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta);
+            if (hasAckers && timeDelta != null) {
+                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
+                        taskData.getTaskMetrics().getAcked(tupleInfo.getStream()));
             }
         } catch (Exception e) {
             throw Utils.wrapInRuntime(e);
@@ -362,7 +367,8 @@ public class SpoutExecutor extends Executor {
             spout.fail(tupleInfo.getMessageId());
             new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
             if (timeDelta != null) {
-                executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta);
+                executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta,
+                        taskData.getTaskMetrics().getFailed(tupleInfo.getStream()));
             }
         } catch (Exception e) {
             throw Utils.wrapInRuntime(e);
@@ -371,8 +377,9 @@ public class SpoutExecutor extends Executor {
 
 
     public int getSpoutRecvqCheckSkipCount() {
-        if(ackingEnabled)
+        if (ackingEnabled) {
             return 0; // always check recQ if ACKing enabled
+        }
         return ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS), 0);
     }
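
One behavioral detail from the open() hunk above: Config.TOPOLOGY_MAX_SPOUT_PENDING is a per-task value, and the executor scales it by the number of tasks it hosts. With illustrative numbers:

    // topology.max.spout.pending = 500 and an executor hosting 4 spout tasks
    int maxSpoutPending = 500 * 4; // executor-wide cap of 2000 pending tuples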
 

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java
new file mode 100644
index 0000000..ccd7b19
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import org.apache.storm.utils.JCQueue;
+
+public class JcMetrics {
+    private final SimpleGauge<Long> capacity;
+    private final SimpleGauge<Long> population;
+
+    JcMetrics(SimpleGauge<Long> capacity,
+              SimpleGauge<Long> population) {
+        this.capacity = capacity;
+        this.population = population;
+    }
+
+    public void setCapacity(Long capacity) {
+        this.capacity.set(capacity);
+    }
+
+    public void setPopulation(Long population) {
+        this.population.set(population);
+    }
+
+    public void set(JCQueue.QueueMetrics metrics) {
+        this.capacity.set(metrics.capacity());
+        this.population.set(metrics.population());
+    }
+}
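
Usage sketch for JcMetrics, with placeholder identifiers; the paired gauges are created through StormMetricRegistry.jcMetrics (see the registry further down), then refreshed from the owning queue:

    // Registers "receive-queue-capacity" / "receive-queue-population" gauges.
    JcMetrics jcMetrics = StormMetricRegistry.jcMetrics(
            "receive-queue", "topo-1", "split-bolt", 3, 6700);
    jcMetrics.setCapacity(1024L);   // reporters see these values
    jcMetrics.setPopulation(37L);   // until the next refresh via set(...)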

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java b/storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
new file mode 100644
index 0000000..b91dea8
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+
+public class SimpleGauge<T> implements Gauge<T> {
+    private T value;
+
+    public SimpleGauge(T value) {
+        this.value = value;
+    }
+
+    @Override
+    public T getValue() {
+        return this.value;
+    }
+
+    public void set(T value) {
+        this.value = value;
+    }
+}
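
SimpleGauge is a settable codahale Gauge that reporters read through getValue(). Minimal illustration:

    SimpleGauge<Long> population = new SimpleGauge<>(0L);
    population.set(42L);
    Long current = population.getValue(); // 42, what a reporter would observe

Design note: the value field is neither volatile nor synchronized, so a reporter thread may briefly read a stale value; for gauges that is presumably an acceptable trade for keeping the write path free of synchronization.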

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
new file mode 100644
index 0000000..eba86ad
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StormMetricRegistry {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class);
+
+    private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+    private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+    private static String hostName = null;
+
+    public static <T> SimpleGauge<T> gauge(
+            T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port) {
+        String metricName = metricName(name, topologyId, componentId, taskId, port);
+        if (REGISTRY.getGauges().containsKey(metricName)) {
+            return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+        } else {
+            return REGISTRY.register(metricName, new SimpleGauge<>(initialValue));
+        }
+    }
+
+    public static JcMetrics jcMetrics(String name, String topologyId, String componentId, Integer taskId, Integer port) {
+        return new JcMetrics(
+                StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, taskId, port),
+                StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, taskId, port)
+        );
+    }
+
+    public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
+        String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort());
+        return REGISTRY.meter(metricName);
+    }
+
+    public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
+        String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort());
+        return REGISTRY.counter(metricName);
+    }
+
+    public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId) {
+        String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort);
+        return REGISTRY.counter(metricName);
+    }
+
+    public static void start(Map<String, Object> stormConfig, DaemonType type) {
+        try {
+            hostName = dotToUnderScore(Utils.localHostname());
+        } catch (UnknownHostException e) {
+            LOG.warn("Unable to determine hostname while starting the metrics system. Hostname will be reported"
+                    + " as 'localhost'.");
+        }
+
+        LOG.info("Starting metrics reporters...");
+        List<Map<String, Object>> reporterList = (List<Map<String, Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+        if (reporterList != null && reporterList.size() > 0) {
+            for (Map<String, Object> reporterConfig : reporterList) {
+                // only start those requested
+                List<String> daemons = (List<String>) reporterConfig.get("daemons");
+                for (String daemon : daemons) {
+                    if (DaemonType.valueOf(daemon.toUpperCase()) == type) {
+                        startReporter(stormConfig, reporterConfig);
+                    }
+                }
+            }
+        }
+    }
+
+    public static MetricRegistry registry() {
+        return REGISTRY;
+    }
+
+    private static void startReporter(Map<String, Object> stormConfig, Map<String, Object> reporterConfig) {
+        String clazz = (String)reporterConfig.get("class");
+        LOG.info("Attempting to instantiate reporter class: {}", clazz);
+        StormReporter reporter = ReflectionUtils.newInstance(clazz);
+        if (reporter != null) {
+            reporter.prepare(REGISTRY, stormConfig, reporterConfig);
+            reporter.start();
+            REPORTERS.add(reporter);
+        }
+
+    }
+
+    public static void stop() {
+        for (StormReporter sr : REPORTERS) {
+            sr.stop();
+        }
+    }
+
+    public static String metricName(String name, String stormId, String componentId, String streamId, Integer taskId, Integer workerPort) {
+        StringBuilder sb = new StringBuilder("storm.worker.");
+        sb.append(stormId);
+        sb.append(".");
+        sb.append(hostName);
+        sb.append(".");
+        sb.append(dotToUnderScore(componentId));
+        sb.append(".");
+        sb.append(dotToUnderScore(streamId));
+        sb.append(".");
+        sb.append(taskId);
+        sb.append(".");
+        sb.append(workerPort);
+        sb.append("-");
+        sb.append(name);
+        return sb.toString();
+    }
+
+    public static String metricName(String name, String stormId, String componentId, Integer taskId, Integer workerPort) {
+        StringBuilder sb = new StringBuilder("storm.worker.");
+        sb.append(stormId);
+        sb.append(".");
+        sb.append(hostName);
+        sb.append(".");
+        sb.append(dotToUnderScore(componentId));
+        sb.append(".");
+        sb.append(taskId);
+        sb.append(".");
+        sb.append(workerPort);
+        sb.append("-");
+        sb.append(name);
+        return sb.toString();
+    }
+
+    public static String metricName(String name, TopologyContext context) {
+
+
+        StringBuilder sb = new StringBuilder("storm.topology.");
+        sb.append(context.getStormId());
+        sb.append(".");
+        sb.append(hostName);
+        sb.append(".");
+        sb.append(dotToUnderScore(context.getThisComponentId()));
+        sb.append(".");
+        sb.append(context.getThisTaskId());
+        sb.append(".");
+        sb.append(context.getThisWorkerPort());
+        sb.append("-");
+        sb.append(name);
+        return sb.toString();
+    }
+
+    private static String dotToUnderScore(String str) {
+        return str.replace('.', '_');
+    }
+}
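
Two usage points for the registry above. First, start() only launches reporters whose "daemons" list names the calling daemon type, so a worker would call it as below (topoConf being the usual config map, an assumption here). Second, the worker-level metricName() overloads produce dot-delimited names with dots in the hostname, component and stream rewritten to underscores; the example output assumes a host named node1.example.com:

    // Start only the reporters whose "daemons" list contains WORKER.
    StormMetricRegistry.start(topoConf, DaemonType.WORKER);

    String name = StormMetricRegistry.metricName("acked", "topo-1", "split", "default", 3, 6700);
    // -> storm.worker.topo-1.node1_example_com.split.default.3.6700-acked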

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
new file mode 100644
index 0000000..e796df5
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.storm.task.WorkerTopologyContext;
+
+public class TaskMetrics {
+    private static final String METRIC_NAME_ACKED = "acked";
+    private static final String METRIC_NAME_FAILED = "failed";
+    private static final String METRIC_NAME_EMITTED = "emitted";
+    private static final String METRIC_NAME_TRANSFERRED = "transferred";
+
+    private ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>();
+    private ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>();
+    private ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>();
+    private ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>();
+
+    private String topologyId;
+    private String componentId;
+    private Integer taskId;
+    private Integer workerPort;
+
+    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid) {
+        this.topologyId = context.getStormId();
+        this.componentId = componentId;
+        this.taskId = taskid;
+        this.workerPort = context.getThisWorkerPort();
+    }
+
+    public Counter getAcked(String streamId) {
+        Counter c = this.ackedByStream.get(streamId);
+        if (c == null) {
+            c = StormMetricRegistry.counter(METRIC_NAME_ACKED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
+            this.ackedByStream.put(streamId, c);
+        }
+        return c;
+    }
+
+    public Counter getFailed(String streamId) {
+        Counter c = this.failedByStream.get(streamId);
+        if (c == null) {
+            c = StormMetricRegistry.counter(METRIC_NAME_FAILED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
+            this.failedByStream.put(streamId, c);
+        }
+        return c;
+    }
+
+    public Counter getEmitted(String streamId) {
+        Counter c = this.emittedByStream.get(streamId);
+        if (c == null) {
+            c = StormMetricRegistry.counter(METRIC_NAME_EMITTED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
+            this.emittedByStream.put(streamId, c);
+        }
+        return c;
+    }
+
+    public Counter getTransferred(String streamId) {
+        Counter c = this.transferredByStream.get(streamId);
+        if (c == null) {
+            c = StormMetricRegistry.counter(
+                    METRIC_NAME_TRANSFERRED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
+            this.transferredByStream.put(streamId, c);
+        }
+        return c;
+    }
+}
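
The get-or-create in each accessor above is a benign check-then-act: two threads can race past the null check, but the underlying MetricRegistry.counter() returns the same registered Counter for a given name, so the map is at worst rewritten with an identical instance. A more compact equivalent, offered as a sketch rather than the committed code:

    public Counter getAcked(String streamId) {
        // Single map lookup; StormMetricRegistry.counter is idempotent per name.
        return this.ackedByStream.computeIfAbsent(streamId,
                s -> StormMetricRegistry.counter(METRIC_NAME_ACKED,
                        this.topologyId, this.componentId, this.taskId, this.workerPort, s));
    }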

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java b/storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java
new file mode 100644
index 0000000..a209664
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2.filters;
+
+import com.codahale.metrics.Metric;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class RegexFilter implements StormMetricsFilter {
+
+    private Pattern pattern;
+
+
+    @Override
+    public void prepare(Map<String, Object> config) {
+        String expression = (String) config.get("expression");
+        if (expression != null) {
+            this.pattern = Pattern.compile(expression);
+        } else {
+            throw new IllegalStateException("RegexFilter requires an 'expression' parameter.");
+        }
+    }
+
+    @Override
+    public boolean matches(String name, Metric metric) {
+        Matcher matcher = this.pattern.matcher(name);
+        return matcher.matches();
+    }
+
+}
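
Usage sketch for RegexFilter. Note that matches() delegates to Matcher#matches, so the expression must cover the entire metric name, hence the leading ".*" in this hypothetical configuration:

    Map<String, Object> filterConf = new HashMap<>();
    filterConf.put("expression", ".*-emitted"); // illustrative pattern

    RegexFilter filter = new RegexFilter();
    filter.prepare(filterConf);
    // true: the whole name matches; a bare "emitted" pattern would not.
    boolean keep = filter.matches("storm.worker.topo-1.host1.split.default.3.6700-emitted", null);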

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java b/storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java
new file mode 100644
index 0000000..1f876aa
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2.filters;
+
+import com.codahale.metrics.MetricFilter;
+
+import java.util.Map;
+
+public interface StormMetricsFilter extends MetricFilter {
+
+    /**
+     * Called after the filter is instantiated.
+     * @param config A map of the properties from the 'filter' section of the reporter configuration.
+     */
+    void prepare(Map<String, Object> config);
+
+}
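
Since StormMetricsFilter only adds a prepare() hook to codahale's MetricFilter, custom filters stay small. An illustrative prefix filter, not part of this commit (the "prefix" config key is invented here, mirroring RegexFilter's validation style):

    import com.codahale.metrics.Metric;
    import java.util.Map;

    public class PrefixFilter implements StormMetricsFilter {
        private String prefix;

        @Override
        public void prepare(Map<String, Object> config) {
            prefix = (String) config.get("prefix"); // hypothetical key
            if (prefix == null) {
                throw new IllegalStateException("PrefixFilter requires a 'prefix' parameter.");
            }
        }

        @Override
        public boolean matches(String name, Metric metric) {
            return name.startsWith(prefix);
        }
    }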
