[GitHub] storm issue #2913: STORM-3290: Split configuration for storm-kafka-client Tr...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2913 +1 ---
[GitHub] storm issue #2706: STORM-3097: deprecate storm-druid (1.x)
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2706 +1 ---
[GitHub] storm issue #2698: STORM-2882: shade storm-client dependencies
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2698 +1 @srdo You asked what the maven release commands were. They are: ``` mvn release:prepare -P dist mvn release:perform -P dist ``` Note that the process modifies the pom files, checks in changes, tags, and pushes to ASF git. So if you want to test the release process you will need to revert those changes. ---
[GitHub] storm issue #2639: STORM-3035: fix the issue in JmsSpout.ack when toCommit i...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2639 Still +1. Thanks @arunmahadevan ---
[GitHub] storm issue #2639: STORM-3035: fix the issue in JmsSpout.ack when toCommit i...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2639 Looks good to me. +1 I was wondering why you removed the `distributed` flag, and realized it was never properly implemented! For it to work, we would need the following method override: ```java @Override public Map<String, Object> getComponentConfiguration() { if(!_isDistributed) { Map<String, Object> ret = new HashMap<String, Object>(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); return ret; } else { return null; } } ``` I can see cases where that flag would be useful, for example when connecting to a topic vs. a queue. We may want to leave the flag there and fix the override. ---
[GitHub] storm issue #2666: STORM-2988 Error on initialization of server mk-worker
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2666 +1 ---
[GitHub] storm issue #2665: STORM-2988 Error on initialization of server mk-worker
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2665 +1 ---
[GitHub] storm issue #2665: STORM-2988 Error on initialization of server mk-worker
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2665 @HeartSaVioR Agreed, but those issues can be addressed outside the scope of this bug (i.e. separate JIRA). For now we should focus on getting this fix in since it is currently holding up the release. ---
[GitHub] storm issue #2665: STORM-2988 Error on initialization of server mk-worker
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2665 > @ptgoetz , any reason to use a different constant while this config exists? And JmxStormReporter seem to use the constants from Config here and here ? @arunmahadevan, to keep them logically separated so either could be potentially deprecated down the road. ---
[GitHub] storm pull request #2665: STORM-2988 Error on initialization of server mk-wo...
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2665#discussion_r186580893 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java --- @@ -67,7 +68,7 @@ public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormCon } public static String getMetricsJMXDomain(Map reporterConf) { -return Utils.getString(reporterConf, JMX_DOMAIN); +return Utils.getString(reporterConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null); --- End diff -- âreturn Utils.getString(reporterConf, JMX_DOMAIN);â Should be something like âreturn Utils.getString(reporterConf.get(JMX_DOMAIN), âlocalhostâ);â (Apologies, not on a computer atm) ---
[GitHub] storm pull request #2665: STORM-2988 Error on initialization of server mk-wo...
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2665#discussion_r186578267 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java --- @@ -67,7 +68,7 @@ public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormCon } public static String getMetricsJMXDomain(Map reporterConf) { -return Utils.getString(reporterConf, JMX_DOMAIN); +return Utils.getString(reporterConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null); --- End diff -- @priyank5485 is correct. JMX_DOMAIN is the key for the lookup from the r Porter config. ---
[GitHub] storm issue #2518: STORM-2902: Some improvements for storm-rocketmq module
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2518 +1 ---
[GitHub] storm issue #2300: STORM-2691: Make storm-kafka-client implement the Trident...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2300 +1 ---
[GitHub] storm issue #2556: STORM-2946: Upgrade to HBase 2.0
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2556 @HeartSaVioR @arunmahadevan I tested this manually against HBase 2.0.0 beta-1 and HBase 1.1.2 using the examples as well as some custom code covering trident and core storm read/write. All tests passed. So it appears to backward compatible. ---
[GitHub] storm issue #2556: STORM-2946: Upgrade to HBase 2.0
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2556 @arunmahadevan Yes, I will share the results of my testing and will test for backward compatibility. I don't have high hopes for backward compatibility based on https://hbase.apache.org/book.html#hbase.versioning If it isn't compatible, we could either maintain separate storm-hbase modules, or start versioning/releasing storm-hbase independently of storm as we've discussed doing with storm-kafka-client. I'd lean toward the latter. ---
[GitHub] storm issue #2556: STORM-2946: Upgrade to HBase 2.0
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2556 > Did you test the patch manually with HBase 2.0 beta 1? Not extensively. The point on this JIRA/PR is to raise awareness that we should look at targeting new ecosystem versions. This holds true for HDFS, Hive, etc. > I guess there's little chance for HBase community to introduce API change after beta, but the possibility is still open. Moreover that's not ideal to depend on beta version. Do you intend to open this for the official release of HBase 2.0 and update the version/API change once it happens? Does HBase 2 client be compatible with HBase 1.x? At a glance of HBase 2 project doc I don't have much influence over the Apache HBase community, so this PR is dependent on what they do. It's an effort to align with where they are going. > Does HBase 2 client be compatible with HBase 1.x? At a glance of HBase 2 project doc (https://docs.google.com/document/d/1WCsVlnHjJeKUcl7wHwqb4z9iEu_ktczrlKHK8N4SZzs/edit#) they claim admin interface is incompatible and HBase 1 client cannot administrate HBase 2. I'm wondering about opposite case (HBase 2 client to administrate HBase 1 server), and same case for other things. In short, I'd like to determine needs to have separate hbase module for HBase 1.x and HBase 2.x. Not that I'm aware of. I don't think this change would be backward-compatible, hence my comment about versioning. ---
[GitHub] storm pull request #2556: STORM-2946: Upgrade to HBase 2.0
GitHub user ptgoetz opened a pull request: https://github.com/apache/storm/pull/2556 STORM-2946: Upgrade to HBase 2.0 https://issues.apache.org/jira/browse/STORM-2946 We may want to discuss changes like this in the email thread about independently versioned "external" components. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ptgoetz/storm STORM-2946 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2556.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2556 commit 278f01bf09ee6d0bbe71315a69118961405b19fe Author: P. Taylor Goetz <ptgoetz@...> Date: 2018-01-26T18:24:27Z Merge branch 'STORM-2912' of github.com:HeartSaVioR/storm commit f991dae41641eeb9101ec832dd2fd29b5ed3e059 Author: P. Taylor Goetz <ptgoetz@...> Date: 2018-02-07T19:02:54Z Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/storm commit 5c3a911c0aed922421f4fab387dfad550ec90b31 Author: P. Taylor Goetz <ptgoetz@...> Date: 2018-02-13T17:13:02Z Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/storm commit 06f7f66db3bee17257e7610ff67178fb4ddab49d Author: P. Taylor Goetz <ptgoetz@...> Date: 2018-02-13T21:56:12Z STORM-2946: Upgrade to HBase 2.0 commit 74439146a49aecf42fc129138e641c5d77ec0d76 Author: P. Taylor Goetz <ptgoetz@...> Date: 2018-02-13T23:52:54Z STORM-2946: checkstyle cleanup ---
[GitHub] storm-site issue #2: Update dependencies
Github user ptgoetz commented on the issue: https://github.com/apache/storm-site/pull/2 +1 ---
[GitHub] storm issue #2550: STORM-2937: Overwrite latest storm-kafka-client 1.x-branc...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2550 +1 successfully built for me and all tests passed. As far as squashing commits my opinion has always been if we do squash, it should be done when merging. There are a lot of cases where separate commits make review easier (e.g. separating pure whitespace changes). If commits are squashed during review, you have to re-read all the changes. ---
[GitHub] storm issue #2549: STORM-2936 Overwrite latest storm-kafka-client 1.x-branch...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2549 +1 ---
[GitHub] storm issue #2547: Storm 2913 2914 1.x
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2547 +1 ---
[GitHub] storm issue #2541: STORM-2918 Update Netty version
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2541 +1 ---
[GitHub] storm issue #2532: STORM-2912 Revert optimization of sharing tick tuple (1.x...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2532 +1 Nice catch. Agree on performance -- tick tuples are sent infrequently enough that the optimization isn't necessary. ---
[GitHub] storm issue #2526: STORM-2904: Document Metrics V2
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2526 @HeartSaVioR Fixed. ---
[GitHub] storm pull request #2526: STORM-2904: Document Metrics V2
GitHub user ptgoetz opened a pull request: https://github.com/apache/storm/pull/2526 STORM-2904: Document Metrics V2 Documentation only. No code changes. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ptgoetz/storm metrics_v2_docs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2526.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2526 commit 7239bbda655df48d263ccbfa298de871e1c07358 Author: P. Taylor Goetz <ptgoetz@...> Date: 2018-01-22T21:04:19Z STORM-2904: Document Metrics V2 ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @HeartSaVioR Absolutlely! I just want to make sure we are all on board with the changes. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 It seems like there is growing consensus that performance is good. Are there any objections to merging this? ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @revans2 Are there any additional changes you'd like to see for this? I'd like to move forward with a 1.2 release as well as start porting this to the master branch. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161356664 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(Map<String, Object> stormConfig, DaemonType type){ +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161324136 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(Map<String, Object> stormConfig, DaemonType type){ +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161301526 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(Map<String, Object> stormConfig, DaemonType type){ +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161292783 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(Map<String, Object> stormConfig, DaemonType type){ +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161291952 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(Map<String, Object> stormConfig, DaemonType type){ +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161139613 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java --- @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import org.apache.storm.task.WorkerTopologyContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class TaskMetrics { +ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>(); +ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>(); +ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>(); +ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>(); + +private String topologyId; +private String componentId; +private Integer taskId; +private Integer workerPort; + +public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid){ +this.topologyId = context.getStormId(); +this.componentId = componentId; +this.taskId = taskid; +this.workerPort = context.getThisWorkerPort(); +} + +public Counter getAcked(String streamId) { +Counter c = this.ackedByStream.get(streamId); +if (c == null) { +c = StormMetricRegistry.counter("acked", this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); +this.ackedByStream.put(streamId, c); +} +return c; +} + +public Counter getFailed(String streamId) { +Counter c = this.ackedByStream.get(streamId); --- End diff -- Good catch. Thanks for the fix. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 When reporting benchmark results, we should include OS patch level. The recent wave of patches will likely mess with benchmarks. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161087710 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -265,6 +265,7 @@ :stats (mk-executor-stats <> (sampling-rate storm-conf)) :interval->task->metric-registry (HashMap.) :task->component (:task->component worker) + :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last task-ids) worker-context component-id) --- End diff -- @HeartSaVioR Nice, thanks! ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @revans2 Could you take another look when you have a chance? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160805811 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -446,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta + (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta --- End diff -- That makes a lot more sense now. Thanks for clarifying. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160777488 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -446,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta + (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta --- End diff -- @revans2 regarding map lookup, wouldn't we have to concatenate in order to create the lookup key? ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @revans2 @HeartSaVioR Moved to `StringBuilder` and replaced executorId with taskId. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @HeartSaVioR @revans2 One obvious optimization worth trying is replacing `String.format()` with a `StringBuilder`. `String.format()` is cleaner visually, but much slower. I'll make that change and see where it gets us. I'm open to any additional ideas as well. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160513274 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -446,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta + (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta --- End diff -- It's not ideal. Here's some background from earlier review: @HeartSaVioR: >> Looks like we compose metric name and lookup from REGISTRY every time, even without executor ID and stream ID. I can see more calculation should be done after addressing, but not sure how much it affects performance. If we could also memorize metric name per combination of parameters it might help, but I'm also not sure how much it will help. @ptgoetz: >Prior to adding stream ID, we could store the metric as a variable and reuse it without having to do a lookup on each use. Adding stream ID required (unless I'm missing something) doing the lookup on every use. There might be additional optimizations, but because metrics names are composed of several fields, some level of string concatenation is unavoidable. For example, we could try to optimize lookups by caching metrics with metric name as the key, but we would still have to do concatenation to create the lookup key. >I suppose we could create a MetricName class to serve as the cache key, but we'd be trading string concatenation for object creation (unless we name MetricName mutable with setX methods). ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160508705 --- Diff: storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java --- @@ -17,15 +17,49 @@ */ package org.apache.storm.nimbus; +import org.apache.storm.generated.Bolt; import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StormTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Map; public class DefaultTopologyValidator implements ITopologyValidator { +private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyValidator.class); @Override public void prepare(Map StormConf){ } @Override -public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { +public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { +if(topologyName.contains(".")){ +LOG.warn("Metrics for topology name '{}' will be reported as '{}'.", topologyName, topologyName.replace('.', '_') ); +} +Map<String, SpoutSpec> spouts = topology.get_spouts(); +for(String spoutName : spouts.keySet()){ +if(spoutName.contains(".")){ +LOG.warn("Metrics for spout name '{}' will be reported as '{}'.", spoutName, spoutName.replace('.', '_') ); +} +SpoutSpec spoutSpec = spouts.get(spoutName); +for(String streamName : spoutSpec.get_common().get_streams().keySet()){ +if(streamName.contains(".")){ +LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_') ); +} +} +} + +Map<String, Bolt> bolts = topology.get_bolts(); +for(String boltName : bolts.keySet()){ +if(boltName.contains(".")){ +LOG.warn("Metrics for bolt name '{}' will be reported as '{}'.", boltName, boltName.replace('.', '_') ); +} +Bolt bolt = bolts.get(boltName); +for(String streamName : bolt.get_common().get_streams().keySet()){ +if(streamName.contains(".")){ +LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_') ); +} +} +} --- End diff -- True. The user will have to hunt, but I suppose it's better than silence. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160506051 --- Diff: storm-core/src/jvm/org/apache/storm/task/TopologyContext.java --- @@ -386,4 +388,28 @@ public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucke public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); } + +public Timer registerTimer(String name){ +return StormMetricRegistry.registry().timer(metricName(name)); +} + +public Histogram registerHistogram(String name){ +return StormMetricRegistry.registry().histogram(metricName(name)); +} + +public Meter registerMeter(String name){ +return StormMetricRegistry.registry().meter(metricName(name)); +} + +public Counter registerCounter(String name){ +return StormMetricRegistry.registry().counter(metricName(name)); +} + +public Gauge registerGauge(String name, Gauge gauge){ +return StormMetricRegistry.registry().register(metricName(name), gauge); +} + +private String metricName(String name){ +return String.format("storm.topology.%s.%s.%s-%s", getStormId(), getThisComponentId(), getThisWorkerPort(), name); --- End diff -- Yes. Good catch. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160505473 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static void start(Map<String, Object> stormConfig, DaemonType type){ +String localHost = "localhost"; +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname will be reported" + +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160499395 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(Map<String, Object> stormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List<Map<String, Object>> reporterList = (List<Map<String, Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS); +if(repo
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r159519245 --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --- @@ -65,7 +66,7 @@ private static final Object INTERRUPT = new Object(); private static final String PREFIX = "disruptor-"; private static final FlusherPool FLUSHER = new FlusherPool(); -private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true); +private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1); --- End diff -- You were right. Fixed. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r159515622 --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --- @@ -65,7 +66,7 @@ private static final Object INTERRUPT = new Object(); private static final String PREFIX = "disruptor-"; private static final FlusherPool FLUSHER = new FlusherPool(); -private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true); +private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1); --- End diff -- Good catch. I'll check and update as necessary. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @srdo Thanks for the review. I think I've addressed your comments in the latest commit. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158549262 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +public static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; +} + +private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { +String rateUnitString = Utils.getString(reporterConf.get(configName), null); +if (rateUnitString != null) { +return TimeUnit.valueOf(rateUnitString); +} +return null; +} + +public static long getReportPeriod(Map reporterConf) { +return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); +} + +public static StormMetricsFilter getMetricsFilter(Map reporterConf){ +StormMetricsFilter filter = null; +Map<String, Object> filterConf = (Map)reporterConf.get("filter"); +if(filterConf != null) { +String clazz = (String) filterConf.get("class"); +if (clazz != null) { +try { +filter = (StormMetricsFilter) Metrics2Utils.instantiate(clazz); +filter.prepare(filterConf); +} catch (Exception e) { +LOG.warn("Unable to instantiate StormMetricsFilter class: {}", clazz); --- End diff -- Okay. I changed it to crash instead. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158549210 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { --- End diff -- Done. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158549192 --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --- @@ -418,12 +430,19 @@ public DisruptorQueue(String queueName, ProducerType type, int size, long readTi _barrier = _buffer.newBarrier(); _buffer.addGatingSequences(_consumer); _metrics = new QueueMetrics(); +_disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port); //The batch size can be no larger than half the full queue size. //This is mostly to avoid contention issues. _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2)); _flusher = new Flusher(Math.max(flushInterval, 1), _queueName); _flusher.start(); +METRICS_TIMER.schedule(new TimerTask(){ --- End diff -- Addressed in latest commit. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158545079 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java --- @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; + +import java.util.Map; + +public interface StormReporter extends Reporter { --- End diff -- If I'm interpreting your question correctly, that's what the `stop()` method is for. ---
[GitHub] storm issue #2462: STORM-2858: Fix worker-launcher build by erroring out if ...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2462 +1 ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @arunmahadevan Rebased. ---
[GitHub] storm issue #2469: STORM-2861: Explicit reference kafka-schema-registry-clie...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2469 @vesense I tried to reproduce this by deleting Kafka-avro-serializer from my local maven repository, but the build succeeded. Can you elaborate a bit? ---
[GitHub] storm issue #2470: [STORM-2690] resurrect invocation of ISupervisor.assigned...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2470 +1 ---
[GitHub] storm issue #2472: [STORM-2690] resurrect invocation of ISupervisor.assigned...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2472 +1 ---
[GitHub] storm issue #2471: [STORM-2690] resurrect invocation of ISupervisor.assigned...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2471 +1 ---
[GitHub] storm issue #2468: [STORM-2690] resurrect invocation of ISupervisor.assigned...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2468 @erikdw Excellent! Thanks bringing this up and helping solve the issue. ---
[GitHub] storm issue #2468: [STORM-2690] resurrect invocation of ISupervisor.assigned...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2468 +1 @erikdw Are you planning to port to earlier branches? ---
[GitHub] storm pull request #2458: (1.x) STORM-2854 Expose IEventLogger to make event...
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2458#discussion_r157091241 --- Diff: storm-core/src/jvm/org/apache/storm/metric/IEventLogger.java --- @@ -31,32 +32,54 @@ /** * A wrapper for the fields that we would log. */ -public static class EventInfo { -String ts; -String component; -String task; -String messageId; -String values; -EventInfo(String ts, String component, String task, String messageId, String values) { +class EventInfo { +private long ts; +private String component; +private int task; +private Object messageId; +private List values; + +public EventInfo(long ts, String component, int task, Object messageId, List values) { this.ts = ts; this.component = component; this.task = task; this.messageId = messageId; this.values = values; } +public long getTs() { +return ts; +} + +public String getComponent() { +return component; +} + +public int getTask() { +return task; +} + +public Object getMessageId() { +return messageId; +} + +public List getValues() { +return values; +} + /** * Returns a default formatted string with fields separated by "," * * @return a default formatted string with fields separated by "," */ @Override public String toString() { -return new Date(Long.parseLong(ts)).toString() + "," + component + "," + task + "," + messageId + "," + values; +return new Date(ts).toString() + "," + component + "," + String.valueOf(task) + "," --- End diff -- Fair enough. I think it would be helpful to document that. ---
[GitHub] storm issue #2459: STORM-2855: Revert to 2017Q4 Ubuntu image in Travis to fi...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2459 +1 ---
[GitHub] storm issue #2447: STORM-2845 Drop standalone mode of Storm SQL
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2447 +1 ---
[GitHub] storm pull request #2458: (1.x) STORM-2854 Expose IEventLogger to make event...
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2458#discussion_r157056998 --- Diff: storm-core/src/jvm/org/apache/storm/metric/IEventLogger.java --- @@ -31,32 +32,54 @@ /** * A wrapper for the fields that we would log. */ -public static class EventInfo { -String ts; -String component; -String task; -String messageId; -String values; -EventInfo(String ts, String component, String task, String messageId, String values) { +class EventInfo { +private long ts; +private String component; +private int task; +private Object messageId; +private List values; + +public EventInfo(long ts, String component, int task, Object messageId, List values) { this.ts = ts; this.component = component; this.task = task; this.messageId = messageId; this.values = values; } +public long getTs() { +return ts; +} + +public String getComponent() { +return component; +} + +public int getTask() { +return task; +} + +public Object getMessageId() { +return messageId; +} + +public List getValues() { +return values; +} + /** * Returns a default formatted string with fields separated by "," * * @return a default formatted string with fields separated by "," */ @Override public String toString() { -return new Date(Long.parseLong(ts)).toString() + "," + component + "," + task + "," + messageId + "," + values; +return new Date(ts).toString() + "," + component + "," + String.valueOf(task) + "," --- End diff -- Do we want to support configurable date formats or default to `Date.toString()`? ---
[GitHub] storm pull request #2458: (1.x) STORM-2854 Expose IEventLogger to make event...
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2458#discussion_r157056272 --- Diff: conf/storm.yaml.example --- @@ -72,4 +72,11 @@ # argument: # - endpoint: "metrics-collector.mycompany.org" # -# storm.cluster.metrics.consumer.publish.interval.secs: 60 \ No newline at end of file +# storm.cluster.metrics.consumer.publish.interval.secs: 60 + +# Event Logger +# topology.event.logger.register: +# - class: "org.apache.storm.metric.FileBasedEventLogger" +# - class: "org.mycompany.MyEventLogger" +# argument: --- End diff -- Nit: May want to make this "arguments" if it is a list. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 > Looks like we compose metric name and lookup from REGISTRY every time, even without executor ID and stream ID. I can see more calculation should be done after addressing, but not sure how much it affects performance. If we could also memorize metric name per combination of parameters it might help, but I'm also not sure how much it will help. Prior to adding stream ID, we could store the metric as a variable and reuse it without having to do a lookup on each use. Adding stream ID required (unless I'm missing something) doing the lookup on every use. There might be additional optimizations, but because metrics names are composed of several fields, some level of string concatenation is unavoidable. For example, we could try to optimize lookups by caching metrics with metric name as the key, but we would still have to do concatenation to create the lookup key. I suppose we could create a `MetricName` class to serve as the cache key, but we'd be trading string concatenation for object creation (unless we name `MetricName` mutable with `setX` methods). > Regarding issuing warning on name, +1 on your approach. Looks nice! Thanks for the feedback. Implemented in latest commit. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @revans2 @HeartSaVioR I added stream id and executor id to the metrics names, and implemented replacing "." with "_". One consequence of adding stream id was having to get-or-create metrics on the fly, as opposed to creating them up-front. That means there will be a lookup, string concat, and string replacement on each metrics update, which could affect performance. (Unless I'm missing something obvious, we have to do it that way because we don't know the stream IDs ahead of time and have to instantiate metrics as we see them). As far as issuing warnings when names contain ".", one option would be to handle the warnings in an `ITopologyValidator` instance. We could have `DefaultTopologyValidator` log warnings when certain names contain a ".". We could also provide something along the lines of a `StrictTopologyValidator` that throws a `InvalidTopologyException` as opposed to just warning. That might be an easy way to transition from warning to error that also gives users to turn strict checking on or off. What do you think about that approach? ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @revans2 Thanks for clarifying. I have a better understanding of what you're saying now. How do you feel about just replacing "." with "_" on all metrics path components (host name, componentId, etc.)? That would ensure a path could be reliably split on the "." character. I'll add stream id and executor id to the path in places where they are available. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155864521 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +public static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; +} + +private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { +String rateUnitString = Utils.getString(reporterConf.get(configName), null); +if (rateUnitString != null) { +return TimeUnit.valueOf(rateUnitString); +} +return null; +} + +public static long getReportPeriod(Map reporterConf) { +return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); +} + +public static StormMetricsFilter getMetricsFilter(Map reporterConf){ +StormMetricsFilter filter = null; +Map<String, Object> filterConf = (Map)reporterConf.get("filter"); +if(filterConf != null) { +String clazz = (String) filterConf.get("class"); +if (clazz != null) { +try { +filter = (StormMetricsFilter) Metrics2Utils.instantiate(clazz); +filter.prepare(filterConf); +} catch (Exception e) { +LOG.warn("Unable to instantiate StormMetricsFilter class: {}", clazz); --- End diff -- I could go either way... But it didn't seem right to crash just because of a misconfigured filter. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155863202 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { +throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); +} +if(!((Map) o).containsKey("daemons") ) { +throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); +} else { +// daemons can only be 'nimbus', 'supervisor', or 'worker' +Object list = ((Map)o).get("daemons"); +if(list == null || !(list instanceof List)){ +throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); +} +List daemonList = (List)list; +for(Object string : daemonList){ +if (string instanceof String && +(((String) string).equals("nimbus") || +((String) string).equals("supervisor") || +((String) string).equals("worker"))) { +return; +} +throw new IllegalArgumentException("Field daemons must contain at least one of \"nimbus\", \"supervisor\", or \"worker\""); +} + +} +if(((Map)o).containsKey("filter")){ +Map filterMap = (Map)((Map)o).get("filter"); +SimpleTypeValidator.validateField("filter", String.class, filterMap.get("class")); --- End diff -- Yes. Good catch. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155863192 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { +throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); +} +if(!((Map) o).containsKey("daemons") ) { +throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); +} else { +// daemons can only be 'nimbus', 'supervisor', or 'worker' +Object list = ((Map)o).get("daemons"); +if(list == null || !(list instanceof List)){ +throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); +} +List daemonList = (List)list; +for(Object string : daemonList){ +if (string instanceof String && +(((String) string).equals("nimbus") || +((String) string).equals("supervisor") || +((String) string).equals("worker"))) { +return; +} +throw new IllegalArgumentException("Field daemons must contain at least one of \"nimbus\", \"supervisor\", or \"worker\""); +} + +} +if(((Map)o).containsKey("filter")){ +Map filterMap = (Map)((Map)o).get("filter"); +SimpleTypeValidator.validateField("filter", String.class, filterMap.get("class")); +} +SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class")); --- End diff -- Not here. The `name` variable should contain "class" here. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155856433 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { +throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); +} +if(!((Map) o).containsKey("daemons") ) { +throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); +} else { +// daemons can only be 'nimbus', 'supervisor', or 'worker' +Object list = ((Map)o).get("daemons"); +if(list == null || !(list instanceof List)){ +throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); +} +List daemonList = (List)list; +for(Object string : daemonList){ +if (string instanceof String && +(((String) string).equals("nimbus") || +((String) string).equals("supervisor") || +((String) string).equals("worker"))) { +return; --- End diff -- Good catch. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155855673 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +public static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; --- End diff -- We can handle this in documentation. Documentation is pending the finalization of the approach to metrics naming, paths, metadata, etc. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155855300 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { --- End diff -- Yes, but it gives additional information about what went wrong as opposed to a "naked" NPE. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155854567 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ganglia.GangliaReporter; +import com.codahale.metrics.MetricRegistry; +import info.ganglia.gmetric4j.gmetric.GMetric; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class GangliaStormReporter extends ScheduledStormReporter { +private final static Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class); + +public static final String GANGLIA_HOST = "ganglia.host"; +public static final String GANGLIA_PORT = "ganglia.port"; +public static final String GANGLIA_PREFIXED_WITH = "ganglia.prefixed.with"; +public static final String GANGLIA_DMAX = "ganglia.dmax"; +public static final String GANGLIA_TMAX = "ganglia.tmax"; +public static final String GANGLIA_UDP_ADDRESSING_MODE = "ganglia.udp.addressing.mode"; +public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit"; +public static final String GANGLIA_DURATION_UNIT = "ganglia.duration.unit"; +public static final String GANGLIA_TTL = "ganglia.ttl"; +public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group"; + +@Override +public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { +LOG.debug("Preparing..."); +GangliaReporter.Builder builder = GangliaReporter.forRegistry(metricsRegistry); + +TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf); +if (durationUnit != null) { +builder.convertDurationsTo(durationUnit); +} + +TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf); +if (rateUnit != null) { +builder.convertRatesTo(rateUnit); +} + +StormMetricsFilter filter = getMetricsFilter(reporterConf); +if(filter != null){ +builder.filter(filter); +} +String prefix = getMetricsPrefixedWith(reporterConf); +if (prefix != null) { +builder.prefixedWith(prefix); +} + +Integer dmax = getGangliaDMax(reporterConf); +if (prefix != null) { +builder.withDMax(dmax); +} + +Integer tmax = getGangliaTMax(reporterConf); +if (prefix != null) { +builder.withTMax(tmax); +} + +//defaults to 10 +reportingPeriod = getReportPeriod(reporterConf); + +//defaults to seconds +reportingPeriodUnit = getReportPeriodUnit(reporterConf); + +// Not exposed: --- End diff -- It means the `GangliaReporter.Builder.withClock` method is not exposed/setable by the storm configuration. I'll remove it. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @revans2 (cc @HeartSaVioR ) Regarding metadata, parseable metrics names/paths, etc. what do you think of the following approach? In a nutshell, make everything configurable (with sane defaults), something along the lines: METRICS_V2_PREFIX (String prepended to all metrics paths, replacing hard-coded "storm.worker", etc. in current code) METRICS_V2_PATH_DELIMITER (String/character used to separate metrics path, replaces hard-coded "." in current code. METRICS_V2_INVALID_NAME_REGEX (Regex that checks user-supplied metrics names for disallowed characters. Would be used to prevent users from inadvertently breaking up a path, for example by putting a "." in a metric name when that's used as the delimiter. Does that seem reasonable? ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @HeartSaVioR Agreed/understood. We can squash on merge and cleanup commit messages. This is a feature branch. You can commit directly if you want. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @HeartSaVioR Thanks for the update. I pulled your changes into the metrics_v2 branch. @revans2 I'll start working on naming conventions and disallowing certain delimiter characters. If you have any sample paths/names that illustrate your thinking that would be helpful. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 Crude test, but illustrates the cost of meters: (code marks a meter | increments a counter from 0 to `Integer.MAX`) ``` *** METER *** Time: 126.39 ops/sec: 16,990,930 *** COUNTER *** Time: 18.221 ops/sec: 117,857,617 ``` The obvious path would be to switch critical path metrics to use counters. But I'd ultimately err on the side of user choice (e.g. let users decide which to use). That could be made configurable. I can imagine use cases where users would be willing to take a minor performance hit for more performance metrics (e.g. "sleepy" topologies). The performance hit could be tolerable in certain situations. For the time being, I'll switch some of the metrics to counters, and re-run. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @revans2 Thanks for the update. My first instinct is that itâs usage of Meter on the critical path. Adding anything that âdoes somethingâ there is going to add some level of overhead. Iâll do some (micro)benchmarking and experiments to find out. I think there will be a number of mitigation options. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @HeartSaVioR I know. ;) I'm thinking more in terms of hardware profile and what other processes are running. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 We really need hardware and environment information. I'd also argue that tests should be run headless. I've seen some benchmarks vary greatly on a MBP depending on what you're doing at the time. Something as mundane as checking email, etc. can affect benchmark results. sigar might be an easy target (since it's included) to get some of that info. ---
[GitHub] storm issue #2409: STORM-2796: Flux: Provide means for invoking static facto...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2409 @roshannaik See latest commit. I merged/fixed your patch. Most of the issues were typos (e.g. "refList" != "reflist"). The other issue was that properties in flux do not (currently) support ref lists. However, you can work around it by invoking the setter as a config method: ```yaml configMethods: - name: "setTimeLenArr" args: - reflist: ["time1", "time2"] ``` We can probably handle property ref list support as a separate issue since there is a fairly easy workaround. ---
[GitHub] storm issue #2409: STORM-2796: Flux: Provide means for invoking static facto...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2409 @HeartSaVioR Yes, I will add assertions, although the tests would still fail without them (the constructor/method won't be found and a IllegalArgumentException will be thrown). @roshannaik Thanks for the patch. I'll take a look. ---
[GitHub] storm issue #2409: STORM-2796: Flux: Provide means for invoking static facto...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2409 @HeartSaVioR Thanks for the review. Addressed your comment and fixed another issue discovered in testing. ---
[GitHub] storm issue #2409: STORM-2796: Flux: Provide means for invoking static facto...
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2409 I can squash the commits later, but for now keeping the check style commit separate will make review a lot easier. ---
[GitHub] storm pull request #2409: STORM-2796: Flux: Provide means for invoking stati...
GitHub user ptgoetz opened a pull request: https://github.com/apache/storm/pull/2409 STORM-2796: Flux: Provide means for invoking static factory methods Note: The guts of this change are in the first commit. The last commit simply addresses check style violations. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ptgoetz/storm flux_factory_methods Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2409.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2409 commit f0abe4757076f78d987cd266e6ce4d4bd847cfb2 Author: P. Taylor Goetz <ptgo...@gmail.com> Date: 2017-11-04T03:17:46Z STORM-2796: Implement support for static factory methods commit fd0d10feff3ff3beae29276c7a942a3617154658 Author: P. Taylor Goetz <ptgo...@gmail.com> Date: 2017-11-10T18:10:03Z add test for factory args commit 5a6be5fb4a1bce4b7a9ddb332533aa4167ec543b Author: P. Taylor Goetz <ptgo...@gmail.com> Date: 2017-11-10T20:51:24Z Address checkstyle errors. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @revans2 Added rudimentary sanity check validation for metrics reporters configs. Because reporter implementations may want to have their own custom config keys, we can't really cover everything. I think we'll have to settle on basic sanity checks + reporters documenting the custom configuration fields they expose. I'll think more about metrics naming and metadata. Like I said, feedback, opinions and pull requests against this branch are more than welcome. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @revans2 I agree with you regarding getting the metrics naming correct. In fact I think it's one of the most important aspects. I'd like to get as much input/collaboration from others as possible to make sure we get it right from the start. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @revans2, @HeartSaVioR, @arunmahadevan Addressed review comments. Also note that this is a feature branch, so it's open to pull requests from anyone. If there's anything you'd like to see that I haven't addressed (e.g. adding fields to metrics names) feel free to suggest. Re: Documentation and port to master: Once this is merged I plan to add documentation and port to master. I can file a JIRA for those if desired. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141924200 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java --- @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +public class Metrics2Utils { --- End diff -- The `Utils.newInstance()` method throws `RuntimeException` if the class instantiation fails. I don't think we want that in this case. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141922507 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(Map<String, Object> stormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List<Map<String, Object>> reporterList = (List<Map<String, Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS); +if(repo
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141137280 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ --- End diff -- Yes it is thread-safe. Will double-check on the multiple registrations and update as appropriate. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @HeartSaVioR > We need to document how to use new metrics and its reporter, and also need to have patch for master branch (maybe with removal of metrics V1 public API). As I mentioned elsewhere, I want to hold off on porting this to the master branch until all comments are addressed and this patch is in a mergeable state. That way we can avoid porting small changes between the two branches. As far as deprecating/removing the metrics v1 API, I'd hold off until there is a consensus that this approach is a suitable replacement for most use cases. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141137596 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ --- End diff -- > Given that all the values are calculated from outside, isn't Gauge right on all fields? Yes. I removed the comment and left it as a Guage. Not sure why that comment is still showing up in the GitHub diff. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @arunmahadevan > Currently this patch addresses registering custom metrics and sending the metrics out via reporters and does not send any metrics to nimbus right? Why is disruptor metrics handled as a part of this? No, this does not send any metrics to nimbus. Disruptor metrics are included because many users would like to track those metrics (and they aren't available via Storm UI/REST). > There will be one thread per reporter per worker which sends out the metrics to the destination? Just thinking if it would overwhelm the destination and would need some local aggregation and have the supervisor report the metrics. The idea here was to purposely avoid attempting aggregation, since in some cases it's impossible (e.g. there's no way to aggregate histograms). Instead, any aggregation is assumed to take place in the destination system (Grafana, etc.). ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141131796 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java --- @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + + +public class AnchoredWordCount { +public static class RandomSentenceSpout extends BaseRichSpout { +private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); + +SpoutOutputCollector _collector; +Random _rand; --- End diff -- Absolutely (I'm personally not a big fan of the "_" convention). In general (outside the main branch) I try to follow the conventions that exist in the current source for consistency. ---
[GitHub] storm issue #2329: STORM-2722: close the JMSSpout in the tests when done
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2329 +1 ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r132748013 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java --- @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import org.apache.storm.utils.DisruptorQueue; + +public class DisruptorMetrics { +private SimpleGauge capacity; +private SimpleGauge population; +private SimpleGauge writePosition; +private SimpleGauge readPosition; +private SimpleGauge arrivalRate; // TODO: Change to meter --- End diff -- Yes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---