[GitHub] storm issue #2913: STORM-3290: Split configuration for storm-kafka-client Tr...

2018-11-29 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2913
  
+1


---


[GitHub] storm issue #2706: STORM-3097: deprecate storm-druid (1.x)

2018-06-07 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2706
  
+1


---


[GitHub] storm issue #2698: STORM-2882: shade storm-client dependencies

2018-06-06 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2698
  
+1

@srdo You asked what the maven release commands were. They are:

```
mvn release:prepare -P dist
mvn release:perform -P dist
```

Note that the process modifies the pom files, checks in changes, tags, and 
pushes to ASF git. So if you want to test the release process you will need to 
revert those changes.


---


[GitHub] storm issue #2639: STORM-3035: fix the issue in JmsSpout.ack when toCommit i...

2018-05-11 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2639
  
Still +1. Thanks @arunmahadevan 


---


[GitHub] storm issue #2639: STORM-3035: fix the issue in JmsSpout.ack when toCommit i...

2018-05-11 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2639
  
Looks good to me. +1

I was wondering why you removed the `distributed` flag, and realized it was 
never properly implemented! For it to work, we would need the following method 
override:

```java
@Override
public Map<String, Object> getComponentConfiguration() {
    if (!_isDistributed) {
        // Non-distributed mode: cap the component at a single task.
        Map<String, Object> ret = new HashMap<String, Object>();
        ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
        return ret;
    } else {
        return null;
    }
}
```

I can see cases where that flag would be useful, for example when 
connecting to a topic vs. a queue. We may want to leave the flag there and fix 
the override.
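The behavior of that override can be tried outside Storm with a minimal sketch. The class name `DistributedFlagSketch` is hypothetical and only stands in for the spout, and the literal key `topology.max.task.parallelism` is assumed to be the value behind `Config.TOPOLOGY_MAX_TASK_PARALLELISM`; neither comes from the PR itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the spout; this is not the actual JmsSpout code.
class DistributedFlagSketch {
    // Assumed to match the value of Config.TOPOLOGY_MAX_TASK_PARALLELISM.
    static final String MAX_TASK_PARALLELISM = "topology.max.task.parallelism";

    private final boolean distributed;

    DistributedFlagSketch(boolean distributed) {
        this.distributed = distributed;
    }

    // Mirrors the proposed override: cap parallelism at 1 when the flag is off,
    // otherwise contribute no component-specific configuration.
    Map<String, Object> getComponentConfiguration() {
        if (!distributed) {
            Map<String, Object> conf = new HashMap<>();
            conf.put(MAX_TASK_PARALLELISM, 1);
            return conf;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(new DistributedFlagSketch(false).getComponentConfiguration());
        System.out.println(new DistributedFlagSketch(true).getComponentConfiguration());
    }
}
```

With the flag off, the component asks for at most one task, which is the case that matters when every task would otherwise receive its own copy of each message from a topic.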


---


[GitHub] storm issue #2666: STORM-2988 Error on initialization of server mk-worker

2018-05-09 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2666
  
+1


---


[GitHub] storm issue #2665: STORM-2988 Error on initialization of server mk-worker

2018-05-09 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2665
  
+1


---


[GitHub] storm issue #2665: STORM-2988 Error on initialization of server mk-worker

2018-05-08 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2665
  
@HeartSaVioR Agreed, but those issues can be addressed outside the scope of 
this bug (i.e. separate JIRA). For now we should focus on getting this fix in 
since it is currently holding up the release.


---


[GitHub] storm issue #2665: STORM-2988 Error on initialization of server mk-worker

2018-05-07 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2665
  
> @ptgoetz , any reason to use a different constant while this config 
exists? And JmxStormReporter seem to use the constants from Config here and 
here ?

@arunmahadevan, to keep them logically separated so either could be 
potentially deprecated down the road.


---


[GitHub] storm pull request #2665: STORM-2988 Error on initialization of server mk-wo...

2018-05-07 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2665#discussion_r186580893
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java ---
@@ -67,7 +68,7 @@ public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormCon
     }
 
     public static String getMetricsJMXDomain(Map reporterConf) {
-        return Utils.getString(reporterConf, JMX_DOMAIN);
+        return Utils.getString(reporterConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null);
--- End diff --

`return Utils.getString(reporterConf, JMX_DOMAIN);`

Should be something like 

`return Utils.getString(reporterConf.get(JMX_DOMAIN), "localhost");`

(Apologies, not on a computer atm)
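The suggested form, a lookup by key with a fallback value, can be sketched in isolation as follows. The `getString` helper here is hypothetical and only modeled on a two-argument `getString(Object, String)`; the key string `jmx.domain` is an assumed value for `JMX_DOMAIN`, and `JmxDomainLookupSketch` is an illustrative class name, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

class JmxDomainLookupSketch {
    // Assumed key name; the real JMX_DOMAIN constant lives in JmxStormReporter.
    static final String JMX_DOMAIN = "jmx.domain";

    // Hypothetical helper: return the value as a String, or the default when absent.
    static String getString(Object value, String defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof String) {
            return (String) value;
        }
        throw new IllegalArgumentException("Cannot convert " + value + " to String");
    }

    // Looks up the domain in the reporter config and falls back to "localhost".
    static String getMetricsJmxDomain(Map<String, Object> reporterConf) {
        return getString(reporterConf.get(JMX_DOMAIN), "localhost");
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(getMetricsJmxDomain(conf));
        conf.put(JMX_DOMAIN, "storm");
        System.out.println(getMetricsJmxDomain(conf));
    }
}
```

The point of the two-step form is that the map lookup happens first, so a missing key yields the default instead of passing the whole config map where a single value is expected.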


---


[GitHub] storm pull request #2665: STORM-2988 Error on initialization of server mk-wo...

2018-05-07 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2665#discussion_r186578267
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java ---
@@ -67,7 +68,7 @@ public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormCon
     }
 
     public static String getMetricsJMXDomain(Map reporterConf) {
-        return Utils.getString(reporterConf, JMX_DOMAIN);
+        return Utils.getString(reporterConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null);
--- End diff --

@priyank5485 is correct. JMX_DOMAIN is the key for the lookup from the 
reporter config.


---


[GitHub] storm issue #2518: STORM-2902: Some improvements for storm-rocketmq module

2018-04-19 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2518
  
+1


---


[GitHub] storm issue #2300: STORM-2691: Make storm-kafka-client implement the Trident...

2018-02-27 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2300
  
+1


---


[GitHub] storm issue #2556: STORM-2946: Upgrade to HBase 2.0

2018-02-21 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2556
  
@HeartSaVioR @arunmahadevan 

I tested this manually against HBase 2.0.0 beta-1 and HBase 1.1.2 using the 
examples as well as some custom code covering trident and core storm 
read/write. All tests passed. So it appears to be backward compatible.


---


[GitHub] storm issue #2556: STORM-2946: Upgrade to HBase 2.0

2018-02-16 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2556
  
@arunmahadevan Yes, I will share the results of my testing and will test 
for backward compatibility.

I don't have high hopes for backward compatibility based on 
https://hbase.apache.org/book.html#hbase.versioning

If it isn't compatible, we could either maintain separate storm-hbase 
modules, or start versioning/releasing storm-hbase independently of storm as 
we've discussed doing with storm-kafka-client. I'd lean toward the latter.


---


[GitHub] storm issue #2556: STORM-2946: Upgrade to HBase 2.0

2018-02-13 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2556
  
> Did you test the patch manually with HBase 2.0 beta 1?

Not extensively. The point of this JIRA/PR is to raise awareness that we 
should look at targeting new ecosystem versions. This holds true for HDFS, 
Hive, etc.

> I guess there's little chance for HBase community to introduce API change 
after beta, but the possibility is still open. Moreover that's not ideal to 
depend on beta version. Do you intend to open this for the official release of 
HBase 2.0 and update the version/API change once it happens?

I don't have much influence over the Apache HBase community, so this PR is 
dependent on what they do. It's an effort to align with where they are going.

> Does HBase 2 client be compatible with HBase 1.x? At a glance of HBase 2 
project doc 
(https://docs.google.com/document/d/1WCsVlnHjJeKUcl7wHwqb4z9iEu_ktczrlKHK8N4SZzs/edit#)
 they claim admin interface is incompatible and HBase 1 client cannot 
administrate HBase 2. I'm wondering about opposite case (HBase 2 client to 
administrate HBase 1 server), and same case for other things. In short, I'd 
like to determine needs to have separate hbase module for HBase 1.x and HBase 
2.x.

Not that I'm aware of. I don't think this change would be 
backward-compatible, hence my comment about versioning.




---


[GitHub] storm pull request #2556: STORM-2946: Upgrade to HBase 2.0

2018-02-13 Thread ptgoetz
GitHub user ptgoetz opened a pull request:

https://github.com/apache/storm/pull/2556

STORM-2946: Upgrade to HBase 2.0

https://issues.apache.org/jira/browse/STORM-2946

We may want to discuss changes like this in the email thread about 
independently versioned "external" components.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ptgoetz/storm STORM-2946

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2556.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2556


commit 278f01bf09ee6d0bbe71315a69118961405b19fe
Author: P. Taylor Goetz <ptgoetz@...>
Date:   2018-01-26T18:24:27Z

Merge branch 'STORM-2912' of github.com:HeartSaVioR/storm

commit f991dae41641eeb9101ec832dd2fd29b5ed3e059
Author: P. Taylor Goetz <ptgoetz@...>
Date:   2018-02-07T19:02:54Z

Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/storm

commit 5c3a911c0aed922421f4fab387dfad550ec90b31
Author: P. Taylor Goetz <ptgoetz@...>
Date:   2018-02-13T17:13:02Z

Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/storm

commit 06f7f66db3bee17257e7610ff67178fb4ddab49d
Author: P. Taylor Goetz <ptgoetz@...>
Date:   2018-02-13T21:56:12Z

STORM-2946: Upgrade to HBase 2.0

commit 74439146a49aecf42fc129138e641c5d77ec0d76
Author: P. Taylor Goetz <ptgoetz@...>
Date:   2018-02-13T23:52:54Z

STORM-2946: checkstyle cleanup




---


[GitHub] storm-site issue #2: Update dependencies

2018-02-12 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm-site/pull/2
  
+1


---


[GitHub] storm issue #2550: STORM-2937: Overwrite latest storm-kafka-client 1.x-branc...

2018-02-07 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2550
  
+1 successfully built for me and all tests passed.

As far as squashing commits goes, my opinion has always been that if we do squash, it 
should be done when merging. There are a lot of cases where separate commits 
make review easier (e.g. separating pure whitespace changes). If commits are 
squashed during review, you have to re-read all the changes.


---


[GitHub] storm issue #2549: STORM-2936 Overwrite latest storm-kafka-client 1.x-branch...

2018-02-06 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2549
  
+1


---


[GitHub] storm issue #2547: Storm 2913 2914 1.x

2018-02-05 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2547
  
+1


---


[GitHub] storm issue #2541: STORM-2918 Update Netty version

2018-02-01 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2541
  
+1


---


[GitHub] storm issue #2532: STORM-2912 Revert optimization of sharing tick tuple (1.x...

2018-01-25 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2532
  
+1 Nice catch. Agree on performance -- tick tuples are sent infrequently 
enough that the optimization isn't necessary.


---


[GitHub] storm issue #2526: STORM-2904: Document Metrics V2

2018-01-22 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2526
  
@HeartSaVioR Fixed.


---


[GitHub] storm pull request #2526: STORM-2904: Document Metrics V2

2018-01-22 Thread ptgoetz
GitHub user ptgoetz opened a pull request:

https://github.com/apache/storm/pull/2526

STORM-2904: Document Metrics V2

Documentation only. No code changes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ptgoetz/storm metrics_v2_docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2526.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2526


commit 7239bbda655df48d263ccbfa298de871e1c07358
Author: P. Taylor Goetz <ptgoetz@...>
Date:   2018-01-22T21:04:19Z

STORM-2904: Document Metrics V2




---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2018-01-21 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@HeartSaVioR Absolutely! I just want to make sure we are all on board with 
the changes.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2018-01-21 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
It seems like there is growing consensus that performance is good.

Are there any objections to merging this?


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2018-01-17 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@revans2 Are there any additional changes you'd like to see for this? I'd 
like to move forward with a 1.2 release as well as start porting this to the 
master branch.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161356664
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class);
+
+    private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+    private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+    private static String hostName = null;
+
+    public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, String componentId, Integer port){
+        String metricName = metricName(name, topologyId, componentId, port);
+        if(REGISTRY.getGauges().containsKey(metricName)){
+            return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+        } else {
+            return REGISTRY.register(metricName, new SimpleGauge<>(initialValue));
+        }
+    }
+
+    public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){
+        return new DisruptorMetrics(
+                StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port),
+                StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port),
+                StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port),
+                StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port),
+                StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port),
+                StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port),
+                StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port),
+                StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port)
+        );
+    }
+
+    public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){
+        String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort());
+        return REGISTRY.meter(metricName);
+    }
+
+    public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){
+        String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort());
+        return REGISTRY.counter(metricName);
+    }
+
+    public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){
+        String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort);
+        return REGISTRY.counter(metricName);
+    }
+
+    public static void start(Map<String, Object> stormConfig, DaemonType type){
+

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161324136
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161301526
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161292783
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161291952
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T> gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map<String, Object> stormConfig, DaemonType 
type){
+

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-11 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161139613
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import org.apache.storm.task.WorkerTopologyContext;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class TaskMetrics {
+ConcurrentMap<String, Counter> ackedByStream = new 
ConcurrentHashMap<>();
+ConcurrentMap<String, Counter> failedByStream = new 
ConcurrentHashMap<>();
+ConcurrentMap<String, Counter> emittedByStream = new 
ConcurrentHashMap<>();
+ConcurrentMap<String, Counter> transferredByStream = new 
ConcurrentHashMap<>();
+
+private String topologyId;
+private String componentId;
+private Integer taskId;
+private Integer workerPort;
+
+public TaskMetrics(WorkerTopologyContext context, String componentId, 
Integer taskid){
+this.topologyId = context.getStormId();
+this.componentId = componentId;
+this.taskId = taskid;
+this.workerPort = context.getThisWorkerPort();
+}
+
+public Counter getAcked(String streamId) {
+Counter c = this.ackedByStream.get(streamId);
+if (c == null) {
+c = StormMetricRegistry.counter("acked", this.topologyId, 
this.componentId, this.taskId, this.workerPort, streamId);
+this.ackedByStream.put(streamId, c);
+}
+return c;
+}
+
+public Counter getFailed(String streamId) {
+Counter c = this.ackedByStream.get(streamId);
--- End diff --

Good catch. Thanks for the fix.
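
For context, the last quoted line shows `getFailed` reading from `ackedByStream`, which appears to be the copy-paste slip under discussion. The following is a minimal sketch of one way to make that kind of slip harder to write, not the fix that was committed: each accessor passes its own map to a shared helper built on `computeIfAbsent`. The class name `TaskCountersSketch` and the helper `lookup` are hypothetical, and `LongAdder` stands in for the Codahale `Counter` so the example runs with the JDK alone.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for TaskMetrics; LongAdder replaces the Codahale Counter.
final class TaskCountersSketch {
    private final ConcurrentMap<String, LongAdder> ackedByStream = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, LongAdder> failedByStream = new ConcurrentHashMap<>();

    LongAdder getAcked(String streamId) {
        return lookup(ackedByStream, streamId);
    }

    LongAdder getFailed(String streamId) {
        // Uses the failed map, not the acked map.
        return lookup(failedByStream, streamId);
    }

    // Atomically creates the counter on first use and returns the cached one afterwards.
    private static LongAdder lookup(ConcurrentMap<String, LongAdder> byStream, String streamId) {
        return byStream.computeIfAbsent(streamId, id -> new LongAdder());
    }
}
```

With a single helper, the map name appears once per accessor, and the get-then-put sequence collapses into one atomic call.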


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2018-01-11 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
When reporting benchmark results, we should include OS patch level. The 
recent wave of patches will likely mess with benchmarks.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-11 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161087710
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -265,6 +265,7 @@
  :stats (mk-executor-stats <> (sampling-rate storm-conf))
  :interval->task->metric-registry (HashMap.)
  :task->component (:task->component worker)
+ :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last 
task-ids) worker-context component-id)
--- End diff --

@HeartSaVioR Nice, thanks!


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2018-01-11 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@revans2 Could you take another look when you have a chance? 


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-10 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160805811
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -446,7 +451,7 @@
 (.ack spout msg-id)
 (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. 
msg-id task-id time-delta))
 (when time-delta
-  (stats/spout-acked-tuple! (:stats executor-data) (:stream 
tuple-info) time-delta
+  (stats/spout-acked-tuple! (:stats executor-data) 
(StormMetricRegistry/counter "acked" (:worker-context executor-data) 
(:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream 
tuple-info)) (:stream tuple-info) time-delta
--- End diff --

That makes a lot more sense now. Thanks for clarifying.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-10 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160777488
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -446,7 +451,7 @@
 (.ack spout msg-id)
 (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. 
msg-id task-id time-delta))
 (when time-delta
-  (stats/spout-acked-tuple! (:stats executor-data) (:stream 
tuple-info) time-delta
+  (stats/spout-acked-tuple! (:stats executor-data) 
(StormMetricRegistry/counter "acked" (:worker-context executor-data) 
(:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream 
tuple-info)) (:stream tuple-info) time-delta
--- End diff --

@revans2 regarding map lookup, wouldn't we have to concatenate in order to 
create the lookup key?


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2018-01-10 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@revans2 @HeartSaVioR Moved to `StringBuilder` and replaced executorId with 
taskId.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2018-01-10 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@HeartSaVioR @revans2 

One obvious optimization worth trying is replacing `String.format()` with a 
`StringBuilder`. `String.format()` is cleaner visually, but much slower.

I'll make that change and see where it gets us. I'm open to any additional 
ideas as well.
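
A minimal sketch of the change described above, assuming a dotted name layout like the `storm.topology.%s.%s.%s-%s` pattern quoted elsewhere in this PR. The class name `MetricNameSketch` and both method names are illustrative assumptions, not the committed code.

```java
// Hypothetical helper comparing the two ways of composing a metric name.
final class MetricNameSketch {

    // Baseline: readable, but the format pattern is parsed on every call.
    static String withFormat(String topologyId, String componentId, int workerPort, String name) {
        return String.format("storm.topology.%s.%s.%s-%s", topologyId, componentId, workerPort, name);
    }

    // Same result with a StringBuilder, which skips the pattern parsing.
    static String withBuilder(String topologyId, String componentId, int workerPort, String name) {
        return new StringBuilder("storm.topology.")
                .append(topologyId).append('.')
                .append(componentId).append('.')
                .append(workerPort).append('-')
                .append(name)
                .toString();
    }
}
```

Both methods return the same string for the same arguments, so the swap only matters on hot paths where the name is rebuilt per tuple.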


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160513274
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -446,7 +451,7 @@
 (.ack spout msg-id)
 (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. 
msg-id task-id time-delta))
 (when time-delta
-  (stats/spout-acked-tuple! (:stats executor-data) (:stream 
tuple-info) time-delta
+  (stats/spout-acked-tuple! (:stats executor-data) 
(StormMetricRegistry/counter "acked" (:worker-context executor-data) 
(:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream 
tuple-info)) (:stream tuple-info) time-delta
--- End diff --

It's not ideal. Here's some background from an earlier review:

@HeartSaVioR:
>> Looks like we compose metric name and lookup from REGISTRY every time, 
even without executor ID and stream ID. I can see more calculation should be 
done after addressing, but not sure how much it affects performance. If we 
could also memorize metric name per combination of parameters it might help, 
but I'm also not sure how much it will help.
    
@ptgoetz:
>Prior to adding stream ID, we could store the metric as a variable and 
reuse it without having to do a lookup on each use. Adding stream ID required 
(unless I'm missing something) doing the lookup on every use. There might be 
additional optimizations, but because metrics names are composed of several 
fields, some level of string concatenation is unavoidable. For example, we 
could try to optimize lookups by caching metrics with metric name as the key, 
but we would still have to do concatenation to create the lookup key.

>I suppose we could create a MetricName class to serve as the cache key, 
but we'd be trading string concatenation for object creation (unless we make 
MetricName mutable with setX methods).
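
The trade-off in the last paragraph can be sketched as follows. This is an illustration under stated assumptions, not code from the PR: `MetricNameKey` and `KeyedCounterCache` are hypothetical names, the key fields are chosen for the example, and `LongAdder` stands in for the Codahale `Counter`.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical immutable composite key used instead of a concatenated String.
final class MetricNameKey {
    private final String name;
    private final String componentId;
    private final String streamId;
    private final int taskId;

    MetricNameKey(String name, String componentId, String streamId, int taskId) {
        this.name = name;
        this.componentId = componentId;
        this.streamId = streamId;
        this.taskId = taskId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MetricNameKey)) return false;
        MetricNameKey other = (MetricNameKey) o;
        return taskId == other.taskId
                && name.equals(other.name)
                && componentId.equals(other.componentId)
                && streamId.equals(other.streamId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, componentId, streamId, taskId);
    }
}

// Hypothetical cache; LongAdder replaces the Codahale Counter.
final class KeyedCounterCache {
    private final ConcurrentMap<MetricNameKey, LongAdder> counters = new ConcurrentHashMap<>();

    LongAdder counter(String name, String componentId, String streamId, int taskId) {
        // Each lookup allocates one small key object rather than building a String.
        MetricNameKey key = new MetricNameKey(name, componentId, streamId, taskId);
        return counters.computeIfAbsent(key, k -> new LongAdder());
    }
}
```

As the comment notes, this swaps concatenation for a key allocation on every lookup, so whether it helps would need to be measured.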


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160508705
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java ---
@@ -17,15 +17,49 @@
  */
 package org.apache.storm.nimbus;
 
+import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Map;
 
 public class DefaultTopologyValidator implements ITopologyValidator {
+private static final Logger LOG = 
LoggerFactory.getLogger(DefaultTopologyValidator.class);
 @Override
 public void prepare(Map StormConf){
 }
 @Override
-public void validate(String topologyName, Map topologyConf, 
StormTopology topology) throws InvalidTopologyException {
+public void validate(String topologyName, Map topologyConf, 
StormTopology topology) throws InvalidTopologyException {
+if(topologyName.contains(".")){
+LOG.warn("Metrics for topology name '{}' will be reported as 
'{}'.", topologyName, topologyName.replace('.', '_') );
+}
+Map<String, SpoutSpec> spouts = topology.get_spouts();
+for(String spoutName : spouts.keySet()){
+if(spoutName.contains(".")){
+LOG.warn("Metrics for spout name '{}' will be reported as 
'{}'.", spoutName, spoutName.replace('.', '_') );
+}
+SpoutSpec spoutSpec = spouts.get(spoutName);
+for(String streamName : 
spoutSpec.get_common().get_streams().keySet()){
+if(streamName.contains(".")){
+LOG.warn("Metrics for stream name '{}' will be 
reported as '{}'.", streamName, streamName.replace('.', '_') );
+}
+}
+}
+
+Map<String, Bolt> bolts = topology.get_bolts();
+for(String boltName : bolts.keySet()){
+if(boltName.contains(".")){
+LOG.warn("Metrics for bolt name '{}' will be reported as 
'{}'.", boltName, boltName.replace('.', '_') );
+}
+Bolt bolt = bolts.get(boltName);
+for(String streamName : 
bolt.get_common().get_streams().keySet()){
+if(streamName.contains(".")){
+LOG.warn("Metrics for stream name '{}' will be 
reported as '{}'.", streamName, streamName.replace('.', '_') );
+}
+}
+}
--- End diff --

True. The user will have to hunt, but I suppose it's better than silence.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160506051
  
--- Diff: storm-core/src/jvm/org/apache/storm/task/TopologyContext.java ---
@@ -386,4 +388,28 @@ public ReducedMetric registerMetric(String name, 
IReducer reducer, int timeBucke
 public CombinedMetric registerMetric(String name, ICombiner combiner, 
int timeBucketSizeInSecs) {
 return registerMetric(name, new CombinedMetric(combiner), 
timeBucketSizeInSecs);
 }
+
+public Timer registerTimer(String name){
+return StormMetricRegistry.registry().timer(metricName(name));
+}
+
+public Histogram registerHistogram(String name){
+return StormMetricRegistry.registry().histogram(metricName(name));
+}
+
+public Meter registerMeter(String name){
+return StormMetricRegistry.registry().meter(metricName(name));
+}
+
+public Counter registerCounter(String name){
+return StormMetricRegistry.registry().counter(metricName(name));
+}
+
+public Gauge registerGauge(String name, Gauge gauge){
+return StormMetricRegistry.registry().register(metricName(name), 
gauge);
+}
+
+private String metricName(String name){
+return String.format("storm.topology.%s.%s.%s-%s", getStormId(), 
getThisComponentId(), getThisWorkerPort(), name);
--- End diff --

Yes. Good catch.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160505473
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T> gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, String executorId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,executorId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, String executorId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,executorId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map<String, Object> stormConfig, DaemonType 
type){
+String localHost = "localhost";
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname will be reported" +
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160499395
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T> gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map<String, Object> stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname will be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List<Map<String, Object>> reporterList = (List<Map<String, 
Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+if(repo

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-03 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r159519245
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -65,7 +66,7 @@
 private static final Object INTERRUPT = new Object();
 private static final String PREFIX = "disruptor-";
 private static final FlusherPool FLUSHER = new FlusherPool();
-private static final Timer METRICS_TIMER = new 
Timer("disruptor-metrics-timer", true);
+private static final ScheduledThreadPoolExecutor 
METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1);
--- End diff --

You were right. Fixed.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-03 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r159515622
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -65,7 +66,7 @@
 private static final Object INTERRUPT = new Object();
 private static final String PREFIX = "disruptor-";
 private static final FlusherPool FLUSHER = new FlusherPool();
-private static final Timer METRICS_TIMER = new 
Timer("disruptor-metrics-timer", true);
+private static final ScheduledThreadPoolExecutor 
METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1);
--- End diff --

Good catch. I'll check and update as necessary.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@srdo Thanks for the review. I think I've addressed your comments in the 
latest commit.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158549262
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+public static TimeUnit getReportPeriodUnit(Map<String, Object> 
reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
+}
+
+private static TimeUnit getTimeUnitForConfig(Map reporterConf, String 
configName) {
+String rateUnitString = 
Utils.getString(reporterConf.get(configName), null);
+if (rateUnitString != null) {
+return TimeUnit.valueOf(rateUnitString);
+}
+return null;
+}
+
+public static long getReportPeriod(Map reporterConf) {
+return Utils.getInt(reporterConf.get(REPORT_PERIOD), 
10).longValue();
+}
+
+public static StormMetricsFilter getMetricsFilter(Map reporterConf){
+StormMetricsFilter filter = null;
+Map<String, Object> filterConf = (Map)reporterConf.get("filter");
+if(filterConf != null) {
+String clazz = (String) filterConf.get("class");
+if (clazz != null) {
+try {
+filter = (StormMetricsFilter) 
Metrics2Utils.instantiate(clazz);
+filter.prepare(filterConf);
+} catch (Exception e) {
+LOG.warn("Unable to instantiate StormMetricsFilter 
class: {}", clazz);
--- End diff --

Okay. I changed it to crash instead.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158549210
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
--- End diff --

Done.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158549192
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -418,12 +430,19 @@ public DisruptorQueue(String queueName, ProducerType 
type, int size, long readTi
 _barrier = _buffer.newBarrier();
 _buffer.addGatingSequences(_consumer);
 _metrics = new QueueMetrics();
+_disruptorMetrics = 
StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port);
 //The batch size can be no larger than half the full queue size.
 //This is mostly to avoid contention issues.
 _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
 
 _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
 _flusher.start();
+METRICS_TIMER.schedule(new TimerTask(){
--- End diff --

Addressed in latest commit.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158545079
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.util.Map;
+
+public interface StormReporter extends Reporter {
--- End diff --

If I'm interpreting your question correctly, that's what the `stop()` 
method is for.


---


[GitHub] storm issue #2462: STORM-2858: Fix worker-launcher build by erroring out if ...

2017-12-21 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2462
  
+1


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-12-20 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@arunmahadevan Rebased.


---


[GitHub] storm issue #2469: STORM-2861: Explicit reference kafka-schema-registry-clie...

2017-12-19 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2469
  
@vesense I tried to reproduce this by deleting kafka-avro-serializer from 
my local maven repository, but the build succeeded. Can you elaborate a bit?


---


[GitHub] storm issue #2470: [STORM-2690] resurrect invocation of ISupervisor.assigned...

2017-12-19 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2470
  
+1


---


[GitHub] storm issue #2472: [STORM-2690] resurrect invocation of ISupervisor.assigned...

2017-12-19 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2472
  
+1


---


[GitHub] storm issue #2471: [STORM-2690] resurrect invocation of ISupervisor.assigned...

2017-12-19 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2471
  
+1


---


[GitHub] storm issue #2468: [STORM-2690] resurrect invocation of ISupervisor.assigned...

2017-12-19 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2468
  
@erikdw Excellent! Thanks for bringing this up and helping solve the issue.


---


[GitHub] storm issue #2468: [STORM-2690] resurrect invocation of ISupervisor.assigned...

2017-12-19 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2468
  
+1

@erikdw Are you planning to port to earlier branches?


---


[GitHub] storm pull request #2458: (1.x) STORM-2854 Expose IEventLogger to make event...

2017-12-14 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2458#discussion_r157091241
  
--- Diff: storm-core/src/jvm/org/apache/storm/metric/IEventLogger.java ---
@@ -31,32 +32,54 @@
 /**
  * A wrapper for the fields that we would log.
  */
-public static class EventInfo {
-String ts;
-String component;
-String task;
-String messageId;
-String values;
-EventInfo(String ts, String component, String task, String 
messageId, String values) {
+class EventInfo {
+private long ts;
+private String component;
+private int task;
+private Object messageId;
+private List values;
+
+public EventInfo(long ts, String component, int task, Object 
messageId, List values) {
 this.ts = ts;
 this.component = component;
 this.task = task;
 this.messageId = messageId;
 this.values = values;
 }
 
+public long getTs() {
+return ts;
+}
+
+public String getComponent() {
+return component;
+}
+
+public int getTask() {
+return task;
+}
+
+public Object getMessageId() {
+return messageId;
+}
+
+public List getValues() {
+return values;
+}
+
 /**
  * Returns a default formatted string with fields separated by ","
  *
  * @return a default formatted string with fields separated by ","
  */
 @Override
 public String toString() {
-return new Date(Long.parseLong(ts)).toString() + "," + 
component + "," + task + "," + messageId + "," + values;
+return new Date(ts).toString() + "," + component + "," + 
String.valueOf(task) + ","
--- End diff --

Fair enough. I think it would be helpful to document that.


---


[GitHub] storm issue #2459: STORM-2855: Revert to 2017Q4 Ubuntu image in Travis to fi...

2017-12-14 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2459
  
+1


---


[GitHub] storm issue #2447: STORM-2845 Drop standalone mode of Storm SQL

2017-12-14 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2447
  
+1


---


[GitHub] storm pull request #2458: (1.x) STORM-2854 Expose IEventLogger to make event...

2017-12-14 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2458#discussion_r157056998
  
--- Diff: storm-core/src/jvm/org/apache/storm/metric/IEventLogger.java ---
@@ -31,32 +32,54 @@
 /**
  * A wrapper for the fields that we would log.
  */
-public static class EventInfo {
-String ts;
-String component;
-String task;
-String messageId;
-String values;
-EventInfo(String ts, String component, String task, String 
messageId, String values) {
+class EventInfo {
+private long ts;
+private String component;
+private int task;
+private Object messageId;
+private List values;
+
+public EventInfo(long ts, String component, int task, Object 
messageId, List values) {
 this.ts = ts;
 this.component = component;
 this.task = task;
 this.messageId = messageId;
 this.values = values;
 }
 
+public long getTs() {
+return ts;
+}
+
+public String getComponent() {
+return component;
+}
+
+public int getTask() {
+return task;
+}
+
+public Object getMessageId() {
+return messageId;
+}
+
+public List getValues() {
+return values;
+}
+
 /**
  * Returns a default formatted string with fields separated by ","
  *
  * @return a default formatted string with fields separated by ","
  */
 @Override
 public String toString() {
-return new Date(Long.parseLong(ts)).toString() + "," + 
component + "," + task + "," + messageId + "," + values;
+return new Date(ts).toString() + "," + component + "," + 
String.valueOf(task) + ","
--- End diff --

Do we want to support configurable date formats or default to 
`Date.toString()`?
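
To make the question concrete, here is a minimal sketch of how a configurable format could sit next to the `Date.toString()` default. It is not code from this PR: the `EventTimestampFormatter` class and its `pattern` and `zone` arguments are hypothetical names used only for illustration.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper; not part of IEventLogger or EventInfo in the PR.
class EventTimestampFormatter {
    private final String pattern; // null means "fall back to Date.toString()"
    private final TimeZone zone;

    EventTimestampFormatter(String pattern, TimeZone zone) {
        this.pattern = pattern;
        this.zone = zone;
    }

    String format(long ts) {
        Date date = new Date(ts);
        if (pattern == null) {
            // Default behaviour, same as the toString() in the diff above.
            return date.toString();
        }
        // SimpleDateFormat is not thread-safe, so build one per call.
        SimpleDateFormat fmt = new SimpleDateFormat(pattern, Locale.ROOT);
        fmt.setTimeZone(zone);
        return fmt.format(date);
    }
}
```

With a null pattern the output is unchanged from the current behaviour, so existing log consumers would not be affected unless a format is configured.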


---


[GitHub] storm pull request #2458: (1.x) STORM-2854 Expose IEventLogger to make event...

2017-12-14 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2458#discussion_r157056272
  
--- Diff: conf/storm.yaml.example ---
@@ -72,4 +72,11 @@
 # argument:
 #   - endpoint: "metrics-collector.mycompany.org"
 #
-# storm.cluster.metrics.consumer.publish.interval.secs: 60
\ No newline at end of file
+# storm.cluster.metrics.consumer.publish.interval.secs: 60
+
+# Event Logger
+# topology.event.logger.register:
+#   - class: "org.apache.storm.metric.FileBasedEventLogger"
+#   - class: "org.mycompany.MyEventLogger"
+# argument:
--- End diff --

Nit: May want to make this "arguments" if it is a list.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-12-14 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
> Looks like we compose metric name and lookup from REGISTRY every time, 
even without executor ID and stream ID. I can see more calculation should be 
done after addressing, but not sure how much it affects performance. If we 
could also memorize metric name per combination of parameters it might help, 
but I'm also not sure how much it will help.

Prior to adding stream ID, we could store the metric as a variable and 
reuse it without having to do a lookup on each use. Adding stream ID required 
(unless I'm missing something) doing the lookup on every use. There might be 
additional optimizations, but because metrics names are composed of several 
fields, some level of string concatenation is unavoidable. For example, we 
could try to optimize lookups by caching metrics with metric name as the key, 
but we would still have to do concatenation to create the lookup key.

I suppose we could create a `MetricName` class to serve as the cache key, 
but we'd be trading string concatenation for object creation (unless we make 
`MetricName` mutable with `setX` methods).
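
A rough sketch of the cache-key idea, under stated assumptions: the field names on `MetricName` are illustrative, `MetricCache` is a hypothetical holder, and `LongAdder` stands in for a real metric type. It shows the trade-off mentioned above: the string concatenation goes away, but each lookup still allocates a key object.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical immutable cache key; the fields are illustrative only.
final class MetricName {
    private final String name;
    private final String componentId;
    private final String streamId;
    private final int taskId;

    MetricName(String name, String componentId, String streamId, int taskId) {
        this.name = name;
        this.componentId = componentId;
        this.streamId = streamId;
        this.taskId = taskId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MetricName)) {
            return false;
        }
        MetricName other = (MetricName) o;
        return taskId == other.taskId
                && name.equals(other.name)
                && componentId.equals(other.componentId)
                && streamId.equals(other.streamId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, componentId, streamId, taskId);
    }
}

// LongAdder is a stand-in for a real metric (counter, meter, ...).
class MetricCache {
    private final ConcurrentMap<MetricName, LongAdder> metrics = new ConcurrentHashMap<>();

    LongAdder counter(String name, String componentId, String streamId, int taskId) {
        // One key allocation per lookup replaces the string concatenation.
        MetricName key = new MetricName(name, componentId, streamId, taskId);
        return metrics.computeIfAbsent(key, k -> new LongAdder());
    }

    int size() {
        return metrics.size();
    }
}
```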

> Regarding issuing warning on name, +1 on your approach. Looks nice!

Thanks for the feedback. Implemented in latest commit.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-12-13 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@revans2 @HeartSaVioR 

I added stream id and executor id to the metrics names, and implemented 
replacing "." with "_".

One consequence of adding stream id was having to get-or-create metrics on 
the fly, as opposed to creating them up-front. That means there will be a 
lookup, string concat, and string replacement on each metrics update, which 
could affect performance. (Unless I'm missing something obvious, we have to do 
it that way because we don't know the stream IDs ahead of time and have to 
instantiate metrics as we see them).

As far as issuing warnings when names contain ".", one option would be to 
handle the warnings in an `ITopologyValidator` instance. We could have 
`DefaultTopologyValidator` log warnings when certain names contain a ".". We 
could also provide something along the lines of a `StrictTopologyValidator` 
that throws an `InvalidTopologyException` as opposed to just warning. That might 
be an easy way to transition from warning to error that also gives users a way 
to turn strict checking on or off.

What do you think about that approach?
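
The warn-versus-reject split could look roughly like the sketch below. To keep it self-contained it does not use Storm's `ITopologyValidator`: `NameValidator`, `WarningNameValidator`, and `StrictNameValidator` are stand-ins invented for this example, and the strict variant throws `IllegalArgumentException` where a real implementation would presumably throw `InvalidTopologyException`.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in interface for this sketch only; not Storm's ITopologyValidator.
interface NameValidator {
    void validate(List<String> names);
}

// Lenient variant: collects a warning for every name that contains ".".
class WarningNameValidator implements NameValidator {
    final List<String> warnings = new ArrayList<>();

    @Override
    public void validate(List<String> names) {
        for (String name : names) {
            if (name.contains(".")) {
                warnings.add("Name '" + name + "' contains '.'");
            }
        }
    }
}

// Strict variant: rejects the first offending name instead of warning.
class StrictNameValidator implements NameValidator {
    @Override
    public void validate(List<String> names) {
        for (String name : names) {
            if (name.contains(".")) {
                throw new IllegalArgumentException("Name '" + name + "' contains '.'");
            }
        }
    }
}
```

Which variant runs would then just be a matter of which validator class is configured, so moving from warning to error later would not require a code change for users.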


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@revans2 Thanks for clarifying. I have a better understanding of what 
you're saying now.

How do you feel about just replacing "." with "_" on all metrics path 
components (host name, componentId, etc.)? That would ensure a path could be 
reliably split on the "." character.

I'll add stream id and executor id to the path in places where they are 
available.
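
As a sketch of the replacement idea (the `MetricPath` class and its method names are hypothetical, not code from the branch): sanitize each component, then join with ".", so that splitting on "." always yields the original number of components.

```java
import java.util.StringJoiner;

// Hypothetical helper illustrating the "." -> "_" replacement.
class MetricPath {
    // Replace every "." inside a single path component.
    static String sanitize(String component) {
        return component.replace('.', '_');
    }

    // Join sanitized components with "." as the only delimiter.
    static String join(String... components) {
        StringJoiner joiner = new StringJoiner(".");
        for (String component : components) {
            joiner.add(sanitize(component));
        }
        return joiner.toString();
    }
}
```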



---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155864521
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+public static TimeUnit getReportPeriodUnit(Map<String, Object> 
reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
+}
+
+private static TimeUnit getTimeUnitForConfig(Map reporterConf, String 
configName) {
+String rateUnitString = 
Utils.getString(reporterConf.get(configName), null);
+if (rateUnitString != null) {
+return TimeUnit.valueOf(rateUnitString);
+}
+return null;
+}
+
+public static long getReportPeriod(Map reporterConf) {
+return Utils.getInt(reporterConf.get(REPORT_PERIOD), 
10).longValue();
+}
+
+public static StormMetricsFilter getMetricsFilter(Map reporterConf){
+StormMetricsFilter filter = null;
+Map<String, Object> filterConf = (Map)reporterConf.get("filter");
+if(filterConf != null) {
+String clazz = (String) filterConf.get("class");
+if (clazz != null) {
+try {
+filter = (StormMetricsFilter) 
Metrics2Utils.instantiate(clazz);
+filter.prepare(filterConf);
+} catch (Exception e) {
+LOG.warn("Unable to instantiate StormMetricsFilter 
class: {}", clazz);
--- End diff --

I could go either way... But it didn't seem right to crash just because of 
a misconfigured filter.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155863202
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
+throw new IllegalArgumentException( "Field " + name + " 
must have map entry with key: class");
+}
+if(!((Map) o).containsKey("daemons") ) {
+throw new IllegalArgumentException("Field " + name + " 
must have map entry with key: daemons");
+} else {
+// daemons can only be 'nimbus', 'supervisor', or 'worker'
+Object list = ((Map)o).get("daemons");
+if(list == null || !(list instanceof List)){
+throw new IllegalArgumentException("Field 'daemons' 
must be a non-null list.");
+}
+List daemonList = (List)list;
+for(Object string : daemonList){
+if (string instanceof String &&
+(((String) string).equals("nimbus") ||
+((String) string).equals("supervisor") 
||
+((String) string).equals("worker"))) {
+return;
+}
+throw new IllegalArgumentException("Field daemons must 
contain at least one of \"nimbus\", \"supervisor\", or \"worker\"");
+}
+
+}
+if(((Map)o).containsKey("filter")){
+Map filterMap = (Map)((Map)o).get("filter");
+SimpleTypeValidator.validateField("filter", String.class, 
filterMap.get("class"));
--- End diff --

Yes. Good catch.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155863192
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
+throw new IllegalArgumentException( "Field " + name + " 
must have map entry with key: class");
+}
+if(!((Map) o).containsKey("daemons") ) {
+throw new IllegalArgumentException("Field " + name + " 
must have map entry with key: daemons");
+} else {
+// daemons can only be 'nimbus', 'supervisor', or 'worker'
+Object list = ((Map)o).get("daemons");
+if(list == null || !(list instanceof List)){
+throw new IllegalArgumentException("Field 'daemons' 
must be a non-null list.");
+}
+List daemonList = (List)list;
+for(Object string : daemonList){
+if (string instanceof String &&
+(((String) string).equals("nimbus") ||
+((String) string).equals("supervisor") 
||
+((String) string).equals("worker"))) {
+return;
+}
+throw new IllegalArgumentException("Field daemons must 
contain at least one of \"nimbus\", \"supervisor\", or \"worker\"");
+}
+
+}
+if(((Map)o).containsKey("filter")){
+Map filterMap = (Map)((Map)o).get("filter");
+SimpleTypeValidator.validateField("filter", String.class, 
filterMap.get("class"));
+}
+SimpleTypeValidator.validateField(name, String.class, ((Map) 
o).get("class"));
--- End diff --

Not here. The `name` variable should contain "class" here.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155856433
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
+throw new IllegalArgumentException( "Field " + name + " 
must have map entry with key: class");
+}
+if(!((Map) o).containsKey("daemons") ) {
+throw new IllegalArgumentException("Field " + name + " 
must have map entry with key: daemons");
+} else {
+// daemons can only be 'nimbus', 'supervisor', or 'worker'
+Object list = ((Map)o).get("daemons");
+if(list == null || !(list instanceof List)){
+throw new IllegalArgumentException("Field 'daemons' 
must be a non-null list.");
+}
+List daemonList = (List)list;
+for(Object string : daemonList){
+if (string instanceof String &&
+(((String) string).equals("nimbus") ||
+((String) string).equals("supervisor") 
||
+((String) string).equals("worker"))) {
+return;
--- End diff --

Good catch.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155855673
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+public static TimeUnit getReportPeriodUnit(Map<String, Object> 
reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
--- End diff --

We can handle this in documentation. Documentation is pending the 
finalization of the approach to metrics naming, paths, metadata, etc.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155855300
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
--- End diff --

Yes, but it gives additional information about what went wrong as opposed 
to a "naked" NPE.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155854567
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
 ---
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ganglia.GangliaReporter;
+import com.codahale.metrics.MetricRegistry;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class GangliaStormReporter extends ScheduledStormReporter {
+private final static Logger LOG = 
LoggerFactory.getLogger(GangliaStormReporter.class);
+
+public static final String GANGLIA_HOST = "ganglia.host";
+public static final String GANGLIA_PORT = "ganglia.port";
+public static final String GANGLIA_PREFIXED_WITH = 
"ganglia.prefixed.with";
+public static final String GANGLIA_DMAX = "ganglia.dmax";
+public static final String GANGLIA_TMAX = "ganglia.tmax";
+public static final String GANGLIA_UDP_ADDRESSING_MODE = 
"ganglia.udp.addressing.mode";
+public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit";
+public static final String GANGLIA_DURATION_UNIT = 
"ganglia.duration.unit";
+public static final String GANGLIA_TTL = "ganglia.ttl";
+public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group";
+
+@Override
+public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map 
reporterConf) {
+LOG.debug("Preparing...");
+GangliaReporter.Builder builder = 
GangliaReporter.forRegistry(metricsRegistry);
+
+TimeUnit durationUnit = 
MetricsUtils.getMetricsDurationUnit(reporterConf);
+if (durationUnit != null) {
+builder.convertDurationsTo(durationUnit);
+}
+
+TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+if (rateUnit != null) {
+builder.convertRatesTo(rateUnit);
+}
+
+StormMetricsFilter filter = getMetricsFilter(reporterConf);
+if(filter != null){
+builder.filter(filter);
+}
+String prefix = getMetricsPrefixedWith(reporterConf);
+if (prefix != null) {
+builder.prefixedWith(prefix);
+}
+
+Integer dmax = getGangliaDMax(reporterConf);
+if (prefix != null) {
+builder.withDMax(dmax);
+}
+
+Integer tmax = getGangliaTMax(reporterConf);
+if (prefix != null) {
+builder.withTMax(tmax);
+}
+
+//defaults to 10
+reportingPeriod = getReportPeriod(reporterConf);
+
+//defaults to seconds
+reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+// Not exposed:
--- End diff --

It means the `GangliaReporter.Builder.withClock` method is not 
exposed/settable by the Storm configuration. I'll remove it.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@revans2 (cc @HeartSaVioR ) Regarding metadata, parseable metrics 
names/paths, etc. what do you think of the following approach?

In a nutshell, make everything configurable (with sane defaults), something 
along these lines:

METRICS_V2_PREFIX (String prepended to all metrics paths, replacing the 
hard-coded "storm.worker", etc. in the current code.)
METRICS_V2_PATH_DELIMITER (String/character used to separate metrics path 
components; replaces the hard-coded "." in the current code.)
METRICS_V2_INVALID_NAME_REGEX (Regex that checks user-supplied metrics 
names for disallowed characters. Would be used to prevent users from 
inadvertently breaking up a path, for example by putting a "." in a metric name 
when that's used as the delimiter.)

Does that seem reasonable?
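
A minimal sketch of how the three settings might fit together, assuming they are read from the config and passed in as plain strings. The `MetricNaming` class and its `path` method are hypothetical and only illustrate the proposal.

```java
import java.util.regex.Pattern;

// Hypothetical helper: prefix, delimiter and regex would come from the
// three proposed config keys; here they are plain constructor arguments.
class MetricNaming {
    private final String prefix;
    private final String delimiter;
    private final Pattern invalidName;

    MetricNaming(String prefix, String delimiter, String invalidNameRegex) {
        this.prefix = prefix;
        this.delimiter = delimiter;
        this.invalidName = Pattern.compile(invalidNameRegex);
    }

    // Builds prefix + delimiter + name1 + delimiter + name2 ...
    // and rejects any name that matches the invalid-name regex.
    String path(String... names) {
        StringBuilder sb = new StringBuilder(prefix);
        for (String name : names) {
            if (invalidName.matcher(name).find()) {
                throw new IllegalArgumentException("Invalid metric name: " + name);
            }
            sb.append(delimiter).append(name);
        }
        return sb.toString();
    }
}
```

Note that in this sketch the prefix itself is not checked against the regex, since it is operator-supplied rather than user-supplied.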


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-11-29 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@HeartSaVioR Agreed/understood. We can squash on merge and cleanup commit 
messages.

This is a feature branch. You can commit directly if you want.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-11-29 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@HeartSaVioR Thanks for the update. I pulled your changes into the 
metrics_v2 branch.

@revans2 I'll start working on naming conventions and disallowing certain 
delimiter characters. If you have any sample paths/names that illustrate your 
thinking that would be helpful. 


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-11-21 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
Crude test, but illustrates the cost of meters:
(code marks a meter | increments a counter from 0 to `Integer.MAX_VALUE`)
```
*** METER ***
Time: 126.39
ops/sec: 16,990,930
*** COUNTER ***
Time: 18.221
ops/sec: 117,857,617
```

The obvious path would be to switch critical path metrics to use counters. 
But I'd ultimately err on the side of user choice (e.g. let users decide which 
to use). That could be made configurable. I can imagine use cases where users 
would be willing to take a minor performance hit for more performance metrics 
(e.g. "sleepy" topologies). The performance hit could be tolerable in certain 
situations.

For the time being, I'll switch some of the metrics to counters, and re-run.
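
For anyone who wants to try a similar comparison, here is a rough, self-contained sketch of the shape of such a test. It is not the code that produced the numbers above, and `SimpleCounter` and `SimpleMeter` are simplified stand-ins rather than the Dropwizard Metrics classes: the meter stand-in only adds a clock read and a volatile write per update, where a real meter does more bookkeeping.

```java
import java.util.concurrent.atomic.LongAdder;

// Simplified stand-in for a counter: one increment per update.
class SimpleCounter {
    private final LongAdder count = new LongAdder();

    void inc() {
        count.increment();
    }

    long getCount() {
        return count.sum();
    }
}

// Simplified stand-in for a meter: extra work on every update.
class SimpleMeter {
    private final LongAdder count = new LongAdder();
    private volatile long lastMarkNanos;

    void mark() {
        // A clock read and a volatile write on top of the increment.
        lastMarkNanos = System.nanoTime();
        count.increment();
    }

    long getCount() {
        return count.sum();
    }

    long getLastMarkNanos() {
        return lastMarkNanos;
    }
}

class UpdateCostBench {
    // Runs the update the given number of times and returns elapsed nanos.
    static long timeNanos(Runnable update, long iterations) {
        long start = System.nanoTime();
        for (long i = 0; i < iterations; i++) {
            update.run();
        }
        return System.nanoTime() - start;
    }
}
```

Usage would be along the lines of `UpdateCostBench.timeNanos(counter::inc, n)` versus `UpdateCostBench.timeNanos(meter::mark, n)`. There is no warm-up or JIT control here, so any numbers from it are indicative at best.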


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-11-21 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@revans2 Thanks for the update.

My first instinct is that it’s the usage of Meter on the critical path. 
Adding anything that “does something” there is going to add some level of 
overhead. I’ll do some (micro)benchmarking and experiments to find out. I 
think there will be a number of mitigation options.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-11-20 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@HeartSaVioR I know. ;)

I'm thinking more in terms of hardware profile and what other processes are 
running.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-11-20 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
We really need hardware and environment information. I'd also argue that 
tests should be run headless. I've seen some benchmarks vary greatly on a MBP 
depending on what you're doing at the time. Something as mundane as checking 
email, etc. can affect benchmark results.

sigar might be an easy target (since it's included) to get some of that 
info.







---


[GitHub] storm issue #2409: STORM-2796: Flux: Provide means for invoking static facto...

2017-11-16 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2409
  
@roshannaik See latest commit. I merged/fixed your patch. 

Most of the issues were typos (e.g. "refList" != "reflist"). The other 
issue was that properties in flux do not (currently) support ref lists. 
However, you can work around it by invoking the setter as a config method:

```yaml
configMethods:
  - name: "setTimeLenArr"
    args:
      - reflist: ["time1", "time2"]
```

We can probably handle property ref list support as a separate issue since 
there is a fairly easy workaround.


---


[GitHub] storm issue #2409: STORM-2796: Flux: Provide means for invoking static facto...

2017-11-16 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2409
  
@HeartSaVioR Yes, I will add assertions, although the tests would still 
fail without them (the constructor/method won't be found and an 
`IllegalArgumentException` will be thrown).

@roshannaik Thanks for the patch. I'll take a look.


---


[GitHub] storm issue #2409: STORM-2796: Flux: Provide means for invoking static facto...

2017-11-15 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2409
  
@HeartSaVioR Thanks for the review. Addressed your comment and fixed 
another issue discovered in testing.


---


[GitHub] storm issue #2409: STORM-2796: Flux: Provide means for invoking static facto...

2017-11-10 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2409
  
I can squash the commits later, but for now keeping the check style commit 
separate will make review a lot easier.


---


[GitHub] storm pull request #2409: STORM-2796: Flux: Provide means for invoking stati...

2017-11-10 Thread ptgoetz
GitHub user ptgoetz opened a pull request:

https://github.com/apache/storm/pull/2409

STORM-2796: Flux: Provide means for invoking static factory methods

Note: The guts of this change are in the first commit. The last commit 
simply addresses checkstyle violations.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ptgoetz/storm flux_factory_methods

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2409.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2409


commit f0abe4757076f78d987cd266e6ce4d4bd847cfb2
Author: P. Taylor Goetz <ptgo...@gmail.com>
Date:   2017-11-04T03:17:46Z

STORM-2796: Implement support for static factory methods

commit fd0d10feff3ff3beae29276c7a942a3617154658
Author: P. Taylor Goetz <ptgo...@gmail.com>
Date:   2017-11-10T18:10:03Z

add test for factory args

commit 5a6be5fb4a1bce4b7a9ddb332533aa4167ec543b
Author: P. Taylor Goetz <ptgo...@gmail.com>
Date:   2017-11-10T20:51:24Z

Address checkstyle errors.




---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-09-29 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@revans2 Added rudimentary sanity-check validation for metrics reporter 
configs. Because reporter implementations may want to have their own custom 
config keys, we can't really cover everything. I think we'll have to settle for 
basic sanity checks plus reporters documenting the custom configuration fields 
they expose.
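
As a rough sketch of what "basic sanity checks" could look like (this is not the code in the branch), the validator below checks only a couple of common fields and deliberately ignores everything else, so reporter-specific keys pass through untouched. The class name `ReporterConfigCheck` and the key names `class` and `report.period` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical validator, for illustration only.
final class ReporterConfigCheck {
    private ReporterConfigCheck() {}

    // Returns a list of human-readable problems; an empty list means the
    // common fields look sane. Unknown keys are never reported.
    static List<String> validate(List<Map<String, Object>> reporters) {
        List<String> errors = new ArrayList<>();
        for (int i = 0; i < reporters.size(); i++) {
            Map<String, Object> conf = reporters.get(i);

            // Assumed key: the reporter implementation class name.
            Object clazz = conf.get("class");
            if (!(clazz instanceof String) || ((String) clazz).isEmpty()) {
                errors.add("reporter[" + i + "]: 'class' must be a non-empty string");
            }

            // Assumed key: optional reporting period, positive if present.
            Object period = conf.get("report.period");
            if (period != null
                    && !(period instanceof Number && ((Number) period).longValue() > 0)) {
                errors.add("reporter[" + i + "]: 'report.period' must be a positive number");
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("class", "com.example.MyReporter"); // hypothetical reporter
        conf.put("my.custom.key", "anything");      // custom keys are not checked
        List<Map<String, Object>> reporters = new ArrayList<>();
        reporters.add(conf);
        System.out.println(validate(reporters));
    }
}
```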

I'll think more about metrics naming and metadata. Like I said, feedback, 
opinions and pull requests against this branch are more than welcome.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-09-29 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@revans2 I agree with you regarding getting the metrics naming correct. In 
fact I think it's one of the most important aspects. I'd like to get as much 
input/collaboration from others as possible to make sure we get it right from 
the start.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-09-29 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@revans2, @HeartSaVioR, @arunmahadevan Addressed review comments.

Also note that this is a feature branch, so it's open to pull requests from 
anyone. If there's anything you'd like to see that I haven't addressed (e.g. 
adding fields to metrics names) feel free to suggest.

Re: Documentation and port to master: Once this is merged I plan to add 
documentation and port to master. I can file a JIRA for those if desired.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-29 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141924200
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java ---
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+public class Metrics2Utils {
--- End diff --

The `Utils.newInstance()` method throws `RuntimeException` if the class 
instantiation fails. I don't think we want that in this case.
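
A minimal sketch of the alternative, assuming the goal is to report a failed instantiation without an unchecked exception reaching the caller. `SafeInstantiator` and `tryNewInstance` are hypothetical names, not part of Storm, and returning an empty `Optional` is just one possible way to signal failure.

```java
import java.util.Optional;

// Hypothetical helper, for illustration only (not part of Storm).
final class SafeInstantiator {
    private SafeInstantiator() {}

    // Instantiates className via its no-arg constructor. Any reflective
    // failure or type mismatch yields Optional.empty() instead of an exception.
    static <T> Optional<T> tryNewInstance(String className, Class<T> expectedType) {
        try {
            Class<?> clazz = Class.forName(className);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            return Optional.of(expectedType.cast(instance));
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Report and continue; the caller decides what a missing instance means.
            System.err.println("Could not instantiate " + className + ": " + e);
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryNewInstance("java.util.ArrayList", Object.class).isPresent());
        System.out.println(tryNewInstance("no.such.Reporter", Object.class).isPresent());
    }
}
```

The caller can then skip a misconfigured class and keep going rather than aborting startup.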


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-29 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141922507
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, String componentId, Integer port){
+SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge<T>)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map<String, Object> stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List<Map<String, Object>> reporterList = (List<Map<String, 
Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+if(repo

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141137280
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, String componentId, Integer port){
+SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, port);
+if(REGISTRY.getGauges().containsKey(metricName)){
--- End diff --

Yes, it is thread-safe. I will double-check the multiple-registration case 
and update as appropriate.
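
For the multiple-registration question, one option is to make lookup and insert a single atomic step, so that a second call with the same name simply returns the existing gauge. The sketch below uses a plain `ConcurrentHashMap` rather than the Dropwizard `MetricRegistry`, so it only illustrates the pattern; `ValueGauge` and `GaugeRegistrySketch` are hypothetical stand-ins.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a gauge whose value is set from outside.
final class ValueGauge<T> {
    private final AtomicReference<T> value;

    ValueGauge(T initial) {
        this.value = new AtomicReference<>(initial);
    }

    T get() {
        return value.get();
    }

    void set(T newValue) {
        value.set(newValue);
    }
}

// Hypothetical registry, for illustration only.
final class GaugeRegistrySketch {
    private static final ConcurrentMap<String, ValueGauge<?>> GAUGES = new ConcurrentHashMap<>();

    private GaugeRegistrySketch() {}

    // computeIfAbsent performs the lookup and the insert atomically, so two
    // threads asking for the same name always get the same gauge instance.
    // The cast is unchecked: callers must use one value type per metric name.
    @SuppressWarnings("unchecked")
    static <T> ValueGauge<T> gauge(String metricName, T initialValue) {
        return (ValueGauge<T>) GAUGES.computeIfAbsent(
                metricName, key -> new ValueGauge<T>(initialValue));
    }

    public static void main(String[] args) {
        ValueGauge<Long> first = gauge("queue-capacity", 0L);
        ValueGauge<Long> second = gauge("queue-capacity", 0L);
        System.out.println(first == second);
    }
}
```

A separate check followed by a register call leaves a window between the two in which another thread can register the same name.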


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@HeartSaVioR 

> We need to document how to use new metrics and its reporter, and also 
need to have patch for master branch (maybe with removal of metrics V1 public 
API).

As I mentioned elsewhere, I want to hold off on porting this to the master 
branch until all comments are addressed and this patch is in a mergeable state. 
That way we can avoid porting small changes between the two branches.

As far as deprecating/removing the metrics v1 API, I'd hold off until there 
is a consensus that this approach is a suitable replacement for most use cases.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141137596
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, String componentId, Integer port){
+SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, port);
+if(REGISTRY.getGauges().containsKey(metricName)){
--- End diff --

> Given that all the values are calculated from outside, isn't Gauge right 
on all fields?

Yes. I removed the comment and left it as a Gauge. Not sure why that 
comment is still showing up in the GitHub diff.


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@arunmahadevan 
> Currently this patch addresses registering custom metrics and sending the 
metrics out via reporters and does not send any metrics to nimbus right? Why is 
disruptor metrics handled as a part of this?

No, this does not send any metrics to nimbus. Disruptor metrics are 
included because many users would like to track those metrics (and they aren't 
available via Storm UI/REST).

> There will be one thread per reporter per worker which sends out the 
metrics to the destination? Just thinking if it would overwhelm the destination 
and would need some local aggregation and have the supervisor report the 
metrics.

The idea here was to purposely avoid attempting aggregation, since in some 
cases it's impossible (e.g. there's no way to aggregate histograms). Instead, 
any aggregation is assumed to take place in the destination system (Grafana, 
etc.).
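
A small worked example of the histogram point, under the assumption that each worker reports only a summary such as its median. The sample values, the nearest-rank percentile definition, and the class name `PercentileSketch` are all made up for illustration.

```java
import java.util.Arrays;

// Illustration only: per-worker percentiles cannot be merged into the
// percentile of the combined data.
final class PercentileSketch {
    private PercentileSketch() {}

    // Nearest-rank percentile of an unsorted, non-empty sample; p is in (0, 1].
    static long percentile(long[] samples, double p) {
        long[] sorted = samples.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length);
        return sorted[Math.max(rank, 1) - 1];
    }

    // Concatenates two sample arrays, i.e. what the destination system would
    // see if it received the raw values from both workers.
    static long[] concat(long[] a, long[] b) {
        long[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        long[] workerA = {1, 2, 3, 4, 5, 6, 7, 8, 9}; // nine fast samples
        long[] workerB = {100};                       // one slow sample

        long medianA = percentile(workerA, 0.5);
        long medianB = percentile(workerB, 0.5);
        long combined = percentile(concat(workerA, workerB), 0.5);

        System.out.println("average of medians: " + (medianA + medianB) / 2.0);
        System.out.println("median of combined: " + combined);
    }
}
```

Here the per-worker medians are 5 and 100, so averaging them gives 52.5, while the median of the combined samples is 5. Without the underlying samples, a supervisor has no way to recover the second number from the first two.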


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141131796
  
--- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java ---
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+
+public class AnchoredWordCount {
+public static class RandomSentenceSpout extends BaseRichSpout {
+private static final Logger LOG = 
LoggerFactory.getLogger(RandomSentenceSpout.class);
+
+SpoutOutputCollector _collector;
+Random _rand;
--- End diff --

Absolutely (I'm personally not a big fan of the "_" convention). In general 
(outside the main branch) I try to follow the conventions that exist in the 
current source for consistency.


---


[GitHub] storm issue #2329: STORM-2722: close the JMSSpout in the tests when done

2017-09-18 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2329
  
+1


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-08-11 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r132748013
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java ---
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import org.apache.storm.utils.DisruptorQueue;
+
+public class DisruptorMetrics {
+private SimpleGauge<Long> capacity;
+private SimpleGauge<Long> population;
+private SimpleGauge<Long> writePosition;
+private SimpleGauge<Long> readPosition;
+private SimpleGauge<Double> arrivalRate; // TODO: Change to meter
--- End diff --

Yes.


---

