STORM-2909 port new metrics to 2.x branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/21da5dff Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/21da5dff Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/21da5dff Branch: refs/heads/master Commit: 21da5dff6c6b12f8350096969a7082f68e2f802c Parents: a8d84d6 Author: Aaron Gresch <agre...@yahoo-inc.com> Authored: Thu Apr 5 14:42:03 2018 -0500 Committer: Aaron Gresch <agre...@yahoo-inc.com> Committed: Mon Apr 9 10:24:26 2018 -0500 ---------------------------------------------------------------------- conf/defaults.yaml | 23 +++ .../org/apache/storm/perf/JCQueuePerfTest.java | 18 +- .../apache/storm/starter/AnchoredWordCount.java | 139 ++++++++++++++ pom.xml | 10 ++ storm-client/pom.xml | 13 ++ .../src/jvm/org/apache/storm/Config.java | 102 +++++++---- .../src/jvm/org/apache/storm/daemon/Task.java | 20 ++- .../daemon/metrics/ClientMetricsUtils.java | 52 ++++++ .../daemon/worker/BackPressureTracker.java | 12 +- .../org/apache/storm/daemon/worker/Worker.java | 18 +- .../apache/storm/daemon/worker/WorkerState.java | 29 +-- .../storm/daemon/worker/WorkerTransfer.java | 15 +- .../jvm/org/apache/storm/executor/Executor.java | 19 +- .../executor/bolt/BoltOutputCollectorImpl.java | 7 +- .../storm/executor/spout/SpoutExecutor.java | 43 +++-- .../org/apache/storm/metrics2/JcMetrics.java | 45 +++++ .../org/apache/storm/metrics2/SimpleGauge.java | 38 ++++ .../storm/metrics2/StormMetricRegistry.java | 180 +++++++++++++++++++ .../org/apache/storm/metrics2/TaskMetrics.java | 85 +++++++++ .../storm/metrics2/filters/RegexFilter.java | 48 +++++ .../metrics2/filters/StormMetricsFilter.java | 33 ++++ .../reporters/ConsoleStormReporter.java | 69 +++++++ .../metrics2/reporters/CsvStormReporter.java | 97 ++++++++++ .../reporters/GangliaStormReporter.java | 132 ++++++++++++++ .../reporters/GraphiteStormReporter.java | 102 +++++++++++ .../metrics2/reporters/JmxStormReporter.java | 92 ++++++++++ 
.../reporters/ScheduledStormReporter.java | 86 +++++++++ .../storm/metrics2/reporters/StormReporter.java | 35 ++++ .../apache/storm/stats/BoltExecutorStats.java | 32 +--- .../jvm/org/apache/storm/stats/CommonStats.java | 23 ++- .../apache/storm/stats/SpoutExecutorStats.java | 26 +-- .../org/apache/storm/task/TopologyContext.java | 74 +++++--- .../src/jvm/org/apache/storm/utils/JCQueue.java | 58 ++++-- .../storm/validation/ConfigValidation.java | 167 +++++++++++------ .../storm/utils/JCQueueBackpressureTest.java | 2 +- .../jvm/org/apache/storm/utils/JCQueueTest.java | 4 +- .../java/org/apache/storm/DaemonConfig.java | 41 ++--- .../storm/daemon/metrics/MetricsUtils.java | 39 +--- .../reporters/ConsolePreparableReporter.java | 16 +- .../reporters/CsvPreparableReporter.java | 17 +- .../reporters/JmxPreparableReporter.java | 12 +- .../storm/nimbus/DefaultTopologyValidator.java | 38 +++- .../storm/nimbus/StrictTopologyValidator.java | 67 +++++++ 43 files changed, 1830 insertions(+), 348 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 8dcb2d9..c985c12 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -388,3 +388,26 @@ worker.metrics: # The number of buckets for running statistics num.stat.buckets: 20 + +# Metrics v2 configuration (optional) +#storm.metrics.reporters: +# # Graphite Reporter +# - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter" +# daemons: +# - "supervisor" +# - "nimbus" +# - "worker" +# report.period: 60 +# report.period.units: "SECONDS" +# graphite.host: "localhost" +# graphite.port: 2003 +# +# # Console Reporter +# - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter" +# daemons: +# - "worker" +# report.period: 10 +# report.period.units: "SECONDS" +# filter: +# class: 
"org.apache.storm.metrics2.filters.RegexFilter" +# expression: ".*my_component.*emitted.*" http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java index 17c676f..700ce4e 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java @@ -46,8 +46,8 @@ public class JCQueuePerfTest { private static void ackingProducerSimulation() { WaitStrategyPark ws = new WaitStrategyPark(100); - JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws); - JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws); + JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test",1000, 1000); + JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test",1000, 1000); final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ); final Acker acker = new Acker(ackQ, spoutQ); @@ -57,8 +57,8 @@ public class JCQueuePerfTest { private static void producerFwdConsumer(int prodBatchSz) { WaitStrategyPark ws = new WaitStrategyPark(100); - JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws); - JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws); + JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); + JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); final Producer prod = new Producer(q1); final Forwarder fwd = new Forwarder(q1, q2); @@ -69,7 +69,7 @@ public class JCQueuePerfTest { private static void oneProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100)); + JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, 
new WaitStrategyPark(100), "test", "test",1000, 1000); final Producer prod1 = new Producer(q1); final Consumer cons1 = new Consumer(q1); @@ -78,7 +78,7 @@ public class JCQueuePerfTest { } private static void twoProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100)); + JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",1000, 1000); final Producer prod1 = new Producer(q1); final Producer prod2 = new Producer(q1); @@ -88,7 +88,7 @@ public class JCQueuePerfTest { } private static void threeProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100)); + JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",1000, 1000); final Producer prod1 = new Producer(q1); final Producer prod2 = new Producer(q1); @@ -101,8 +101,8 @@ public class JCQueuePerfTest { private static void oneProducer2Consumers(int prodBatchSz) { WaitStrategyPark ws = new WaitStrategyPark(100); - JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws); - JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws); + JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); + JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); final Producer2 prod1 = new Producer2(q1, q2); final Consumer cons1 = new Consumer(q1); http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java new file mode 100644 index 0000000..cb45024 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java @@ -0,0 
+1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.starter; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import org.apache.storm.Config; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.ConfigurableTopology; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +public class AnchoredWordCount extends ConfigurableTopology { + + public static class RandomSentenceSpout extends BaseRichSpout { + SpoutOutputCollector collector; + Random random; + + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + this.random = new Random(); + } + + @Override + public void nextTuple() { + Utils.sleep(10); + 
String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), + sentence("four score and seven years ago"), + sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")}; + final String sentence = sentences[random.nextInt(sentences.length)]; + + this.collector.emit(new Values(sentence), UUID.randomUUID()); + } + + protected String sentence(String input) { + return input; + } + + @Override + public void ack(Object id) { + } + + @Override + public void fail(Object id) { + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + } + + + public static class SplitSentence extends BaseBasicBolt { + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String sentence = tuple.getString(0); + for (String word: sentence.split("\\s+")) { + collector.emit(new Values(word, 1)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + public static class WordCount extends BaseBasicBolt { + Map<String, Integer> counts = new HashMap<>(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String word = tuple.getString(0); + Integer count = counts.get(word); + if (count == null) { + count = 0; + } + count++; + counts.put(word, count); + collector.emit(new Values(word, count)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + protected int run(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new RandomSentenceSpout(), 4); + + builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); + builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); + + Config conf = new 
Config(); + conf.setMaxTaskParallelism(3); + + String topologyName = "word-count"; + + conf.setNumWorkers(3); + + if (args != null && args.length > 0) { + topologyName = args[0]; + } + return submit(topologyName, conf, builder); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d3204b2..77c7738 100644 --- a/pom.xml +++ b/pom.xml @@ -956,6 +956,16 @@ <version>${metrics.version}</version> </dependency> <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-ganglia</artifactId> + <version>${metrics.version}</version> + </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-graphite</artifactId> + <version>${metrics.version}</version> + </dependency> + <dependency> <groupId>metrics-clojure</groupId> <artifactId>metrics-clojure</artifactId> <version>${metrics-clojure.version}</version> http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/pom.xml ---------------------------------------------------------------------- diff --git a/storm-client/pom.xml b/storm-client/pom.xml index 2e073a8..1311d2a 100644 --- a/storm-client/pom.xml +++ b/storm-client/pom.xml @@ -184,6 +184,19 @@ <artifactId>curator-client</artifactId> </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-ganglia</artifactId> + </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-graphite</artifactId> + </dependency> + <!-- end of transitive dependency management --> <!-- test --> http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git 
a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index b275985..3ebc493 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -18,20 +18,19 @@ package org.apache.storm; +import com.esotericsoftware.kryo.Serializer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.storm.metric.IEventLogger; import org.apache.storm.policy.IWaitStrategy; import org.apache.storm.serialization.IKryoDecorator; import org.apache.storm.serialization.IKryoFactory; import org.apache.storm.validation.ConfigValidation; -import org.apache.storm.validation.ConfigValidationAnnotations.*; import org.apache.storm.validation.ConfigValidation.*; -import com.esotericsoftware.kryo.Serializer; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.apache.storm.validation.ConfigValidationAnnotations.*; /** * Topology configs are specified as a plain old map. This class provides a @@ -148,7 +147,7 @@ public class Config extends HashMap<String, Object> { * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer */ @isStringList - public static final String TOPOLOGY_READONLY_USERS="topology.readonly.users"; + public static final String TOPOLOGY_READONLY_USERS = "topology.readonly.users"; /** * A list of readonly groups that are allowed to interact with the topology. To use this set @@ -171,7 +170,7 @@ public class Config extends HashMap<String, Object> { public static final String TOPOLOGY_DEBUG = "topology.debug"; /** - * User defined version of this topology + * User defined version of this topology. 
*/ @isString public static final String TOPOLOGY_VERSION = "topology.version"; @@ -186,7 +185,7 @@ public class Config extends HashMap<String, Object> { /** * The serializer for communication between shell components and non-JVM - * processes + * processes. */ @isString public static final String TOPOLOGY_MULTILANG_SERIALIZER = "topology.multilang.serializer"; @@ -268,14 +267,16 @@ public class Config extends HashMap<String, Object> { * to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override */ @isPositiveNumber(includeZero = true) - public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB = "topology.metrics.consumer.resources.onheap.memory.mb"; + public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB = + "topology.metrics.consumer.resources.onheap.memory.mb"; /** * The maximum amount of memory an instance of a metrics consumer will take off heap. This enables the scheduler * to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override */ @isPositiveNumber(includeZero = true) - public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB = "topology.metrics.consumer.resources.offheap.memory.mb"; + public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB = + "topology.metrics.consumer.resources.offheap.memory.mb"; /** * The config indicates the percentage of cpu for a core an instance(executor) of a metrics consumer will use. @@ -311,13 +312,13 @@ public class Config extends HashMap<String, Object> { public static final String TOPOLOGY_STATE_CHECKPOINT_INTERVAL = "topology.state.checkpoint.interval.ms"; /** - * A per topology config that specifies the maximum amount of memory a worker can use for that specific topology + * A per topology config that specifies the maximum amount of memory a worker can use for that specific topology. 
*/ @isPositiveNumber public static final String TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB = "topology.worker.max.heap.size.mb"; /** - * The strategy to use when scheduling a topology with Resource Aware Scheduler + * The strategy to use when scheduling a topology with Resource Aware Scheduler. */ @NotNull @isString @@ -388,7 +389,7 @@ public class Config extends HashMap<String, Object> { * Note that EventLoggerBolt takes care of all the implementations of IEventLogger, hence registering * many implementations (especially they're implemented as 'blocking' manner) would slow down overall topology. */ - @isListEntryCustom(entryValidatorClasses={EventLoggerRegistryValidator.class}) + @isListEntryCustom(entryValidatorClasses = {EventLoggerRegistryValidator.class}) public static final String TOPOLOGY_EVENT_LOGGER_REGISTER = "topology.event.logger.register"; /** @@ -452,10 +453,10 @@ public class Config extends HashMap<String, Object> { * rather than throw an error. */ @isBoolean - public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations"; + public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS = "topology.skip.missing.kryo.registrations"; /** - * List of classes to register during state serialization + * List of classes to register during state serialization. */ @isStringList public static final String TOPOLOGY_STATE_KRYO_REGISTER = "topology.state.kryo.register"; @@ -466,7 +467,7 @@ public class Config extends HashMap<String, Object> { * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable. 
*/ - @isListEntryCustom(entryValidatorClasses={MetricRegistryValidator.class}) + @isListEntryCustom(entryValidatorClasses = {MetricRegistryValidator.class}) public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register"; /** @@ -477,13 +478,13 @@ public class Config extends HashMap<String, Object> { public static final String TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS = "topology.serialized.message.size.metrics"; /** - * A map of metric name to class name implementing IMetric that will be created once per worker JVM + * A map of metric name to class name implementing IMetric that will be created once per worker JVM. */ @isMapEntryType(keyType = String.class, valueType = String.class) public static final String TOPOLOGY_WORKER_METRICS = "topology.worker.metrics"; /** - * A map of metric name to class name implementing IMetric that will be created once per worker JVM + * A map of metric name to class name implementing IMetric that will be created once per worker JVM. */ @isMapEntryType(keyType = String.class, valueType = String.class) public static final String WORKER_METRICS = "worker.metrics"; @@ -494,7 +495,7 @@ public class Config extends HashMap<String, Object> { */ @isInteger @isPositiveNumber - public static final String TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism"; + public static final String TOPOLOGY_MAX_TASK_PARALLELISM = "topology.max.task.parallelism"; /** * The maximum number of tuples that can be pending on a spout task at any given time. @@ -506,14 +507,14 @@ public class Config extends HashMap<String, Object> { */ @isInteger @isPositiveNumber - public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; + public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending"; /** * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for. 
*/ @isInteger @isPositiveNumber(includeZero = true) - public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms"; + public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS = "topology.sleep.spout.wait.strategy.time.ms"; /** * The maximum amount of time a component gives a source of state to synchronize before it requests @@ -522,49 +523,49 @@ public class Config extends HashMap<String, Object> { @isInteger @isPositiveNumber @NotNull - public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs"; + public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS = "topology.state.synchronization.timeout.secs"; /** * The percentage of tuples to sample to produce stats for a task. */ @isPositiveNumber - public static final String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate"; + public static final String TOPOLOGY_STATS_SAMPLE_RATE = "topology.stats.sample.rate"; /** * The time period that builtin metrics data in bucketed into. */ @isInteger - public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs"; + public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS = "topology.builtin.metrics.bucket.size.secs"; /** * Whether or not to use Java serialization in a topology. */ @isBoolean - public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION="topology.fall.back.on.java.serialization"; + public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION = "topology.fall.back.on.java.serialization"; /** * Topology-specific options for the worker child process. This is used in addition to WORKER_CHILDOPTS. 
*/ @isStringOrStringList - public static final String TOPOLOGY_WORKER_CHILDOPTS="topology.worker.childopts"; + public static final String TOPOLOGY_WORKER_CHILDOPTS = "topology.worker.childopts"; /** * Topology-specific options GC for the worker child process. This overrides WORKER_GC_CHILDOPTS. */ @isStringOrStringList - public static final String TOPOLOGY_WORKER_GC_CHILDOPTS="topology.worker.gc.childopts"; + public static final String TOPOLOGY_WORKER_GC_CHILDOPTS = "topology.worker.gc.childopts"; /** * Topology-specific options for the logwriter process of a worker. */ @isStringOrStringList - public static final String TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS="topology.worker.logwriter.childopts"; + public static final String TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS = "topology.worker.logwriter.childopts"; /** * Topology-specific classpath for the worker child process. This is combined to the usual classpath. */ @isStringOrStringList - public static final String TOPOLOGY_CLASSPATH="topology.classpath"; + public static final String TOPOLOGY_CLASSPATH = "topology.classpath"; /** * Topology-specific classpath for the worker child process. This will be *prepended* to @@ -573,14 +574,14 @@ public class Config extends HashMap<String, Object> { * classpaths, set the storm.topology.classpath.beginning.enabled config to true. */ @isStringOrStringList - public static final String TOPOLOGY_CLASSPATH_BEGINNING="topology.classpath.beginning"; + public static final String TOPOLOGY_CLASSPATH_BEGINNING = "topology.classpath.beginning"; /** * Topology-specific environment variables for the worker child process. 
* This is added to the existing environment (that of the supervisor) */ @isMapEntryType(keyType = String.class, valueType = String.class) - public static final String TOPOLOGY_ENVIRONMENT="topology.environment"; + public static final String TOPOLOGY_ENVIRONMENT = "topology.environment"; /* * Bolt-specific configuration for windowed bolts to specify the window length as a count of number of tuples @@ -651,7 +652,7 @@ public class Config extends HashMap<String, Object> { * topology in Zookeeper. */ @isString - public static final String TOPOLOGY_TRANSACTIONAL_ID="topology.transactional.id"; + public static final String TOPOLOGY_TRANSACTIONAL_ID = "topology.transactional.id"; /** * A list of task hooks that are automatically added to every spout and bolt in the topology. An example @@ -659,21 +660,21 @@ public class Config extends HashMap<String, Object> { * monitoring system. These hooks are instantiated using the zero-arg constructor. */ @isStringList - public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks"; + public static final String TOPOLOGY_AUTO_TASK_HOOKS = "topology.auto.task.hooks"; /** * The size of the receive queue for each executor. */ @isPositiveNumber @isInteger - public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size"; + public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE = "topology.executor.receive.buffer.size"; /** * The size of the transfer queue for each worker. */ @isPositiveNumber @isInteger - public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size"; + public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE = "topology.transfer.buffer.size"; /** * The size of the transfer queue for each worker. 
@@ -1240,6 +1241,9 @@ public class Config extends HashMap<String, Object> { @isString public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate"; + @isListEntryCustom(entryValidatorClasses={MetricReportersValidator.class}) + public static final String STORM_METRICS_REPORTERS = "storm.metrics.reporters"; + /** * What blobstore implementation the storm client should use. */ @@ -1672,7 +1676,8 @@ public class Config extends HashMap<String, Object> { /** * Impersonation user ACL config entries. */ - @isMapEntryCustom(keyValidatorClasses = {ConfigValidation.StringValidator.class}, valueValidatorClasses = {ConfigValidation.ImpersonationAclUserEntryValidator.class}) + @isMapEntryCustom(keyValidatorClasses = {ConfigValidation.StringValidator.class}, + valueValidatorClasses = {ConfigValidation.ImpersonationAclUserEntryValidator.class}) public static final String NIMBUS_IMPERSONATION_ACL = "nimbus.impersonation.acl"; /** @@ -1913,6 +1918,27 @@ public class Config extends HashMap<String, Object> { @isPositiveNumber public static final String WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS = "worker.blob.update.poll.interval.secs"; + /** + * Specifies the Locale for the daemon metrics reporter plugin. + * Use the specified IETF BCP 47 language tag string for a Locale. + */ + @isString + public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE = "storm.daemon.metrics.reporter.plugin.locale"; + + /** + * Specifies the rate unit, as a TimeUnit name, used for the reporting frequency of the daemon metrics reporter plugin. + */ + @isString + public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT = "storm.daemon.metrics.reporter.plugin.rate.unit"; + + /** + * Specifies the duration unit, as a TimeUnit name, used for the reporting window of the daemon metrics reporter plugin. 
+ */ + @isString + public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT = "storm.daemon.metrics.reporter.plugin.duration.unit"; + + + public static void setClasspath(Map<String, Object> conf, String cp) { conf.put(Config.TOPOLOGY_CLASSPATH, cp); } http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/Task.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java index ce9d0e4..4bc0e25 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java @@ -29,7 +29,6 @@ import java.util.Queue; import java.util.Random; import java.util.function.BooleanSupplier; - import org.apache.storm.Config; import org.apache.storm.Thrift; import org.apache.storm.daemon.worker.WorkerState; @@ -46,6 +45,7 @@ import org.apache.storm.generated.StormTopology; import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; import org.apache.storm.hooks.ITaskHook; import org.apache.storm.hooks.info.EmitInfo; +import org.apache.storm.metrics2.TaskMetrics; import org.apache.storm.spout.ShellSpout; import org.apache.storm.stats.CommonStats; import org.apache.storm.task.ShellBolt; @@ -78,6 +78,7 @@ public class Task { private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper; private HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> streamToGroupers; private boolean debug; + private final TaskMetrics taskMetrics; public Task(Executor executor, Integer taskId) throws IOException { this.taskId = taskId; @@ -95,6 +96,7 @@ public class Task { this.taskObject = mkTaskObject(); this.debug = topoConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) topoConf.get(Config.TOPOLOGY_DEBUG); this.addTaskHooks(); + this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, 
this.taskId); } public List<Integer> getOutgoingTasks(Integer outTaskId, String stream, List<Object> values) { @@ -116,9 +118,9 @@ public class Task { try { if (emitSampler.getAsBoolean()) { - executorStats.emittedTuple(stream); + executorStats.emittedTuple(stream, this.taskMetrics.getEmitted(stream)); if (null != outTaskId) { - executorStats.transferredTuples(stream, 1); + executorStats.transferredTuples(stream, 1, this.taskMetrics.getTransferred(stream)); } } } catch (Exception e) { @@ -140,7 +142,7 @@ public class Task { ArrayList<LoadAwareCustomStreamGrouping> groupers = streamToGroupers.get(stream); if (null != groupers) { - for (int i=0; i<groupers.size(); ++i) { + for (int i = 0; i < groupers.size(); ++i) { LoadAwareCustomStreamGrouping grouper = groupers.get(i); if (grouper == GrouperFactory.DIRECT) { throw new IllegalArgumentException("Cannot do regular emit to direct stream"); @@ -157,8 +159,8 @@ public class Task { } try { if (emitSampler.getAsBoolean()) { - executorStats.emittedTuple(stream); - executorStats.transferredTuples(stream, outTasks.size()); + executorStats.emittedTuple(stream, this.taskMetrics.getEmitted(stream)); + executorStats.transferredTuples(stream, outTasks.size(), this.taskMetrics.getTransferred(stream)); } } catch (Exception e) { throw new RuntimeException(e); @@ -186,6 +188,9 @@ public class Task { return taskObject; } + public TaskMetrics getTaskMetrics() { + return taskMetrics; + } // Non Blocking call. 
If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) { @@ -284,7 +289,8 @@ public class Task { } } - private static HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> getGroupersPerStream(Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper) { + private static HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> getGroupersPerStream( + Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper) { HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> result = new HashMap<>(streamComponentToGrouper.size()); for (Entry<String, Map<String, LoadAwareCustomStreamGrouping>> entry : streamComponentToGrouper.entrySet()) { http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java new file mode 100644 index 0000000..3fbdb7e --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.metrics; + +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.Config; +import org.apache.storm.utils.ObjectReader; + +public class ClientMetricsUtils { + + public static TimeUnit getMetricsRateUnit(Map<String, Object> topoConf) { + return getTimeUnitForCofig(topoConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT); + } + + public static TimeUnit getMetricsDurationUnit(Map<String, Object> topoConf) { + return getTimeUnitForCofig(topoConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT); + } + + public static Locale getMetricsReporterLocale(Map<String, Object> topoConf) { + String languageTag = ObjectReader.getString(topoConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE), null); + if (languageTag != null) { + return Locale.forLanguageTag(languageTag); + } + return null; + } + + private static TimeUnit getTimeUnitForCofig(Map<String, Object> topoConf, String configName) { + String rateUnitString = ObjectReader.getString(topoConf.get(configName), null); + if (rateUnitString != null) { + return TimeUnit.valueOf(rateUnitString); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java index 
9b31c1a..6f4be8e 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java @@ -19,11 +19,12 @@ package org.apache.storm.daemon.worker; import java.util.ArrayList; -import java.util.concurrent.ConcurrentHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.storm.Constants; import org.apache.storm.messaging.netty.BackPressureStatus; import org.apache.storm.utils.JCQueue; import org.slf4j.Logger; @@ -37,7 +38,8 @@ import static org.apache.storm.Constants.SYSTEM_TASK_ID; * ConcurrentHashMap does not allow storing null values, so we use the special value NONE instead. */ public class BackPressureTracker { - private static final JCQueue NONE = new JCQueue ("NoneQ", 2, 0, 1, null) { }; + private static final JCQueue NONE = new JCQueue("NoneQ", 2, 0, 1, null, + "none", Constants.SYSTEM_COMPONENT_ID, -1, 0) { }; static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class); @@ -47,7 +49,7 @@ public class BackPressureTracker { public BackPressureTracker(String workerId, List<Integer> allLocalTasks) { this.workerId = workerId; for (Integer taskId : allLocalTasks) { - if(taskId != SYSTEM_TASK_ID) { + if (taskId != SYSTEM_TASK_ID) { tasks.put(taskId, NONE); // all tasks are considered to be not under BP initially } } @@ -58,7 +60,7 @@ public class BackPressureTracker { } /*** - * Record BP for a task + * Record BP for a task. 
* This is called by transferLocalBatch() on NettyWorker thread * @return true if an update was recorded, false if taskId is already under BP */ @@ -70,7 +72,7 @@ public class BackPressureTracker { public boolean refreshBpTaskList() { boolean changed = false; LOG.debug("Running Back Pressure status change check"); - for ( Entry<Integer, JCQueue> entry : tasks.entrySet()) { + for (Entry<Integer, JCQueue> entry : tasks.entrySet()) { if (entry.getValue() != NONE && entry.getValue().isEmptyOverflow()) { recordNoBackPressure(entry.getKey()); changed = true; http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index 65915d0..9867cb4 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -63,6 +63,7 @@ import org.apache.storm.generated.LogConfig; import org.apache.storm.generated.SupervisorWorkerHeartbeat; import org.apache.storm.messaging.IConnection; import org.apache.storm.messaging.IContext; +import org.apache.storm.metrics2.StormMetricRegistry; import org.apache.storm.security.auth.AuthUtils; import org.apache.storm.security.auth.IAutoCredentials; import org.apache.storm.stats.StatsUtil; @@ -73,11 +74,9 @@ import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.SupervisorClient; import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; -import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J; public class Worker implements Shutdownable, DaemonCommon { @@ -149,6 +148,8 @@ public class Worker implements Shutdownable, DaemonCommon { IStateStorage stateStorage = 
ClusterUtils.mkStateStorage(conf, topologyConf, csContext); IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext); + StormMetricRegistry.start(conf, DaemonType.WORKER); + Credentials initialCredentials = stormClusterState.credentials(topologyId, null); Map<String, String> initCreds = new HashMap<>(); if (initialCredentials != null) { @@ -284,7 +285,8 @@ public class Worker implements Shutdownable, DaemonCommon { final Integer xferBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE)); final Long flushIntervalMillis = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS)); if ((producerBatchSize == 1 && xferBatchSize == 1) || flushIntervalMillis == 0) { - LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", producerBatchSize, xferBatchSize, flushIntervalMillis); + LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", + producerBatchSize, xferBatchSize, flushIntervalMillis); return; } @@ -309,8 +311,8 @@ public class Worker implements Shutdownable, DaemonCommon { return; } final Long bpCheckIntervalMs = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS)); - workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs - , bpCheckIntervalMs, () -> workerState.refreshBackPressureStatus()); + workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs, + bpCheckIntervalMs, () -> workerState.refreshBackPressureStatus()); LOG.info("BackPressure status change checking will be performed every {} millis", bpCheckIntervalMs); } @@ -416,12 +418,12 @@ public class Worker implements Shutdownable, DaemonCommon { //In distributed mode, send heartbeat directly to master if local supervisor goes down. 
SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(), lsWorkerHeartbeat.get_executors(), lsWorkerHeartbeat.get_time_secs()); - try (SupervisorClient client = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)){ + try (SupervisorClient client = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)) { client.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat); } catch (Exception tr1) { //If any error/exception thrown, report directly to nimbus. LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage()); - try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)){ + try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)) { nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat); } catch (Exception tr2) { //if any error/exception thrown, just ignore. @@ -473,6 +475,8 @@ public class Worker implements Shutdownable, DaemonCommon { workerState.backPressureCheckTimer.close(); workerState.closeResources(); + StormMetricRegistry.stop(); + LOG.info("Trigger any worker shutdown hooks"); workerState.runWorkerShutdownHooks(); http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java index 8d8c62f..fc94086 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java @@ -45,7 +45,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.StormTimer; -import org.apache.storm.messaging.netty.BackPressureStatus; import 
org.apache.storm.cluster.IStateStorage; import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.cluster.VersionedData; @@ -69,9 +68,9 @@ import org.apache.storm.messaging.DeserializingConnectionCallback; import org.apache.storm.messaging.IConnection; import org.apache.storm.messaging.IContext; import org.apache.storm.messaging.TransportFactory; -import org.apache.storm.security.auth.IAutoCredentials; - +import org.apache.storm.messaging.netty.BackPressureStatus; import org.apache.storm.policy.IWaitStrategy; +import org.apache.storm.security.auth.IAutoCredentials; import org.apache.storm.serialization.ITupleSerializer; import org.apache.storm.serialization.KryoTupleSerializer; import org.apache.storm.task.WorkerTopologyContext; @@ -79,10 +78,10 @@ import org.apache.storm.tuple.AddressedTuple; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.JCQueue; -import org.apache.storm.utils.Utils; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.SupervisorClient; import org.apache.storm.utils.ThriftTopologyUtils; +import org.apache.storm.utils.Utils; import org.apache.storm.utils.Utils.SmartThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -217,7 +216,9 @@ public class WorkerState { // local executors and localTaskIds running in this worker final Set<List<Long>> localExecutors; final ArrayList<Integer> localTaskIds; - final Map<Integer, JCQueue> localReceiveQueues = new HashMap<>(); // [taskId]-> JCQueue : initialized after local executors are initialized + + // [taskId]-> JCQueue : initialized after local executors are initialized + final Map<Integer, JCQueue> localReceiveQueues = new HashMap<>(); final Map<String, Object> topologyConf; final StormTopology topology; @@ -418,9 +419,8 @@ public class WorkerState { public void refreshStormActive(Runnable callback) { StormBase base = stormClusterState.stormBase(topologyId, callback); isTopologyActive.set( - 
(null != base) && - (base.get_status() == TopologyStatus.ACTIVE) && - (isWorkerActive.get())); + (null != base) + && (base.get_status() == TopologyStatus.ACTIVE) && (isWorkerActive.get())); if (null != base) { Map<String, DebugOptions> debugOptionsMap = new HashMap<>(base.get_component_debug()); for (DebugOptions debugOptions : debugOptionsMap.values()) { @@ -559,7 +559,8 @@ public class WorkerState { private void dropMessage(AddressedTuple tuple, JCQueue queue) { ++dropCount; queue.recordMsgDrop(); - LOG.warn("Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", queue.getName(), queue.getOverflowCount(), dropCount, tuple); + LOG.warn("Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", + queue.getName(), queue.getOverflowCount(), dropCount, tuple); } public void checkSerialize(KryoTupleSerializer serializer, AddressedTuple tuple) { @@ -643,7 +644,7 @@ public class WorkerState { private Assignment getLocalAssignment(Map<String, Object> conf, IStormClusterState stormClusterState, String topologyId) { if (!ConfigUtils.isLocalMode(conf)) { try (SupervisorClient supervisorClient = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), - supervisorPort)){ + supervisorPort)) { Assignment assignment = supervisorClient.getClient().getLocalAssignmentForStorm(topologyId); return assignment; } catch (Throwable tr1) { @@ -661,15 +662,17 @@ public class WorkerState { Integer overflowLimit = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT)); if (recvBatchSize > recvQueueSize / 2) { - throw new IllegalArgumentException(Config.TOPOLOGY_PRODUCER_BATCH_SIZE + ":" + recvBatchSize + - " is greater than half of " + Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE + ":" + recvQueueSize); + throw new IllegalArgumentException(Config.TOPOLOGY_PRODUCER_BATCH_SIZE + ":" + recvBatchSize + + " is greater than half 
of " + Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE + ":" + recvQueueSize); } IWaitStrategy backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf); Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>(); for (List<Long> executor : executors) { + int port = this.getPort(); receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(), - recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy)); + recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy, + this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, this.getPort())); } return receiveQueueMap; http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java index e57123c..8923bf5 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java @@ -18,7 +18,12 @@ package org.apache.storm.daemon.worker; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.storm.Config; +import org.apache.storm.Constants; import org.apache.storm.messaging.TaskMessage; import org.apache.storm.policy.IWaitStrategy; import org.apache.storm.serialization.ITupleSerializer; @@ -31,11 +36,6 @@ import org.apache.storm.utils.Utils.SmartThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; - // Transfers messages destined to other workers class WorkerTransfer implements 
JCQueue.Consumer { static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class); @@ -53,7 +53,7 @@ class WorkerTransfer implements JCQueue.Consumer { this.workerState = workerState; this.backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf); this.drainer = new TransferDrainer(); - this.remoteBackPressureStatus = new AtomicBoolean[maxTaskIdInTopo+1]; + this.remoteBackPressureStatus = new AtomicBoolean[maxTaskIdInTopo + 1]; for (int i = 0; i < remoteBackPressureStatus.length; i++) { remoteBackPressureStatus[i] = new AtomicBoolean(false); } @@ -65,7 +65,8 @@ class WorkerTransfer implements JCQueue.Consumer { + Config.TOPOLOGY_TRANSFER_BUFFER_SIZE + ":" + xferQueueSz); } - this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy); + this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy, + workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, workerState.getPort()); } public JCQueue getTransferQueue() { http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/executor/Executor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java index f811b82..962e019 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java @@ -24,7 +24,6 @@ import com.google.common.collect.Lists; import java.io.IOException; import java.lang.reflect.Field; import java.net.UnknownHostException; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -33,15 +32,15 @@ import java.util.Map; import java.util.Objects; import java.util.Queue; import java.util.Random; +import java.util.concurrent.Callable; 
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.Callable; import java.util.function.BooleanSupplier; import java.util.stream.Collectors; - import org.apache.storm.Config; import org.apache.storm.Constants; +import org.apache.storm.StormTimer; import org.apache.storm.cluster.ClusterStateContext; import org.apache.storm.cluster.ClusterUtils; import org.apache.storm.cluster.DaemonType; @@ -66,11 +65,8 @@ import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; import org.apache.storm.grouping.LoadMapping; import org.apache.storm.metric.api.IMetric; import org.apache.storm.metric.api.IMetricsConsumer; -import org.apache.storm.stats.BoltExecutorStats; import org.apache.storm.stats.CommonStats; -import org.apache.storm.stats.SpoutExecutorStats; import org.apache.storm.stats.StatsUtil; -import org.apache.storm.StormTimer; import org.apache.storm.task.WorkerTopologyContext; import org.apache.storm.tuple.AddressedTuple; import org.apache.storm.tuple.Fields; @@ -78,9 +74,9 @@ import org.apache.storm.tuple.TupleImpl; import org.apache.storm.tuple.Values; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.JCQueue; -import org.apache.storm.utils.Utils; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; import org.jctools.queues.MpscChunkedArrayQueue; import org.json.simple.JSONValue; import org.json.simple.parser.ParseException; @@ -254,7 +250,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer { } /** - * separated from mkExecutor in order to replace executor transfer in executor data for testing + * separated from mkExecutor in order to replace executor transfer in executor data for testing. 
*/ public ExecutorShutdown execute() throws Exception { LOG.info("Loading executor tasks " + componentId + ":" + executorId); @@ -400,7 +396,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer { } /** - * Returns map of stream id to component id to grouper + * Returns map of stream id to component id to grouper. */ private Map<String, Map<String, LoadAwareCustomStreamGrouping>> outboundComponents( WorkerTopologyContext workerTopologyContext, String componentId, Map<String, Object> topoConf) { @@ -438,7 +434,8 @@ public abstract class Executor implements Callable, JCQueue.Consumer { // ============================ getter methods ================================= // ============================================================================= - private Map<String, Object> normalizedComponentConf(Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) { + private Map<String, Object> normalizedComponentConf( + Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) { List<String> keysToRemove = retrieveAllConfigKeys(); keysToRemove.remove(Config.TOPOLOGY_DEBUG); keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING); @@ -503,7 +500,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer { return stormId; } - abstract public CommonStats getStats(); + public abstract CommonStats getStats(); public String getType() { return type; http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java index facbb75..f522ec6 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java +++ 
b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java @@ -114,7 +114,8 @@ public class BoltOutputCollectorImpl implements IOutputCollector { } else { msgId = MessageId.makeUnanchored(); } - TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId); + TupleImpl tupleExt = new TupleImpl( + executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId); xsfer.tryTransfer(new AddressedTuple(t, tupleExt), executor.getPendingEmits()); } if (isEventLoggers) { @@ -145,7 +146,7 @@ public class BoltOutputCollectorImpl implements IOutputCollector { boltAckInfo.applyOn(task.getUserContext()); } if (delta >= 0) { - executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta); + executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta, task.getTaskMetrics().getAcked(input.getSourceStreamId())); } } @@ -166,7 +167,7 @@ public class BoltOutputCollectorImpl implements IOutputCollector { BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta); boltFailInfo.applyOn(task.getUserContext()); if (delta >= 0) { - executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta); + executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta, task.getTaskMetrics().getFailed(input.getSourceStreamId())); } } http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java index e204150..5f16ccb 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java +++ 
b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java @@ -15,11 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.executor.spout; import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.storm.Config; import org.apache.storm.Constants; @@ -46,20 +51,15 @@ import org.apache.storm.tuple.AddressedTuple; import org.apache.storm.tuple.TupleImpl; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.JCQueue; -import org.apache.storm.utils.Utils; import org.apache.storm.utils.MutableLong; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ReflectionUtils; import org.apache.storm.utils.RotatingMap; import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - public class SpoutExecutor extends Executor { private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); @@ -92,7 +92,8 @@ public class SpoutExecutor extends Executor { this.emittedCount = new MutableLong(0); this.emptyEmitStreak = new MutableLong(0); this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); - this.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()),ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS))); + this.stats = new SpoutExecutorStats( + ConfigUtils.samplingRate(this.getTopoConf()),ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS))); this.builtInMetrics = new BuiltinSpoutMetrics(stats); } @@ -108,13 +109,14 @@ public class SpoutExecutor extends Executor { Utils.sleep(100); } - LOG.info("Opening spout 
{}:{}", componentId, taskIds ); + LOG.info("Opening spout {}:{}", componentId, taskIds); this.idToTask = idToTask; this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); this.spouts = new ArrayList<>(); for (Task task : idToTask) { - if(task!=null) + if (task != null) { this.spouts.add((ISpout) task.getTaskObject()); + } } this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() { @Override @@ -130,9 +132,9 @@ public class SpoutExecutor extends Executor { this.spoutThrottlingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext()); this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext()); this.outputCollectors = new ArrayList<>(); - for (int i=0; i<idToTask.size(); ++i) { + for (int i = 0; i < idToTask.size(); ++i) { Task taskData = idToTask.get(i); - if (taskData==null) { + if (taskData == null) { continue; } ISpout spoutObject = (ISpout) taskData.getTaskObject(); @@ -203,12 +205,14 @@ public class SpoutExecutor extends Executor { } } if (reachedMaxSpoutPending) { - if (rmspCount == 0) + if (rmspCount == 0) { LOG.debug("Reached max spout pending"); + } rmspCount++; } else { - if (rmspCount > 0) + if (rmspCount > 0) { LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount); + } rmspCount = 0; } @@ -216,7 +220,7 @@ public class SpoutExecutor extends Executor { // continue without idling return 0L; } - if ( !pendingEmits.isEmpty() ) { // then facing backpressure + if (!pendingEmits.isEmpty()) { // then facing backpressure backPressureWaitStrategy(); return 0L; } @@ -344,8 +348,9 @@ public class SpoutExecutor extends Executor { if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext()); } - if (hasAckers && timeDelta!=null) { - 
executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta); + if (hasAckers && timeDelta != null) { + executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta, + taskData.getTaskMetrics().getAcked(tupleInfo.getStream())); } } catch (Exception e) { throw Utils.wrapInRuntime(e); @@ -362,7 +367,8 @@ public class SpoutExecutor extends Executor { spout.fail(tupleInfo.getMessageId()); new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext()); if (timeDelta != null) { - executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta); + executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta, + taskData.getTaskMetrics().getFailed(tupleInfo.getStream())); } } catch (Exception e) { throw Utils.wrapInRuntime(e); @@ -371,8 +377,9 @@ public class SpoutExecutor extends Executor { public int getSpoutRecvqCheckSkipCount() { - if(ackingEnabled) + if (ackingEnabled) { return 0; // always check recQ if ACKing enabled + } return ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS), 0); } http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java new file mode 100644 index 0000000..ccd7b19 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2; + +import org.apache.storm.utils.JCQueue; + +public class JcMetrics { + private final SimpleGauge<Long> capacity; + private final SimpleGauge<Long> population; + + JcMetrics(SimpleGauge<Long> capacity, + SimpleGauge<Long> population) { + this.capacity = capacity; + this.population = population; + } + + public void setCapacity(Long capacity) { + this.capacity.set(capacity); + } + + public void setPopulation(Long population) { + this.population.set(population); + } + + public void set(JCQueue.QueueMetrics metrics) { + this.capacity.set(metrics.capacity()); + this.population.set(metrics.population()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java b/storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java new file mode 100644 index 0000000..b91dea8 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2; + +import com.codahale.metrics.Gauge; + +public class SimpleGauge<T> implements Gauge<T> { + private T value; + + public SimpleGauge(T value) { + this.value = value; + } + + @Override + public T getValue() { + return this.value; + } + + public void set(T value) { + this.value = value; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java new file mode 100644 index 0000000..eba86ad --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.ReflectionUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StormMetricRegistry { + + private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + + private static final MetricRegistry REGISTRY = new MetricRegistry(); + + private static final List<StormReporter> REPORTERS = new ArrayList<>(); + + private static String hostName = null; + + public static <T> SimpleGauge<T> gauge( + T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port) { + String metricName = metricName(name, topologyId, componentId, taskId, port); + if (REGISTRY.getGauges().containsKey(metricName)) { + return (SimpleGauge)REGISTRY.getGauges().get(metricName); + } else { + return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); + } + } + + public static JcMetrics jcMetrics(String name, String topologyId, String componentId, Integer taskId, Integer port) { + return new JcMetrics( + StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, taskId, port), + StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, taskId, port) + ); + } + + public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) { + String 
metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); + return REGISTRY.meter(metricName); + } + + public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) { + String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); + return REGISTRY.counter(metricName); + } + + public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId) { + String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); + return REGISTRY.counter(metricName); + } + + public static void start(Map<String, Object> stormConfig, DaemonType type) { + try { + hostName = dotToUnderScore(Utils.localHostname()); + } catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname will be reported" + + " as 'localhost'."); + } + + LOG.info("Starting metrics reporters..."); + List<Map<String, Object>> reporterList = (List<Map<String, Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS); + if (reporterList != null && reporterList.size() > 0) { + for (Map<String, Object> reporterConfig : reporterList) { + // only start those requested + List<String> daemons = (List<String>) reporterConfig.get("daemons"); + for (String daemon : daemons) { + if (DaemonType.valueOf(daemon.toUpperCase()) == type) { + startReporter(stormConfig, reporterConfig); + } + } + } + } + } + + public static MetricRegistry registry() { + return REGISTRY; + } + + private static void startReporter(Map<String, Object> stormConfig, Map<String, Object> reporterConfig) { + String clazz = (String)reporterConfig.get("class"); + LOG.info("Attempting to instantiate reporter class: {}", clazz); + StormReporter reporter = ReflectionUtils.newInstance(clazz); + if (reporter != null) { + reporter.prepare(REGISTRY, 
stormConfig, reporterConfig); + reporter.start(); + REPORTERS.add(reporter); + } + + } + + public static void stop() { + for (StormReporter sr : REPORTERS) { + sr.stop(); + } + } + + public static String metricName(String name, String stormId, String componentId, String streamId, Integer taskId, Integer workerPort) { + StringBuilder sb = new StringBuilder("storm.worker."); + sb.append(stormId); + sb.append("."); + sb.append(hostName); + sb.append("."); + sb.append(dotToUnderScore(componentId)); + sb.append("."); + sb.append(dotToUnderScore(streamId)); + sb.append("."); + sb.append(taskId); + sb.append("."); + sb.append(workerPort); + sb.append("-"); + sb.append(name); + return sb.toString(); + } + + public static String metricName(String name, String stormId, String componentId, Integer taskId, Integer workerPort) { + StringBuilder sb = new StringBuilder("storm.worker."); + sb.append(stormId); + sb.append("."); + sb.append(hostName); + sb.append("."); + sb.append(dotToUnderScore(componentId)); + sb.append("."); + sb.append(taskId); + sb.append("."); + sb.append(workerPort); + sb.append("-"); + sb.append(name); + return sb.toString(); + } + + public static String metricName(String name, TopologyContext context) { + + + StringBuilder sb = new StringBuilder("storm.topology."); + sb.append(context.getStormId()); + sb.append("."); + sb.append(hostName); + sb.append("."); + sb.append(dotToUnderScore(context.getThisComponentId())); + sb.append("."); + sb.append(context.getThisTaskId()); + sb.append("."); + sb.append(context.getThisWorkerPort()); + sb.append("-"); + sb.append(name); + return sb.toString(); + } + + private static String dotToUnderScore(String str) { + return str.replace('.', '_'); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java 
b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java new file mode 100644 index 0000000..e796df5 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.storm.task.WorkerTopologyContext; + +public class TaskMetrics { + private static final String METRIC_NAME_ACKED = "acked"; + private static final String METRIC_NAME_FAILED = "failed"; + private static final String METRIC_NAME_EMITTED = "emitted"; + private static final String METRIC_NAME_TRANSFERRED = "transferred"; + + private ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>(); + private ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>(); + private ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>(); + private ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>(); + + private String topologyId; + private String componentId; + private Integer taskId; + private Integer workerPort; + + public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid) { + this.topologyId = context.getStormId(); + this.componentId = componentId; + this.taskId = taskid; + this.workerPort = context.getThisWorkerPort(); + } + + public Counter getAcked(String streamId) { + Counter c = this.ackedByStream.get(streamId); + if (c == null) { + c = StormMetricRegistry.counter(METRIC_NAME_ACKED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.ackedByStream.put(streamId, c); + } + return c; + } + + public Counter getFailed(String streamId) { + Counter c = this.failedByStream.get(streamId); + if (c == null) { + c = StormMetricRegistry.counter(METRIC_NAME_FAILED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.failedByStream.put(streamId, c); + } + return c; + } + + public Counter getEmitted(String streamId) { + Counter c = this.emittedByStream.get(streamId); + if (c == null) { + c = 
StormMetricRegistry.counter(METRIC_NAME_EMITTED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.emittedByStream.put(streamId, c); + } + return c; + } + + public Counter getTransferred(String streamId) { + Counter c = this.transferredByStream.get(streamId); + if (c == null) { + c = StormMetricRegistry.counter( + METRIC_NAME_TRANSFERRED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.transferredByStream.put(streamId, c); + } + return c; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java b/storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java new file mode 100644 index 0000000..a209664 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.metrics2.filters; + +import com.codahale.metrics.Metric; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class RegexFilter implements StormMetricsFilter { + + private Pattern pattern; + + + @Override + public void prepare(Map<String, Object> config) { + String expression = (String) config.get("expression"); + if (expression != null) { + this.pattern = Pattern.compile(expression); + } else { + throw new IllegalStateException("RegexFilter requires an 'expression' parameter."); + } + } + + @Override + public boolean matches(String name, Metric metric) { + Matcher matcher = this.pattern.matcher(name); + return matcher.matches(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java b/storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java new file mode 100644 index 0000000..1f876aa --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.filters; + +import com.codahale.metrics.MetricFilter; + +import java.util.Map; + +public interface StormMetricsFilter extends MetricFilter { + + /** + * Called after the filter is instantiated. + * @param config A map of the properties from the 'filter' section of the reporter configuration. + */ + void prepare(Map<String, Object> config); + +}