[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2203


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161356664
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T>  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161326958
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T>  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161324136
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T>  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161306615
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T>  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161303474
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T>  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161301526
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T>  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161292783
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static <T> SimpleGauge<T>  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161291952
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161277608
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161277889
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-12 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161277857
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, new 
SimpleGauge<>(initialValue));
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, Integer taskId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,taskId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static Counter counter(String name, String topologyId, String 
componentId, Integer taskId, Integer workerPort, String streamId){
+String metricName = metricName(name, topologyId, componentId, 
streamId,taskId, workerPort);
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-11 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161139613
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import org.apache.storm.task.WorkerTopologyContext;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class TaskMetrics {
+ConcurrentMap ackedByStream = new 
ConcurrentHashMap<>();
+ConcurrentMap failedByStream = new 
ConcurrentHashMap<>();
+ConcurrentMap emittedByStream = new 
ConcurrentHashMap<>();
+ConcurrentMap transferredByStream = new 
ConcurrentHashMap<>();
+
+private String topologyId;
+private String componentId;
+private Integer taskId;
+private Integer workerPort;
+
+public TaskMetrics(WorkerTopologyContext context, String componentId, 
Integer taskid){
+this.topologyId = context.getStormId();
+this.componentId = componentId;
+this.taskId = taskid;
+this.workerPort = context.getThisWorkerPort();
+}
+
+public Counter getAcked(String streamId) {
+Counter c = this.ackedByStream.get(streamId);
+if (c == null) {
+c = StormMetricRegistry.counter("acked", this.topologyId, 
this.componentId, this.taskId, this.workerPort, streamId);
+this.ackedByStream.put(streamId, c);
+}
+return c;
+}
+
+public Counter getFailed(String streamId) {
+Counter c = this.ackedByStream.get(streamId);
--- End diff --

Good catch. Thanks for the fix.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161135637
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import org.apache.storm.task.WorkerTopologyContext;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class TaskMetrics {
+ConcurrentMap ackedByStream = new 
ConcurrentHashMap<>();
+ConcurrentMap failedByStream = new 
ConcurrentHashMap<>();
+ConcurrentMap emittedByStream = new 
ConcurrentHashMap<>();
+ConcurrentMap transferredByStream = new 
ConcurrentHashMap<>();
+
+private String topologyId;
+private String componentId;
+private Integer taskId;
+private Integer workerPort;
+
+public TaskMetrics(WorkerTopologyContext context, String componentId, 
Integer taskid){
+this.topologyId = context.getStormId();
+this.componentId = componentId;
+this.taskId = taskid;
+this.workerPort = context.getThisWorkerPort();
+}
+
+public Counter getAcked(String streamId) {
+Counter c = this.ackedByStream.get(streamId);
+if (c == null) {
+c = StormMetricRegistry.counter("acked", this.topologyId, 
this.componentId, this.taskId, this.workerPort, streamId);
+this.ackedByStream.put(streamId, c);
+}
+return c;
+}
+
+public Counter getFailed(String streamId) {
+Counter c = this.ackedByStream.get(streamId);
--- End diff --

Missed a spot: `getFailed` should read from `failedByStream`, not `ackedByStream`.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-11 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161087710
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -265,6 +265,7 @@
  :stats (mk-executor-stats <> (sampling-rate storm-conf))
  :interval->task->metric-registry (HashMap.)
  :task->component (:task->component worker)
+ :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last 
task-ids) worker-context component-id)
--- End diff --

@HeartSaVioR Nice, thanks!


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161086016
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -265,6 +265,7 @@
  :stats (mk-executor-stats <> (sampling-rate storm-conf))
  :interval->task->metric-registry (HashMap.)
  :task->component (:task->component worker)
+ :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last 
task-ids) worker-context component-id)
--- End diff --

Never mind. Just fixed and pushed new commit.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r161080512
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -265,6 +265,7 @@
  :stats (mk-executor-stats <> (sampling-rate storm-conf))
  :interval->task->metric-registry (HashMap.)
  :task->component (:task->component worker)
+ :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last 
task-ids) worker-context component-id)
--- End diff --

We can create a TaskMetrics instance in task-data and even get rid of the map 
lookup. I guess TaskMetrics doesn't have any executor-specific metrics.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-10 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160805811
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -446,7 +451,7 @@
 (.ack spout msg-id)
 (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. 
msg-id task-id time-delta))
 (when time-delta
-  (stats/spout-acked-tuple! (:stats executor-data) (:stream 
tuple-info) time-delta
+  (stats/spout-acked-tuple! (:stats executor-data) 
(StormMetricRegistry/counter "acked" (:worker-context executor-data) 
(:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream 
tuple-info)) (:stream tuple-info) time-delta
--- End diff --

That makes a lot more sense now. Thanks for clarifying.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-10 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160801985
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -446,7 +451,7 @@
 (.ack spout msg-id)
 (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. 
msg-id task-id time-delta))
 (when time-delta
-  (stats/spout-acked-tuple! (:stats executor-data) (:stream 
tuple-info) time-delta
+  (stats/spout-acked-tuple! (:stats executor-data) 
(StormMetricRegistry/counter "acked" (:worker-context executor-data) 
(:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream 
tuple-info)) (:stream tuple-info) time-delta
--- End diff --

I was thinking that the map would have just the stream id as the key, but 
the name of the metric could be anything.  For example:

```
public class TaskStats {
   ConcurrentMap ackByStream = ...;
   TaskStats(String topoId, String componentId, int taskid, int workerPort) 
{
///Save these away...
   }

   public Counter getAck(String streamId) {
  Counter c = ackByStream.get(streamId);
   if (c == null) {
   c = StormMetricRegistry.counter("acked", topoId, componentId, 
taskId, streamId);
   ackByStream.put(streamId, c);
   }
   return c;
   }
}
```

We could then have a map of TaskStats in the executor-data, one for each 
executorId the executor is over.  Or like I said if you want to go crazy you 
could take the list of known streams (inbound and outbound depending on the 
stat) and populate the metrics in the constructor, but it is not necessary. 


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-10 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160777488
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -446,7 +451,7 @@
 (.ack spout msg-id)
 (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. 
msg-id task-id time-delta))
 (when time-delta
-  (stats/spout-acked-tuple! (:stats executor-data) (:stream 
tuple-info) time-delta
+  (stats/spout-acked-tuple! (:stats executor-data) 
(StormMetricRegistry/counter "acked" (:worker-context executor-data) 
(:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream 
tuple-info)) (:stream tuple-info) time-delta
--- End diff --

@revans2 regarding map lookup, wouldn't we have to concatenate in order to 
create the lookup key?


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160544887
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -446,7 +451,7 @@
 (.ack spout msg-id)
 (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. 
msg-id task-id time-delta))
 (when time-delta
-  (stats/spout-acked-tuple! (:stats executor-data) (:stream 
tuple-info) time-delta
+  (stats/spout-acked-tuple! (:stats executor-data) 
(StormMetricRegistry/counter "acked" (:worker-context executor-data) 
(:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream 
tuple-info)) (:stream tuple-info) time-delta
--- End diff --

I think we can turn it into just a map lookup, although I don't know how 
much work that will be.

We should know the stream names ahead of time, from the StormTopology.  As 
such we can create all of the metrics ahead of time for each bolt/spout 
instance, and cache them in a map keyed by the stream id.  Then when a tuple is 
emitted/acked/etc., we look up the Counter based on the stream id and use 
it.  No object allocation and no string concatenation involved.

If we don't want to bother with parsing the StormTopology and prepopulating 
it, we could do lazy initialization like we are doing now.  


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160513274
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -446,7 +451,7 @@
 (.ack spout msg-id)
 (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. 
msg-id task-id time-delta))
 (when time-delta
-  (stats/spout-acked-tuple! (:stats executor-data) (:stream 
tuple-info) time-delta
+  (stats/spout-acked-tuple! (:stats executor-data) 
(StormMetricRegistry/counter "acked" (:worker-context executor-data) 
(:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream 
tuple-info)) (:stream tuple-info) time-delta
--- End diff --

It's not ideal. Here's some background from earlier review:

@HeartSaVioR:
>> Looks like we compose metric name and lookup from REGISTRY every time, 
even without executor ID and stream ID. I can see more calculation should be 
done after addressing, but not sure how much it affects performance. If we 
could also memoize the metric name per combination of parameters it might help, 
but I'm also not sure how much it will help.

@ptgoetz:
>Prior to adding stream ID, we could store the metric as a variable and 
reuse it without having to do a lookup on each use. Adding stream ID required 
(unless I'm missing something) doing the lookup on every use. There might be 
additional optimizations, but because metrics names are composed of several 
fields, some level of string concatenation is unavoidable. For example, we 
could try to optimize lookups by caching metrics with metric name as the key, 
but we would still have to do concatenation to create the lookup key.

>I suppose we could create a MetricName class to serve as the cache key, 
but we'd be trading string concatenation for object creation (unless we make 
MetricName mutable with setX methods).


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160509152
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java ---
@@ -17,15 +17,49 @@
  */
 package org.apache.storm.nimbus;
 
+import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Map;
 
 public class DefaultTopologyValidator implements ITopologyValidator {
+private static final Logger LOG = 
LoggerFactory.getLogger(DefaultTopologyValidator.class);
 @Override
 public void prepare(Map StormConf){
 }
 @Override
-public void validate(String topologyName, Map topologyConf, 
StormTopology topology) throws InvalidTopologyException {
+public void validate(String topologyName, Map topologyConf, 
StormTopology topology) throws InvalidTopologyException {
+if(topologyName.contains(".")){
+LOG.warn("Metrics for topology name '{}' will be reported as 
'{}'.", topologyName, topologyName.replace('.', '_') );
+}
+Map spouts = topology.get_spouts();
+for(String spoutName : spouts.keySet()){
+if(spoutName.contains(".")){
+LOG.warn("Metrics for spout name '{}' will be reported as 
'{}'.", spoutName, spoutName.replace('.', '_') );
+}
+SpoutSpec spoutSpec = spouts.get(spoutName);
+for(String streamName : 
spoutSpec.get_common().get_streams().keySet()){
+if(streamName.contains(".")){
+LOG.warn("Metrics for stream name '{}' will be 
reported as '{}'.", streamName, streamName.replace('.', '_') );
+}
+}
+}
+
+Map bolts = topology.get_bolts();
+for(String boltName : bolts.keySet()){
+if(boltName.contains(".")){
+LOG.warn("Metrics for bolt name '{}' will be reported as 
'{}'.", boltName, boltName.replace('.', '_') );
+}
+Bolt bolt = bolts.get(boltName);
+for(String streamName : 
bolt.get_common().get_streams().keySet()){
+if(streamName.contains(".")){
+LOG.warn("Metrics for stream name '{}' will be 
reported as '{}'.", streamName, streamName.replace('.', '_') );
+}
+}
+}
--- End diff --

I'll give you that it is better than nothing.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160508705
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java ---
@@ -17,15 +17,49 @@
  */
 package org.apache.storm.nimbus;
 
+import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Map;
 
 public class DefaultTopologyValidator implements ITopologyValidator {
+private static final Logger LOG = 
LoggerFactory.getLogger(DefaultTopologyValidator.class);
 @Override
 public void prepare(Map StormConf){
 }
 @Override
-public void validate(String topologyName, Map topologyConf, 
StormTopology topology) throws InvalidTopologyException {
+public void validate(String topologyName, Map topologyConf, 
StormTopology topology) throws InvalidTopologyException {
+if(topologyName.contains(".")){
+LOG.warn("Metrics for topology name '{}' will be reported as 
'{}'.", topologyName, topologyName.replace('.', '_') );
+}
+Map spouts = topology.get_spouts();
+for(String spoutName : spouts.keySet()){
+if(spoutName.contains(".")){
+LOG.warn("Metrics for spout name '{}' will be reported as 
'{}'.", spoutName, spoutName.replace('.', '_') );
+}
+SpoutSpec spoutSpec = spouts.get(spoutName);
+for(String streamName : 
spoutSpec.get_common().get_streams().keySet()){
+if(streamName.contains(".")){
+LOG.warn("Metrics for stream name '{}' will be 
reported as '{}'.", streamName, streamName.replace('.', '_') );
+}
+}
+}
+
+Map bolts = topology.get_bolts();
+for(String boltName : bolts.keySet()){
+if(boltName.contains(".")){
+LOG.warn("Metrics for bolt name '{}' will be reported as 
'{}'.", boltName, boltName.replace('.', '_') );
+}
+Bolt bolt = bolts.get(boltName);
+for(String streamName : 
bolt.get_common().get_streams().keySet()){
+if(streamName.contains(".")){
+LOG.warn("Metrics for stream name '{}' will be 
reported as '{}'.", streamName, streamName.replace('.', '_') );
+}
+}
+}
--- End diff --

True. The user will have to hunt, but I suppose it's better than silence.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160506051
  
--- Diff: storm-core/src/jvm/org/apache/storm/task/TopologyContext.java ---
@@ -386,4 +388,28 @@ public ReducedMetric registerMetric(String name, 
IReducer reducer, int timeBucke
 public CombinedMetric registerMetric(String name, ICombiner combiner, 
int timeBucketSizeInSecs) {
 return registerMetric(name, new CombinedMetric(combiner), 
timeBucketSizeInSecs);
 }
+
+public Timer registerTimer(String name){
+return StormMetricRegistry.registry().timer(metricName(name));
+}
+
+public Histogram registerHistogram(String name){
+return StormMetricRegistry.registry().histogram(metricName(name));
+}
+
+public Meter registerMeter(String name){
+return StormMetricRegistry.registry().meter(metricName(name));
+}
+
+public Counter registerCounter(String name){
+return StormMetricRegistry.registry().counter(metricName(name));
+}
+
+public Gauge registerGauge(String name, Gauge gauge){
+return StormMetricRegistry.registry().register(metricName(name), 
gauge);
+}
+
+private String metricName(String name){
+return String.format("storm.topology.%s.%s.%s-%s", getStormId(), 
getThisComponentId(), getThisWorkerPort(), name);
--- End diff --

Yes. Good catch.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160505473
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, String executorId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,executorId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, String executorId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,executorId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = "localhost";
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname will be reported" +
+ " as 'localhost'.");
+}
+
+LOG.info("Starting metrics reporters...");
+List> 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160496988
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj ---
@@ -446,7 +451,7 @@
 (.ack spout msg-id)
 (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. 
msg-id task-id time-delta))
 (when time-delta
-  (stats/spout-acked-tuple! (:stats executor-data) (:stream 
tuple-info) time-delta
+  (stats/spout-acked-tuple! (:stats executor-data) 
(StormMetricRegistry/counter "acked" (:worker-context executor-data) 
(:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream 
tuple-info)) (:stream tuple-info) time-delta
--- End diff --

nit: how expensive is it to get the counter?

Building up the metric name does not sound ideal, and this is on the 
critical path.
 
This goes for a lot of places.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160499046
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List> reporterList = (List>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+if(reporterList != null && reporterList.size() > 0) {
+for (Map reporterConfig : reporterList) {
+// only start those requested
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160499205
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, String executorId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,executorId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, String executorId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,executorId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = "localhost";
+try {
+hostName = dotToUnderScore(Utils.localHostname());
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname will be reported" +
+ " as 'localhost'.");
+}
+
+LOG.info("Starting metrics reporters...");
+List> 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160498091
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
--- End diff --

nit: shouldn't this move down to line 54? where it is actually used?  Or 
perhaps it can be inlined.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160501227
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java ---
@@ -17,15 +17,49 @@
  */
 package org.apache.storm.nimbus;
 
+import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Map;
 
 public class DefaultTopologyValidator implements ITopologyValidator {
+private static final Logger LOG = 
LoggerFactory.getLogger(DefaultTopologyValidator.class);
 @Override
 public void prepare(Map StormConf){
 }
 @Override
-public void validate(String topologyName, Map topologyConf, 
StormTopology topology) throws InvalidTopologyException {
+public void validate(String topologyName, Map topologyConf, 
StormTopology topology) throws InvalidTopologyException {
+if(topologyName.contains(".")){
+LOG.warn("Metrics for topology name '{}' will be reported as 
'{}'.", topologyName, topologyName.replace('.', '_') );
+}
+Map spouts = topology.get_spouts();
+for(String spoutName : spouts.keySet()){
+if(spoutName.contains(".")){
+LOG.warn("Metrics for spout name '{}' will be reported as 
'{}'.", spoutName, spoutName.replace('.', '_') );
+}
+SpoutSpec spoutSpec = spouts.get(spoutName);
+for(String streamName : 
spoutSpec.get_common().get_streams().keySet()){
+if(streamName.contains(".")){
+LOG.warn("Metrics for stream name '{}' will be 
reported as '{}'.", streamName, streamName.replace('.', '_') );
+}
+}
+}
+
+Map bolts = topology.get_bolts();
+for(String boltName : bolts.keySet()){
+if(boltName.contains(".")){
+LOG.warn("Metrics for bolt name '{}' will be reported as 
'{}'.", boltName, boltName.replace('.', '_') );
+}
+Bolt bolt = bolts.get(boltName);
+for(String streamName : 
bolt.get_common().get_streams().keySet()){
+if(streamName.contains(".")){
+LOG.warn("Metrics for stream name '{}' will be 
reported as '{}'.", streamName, streamName.replace('.', '_') );
+}
+}
+}
--- End diff --

This is fine, but kind of useless.  The warnings are going to show up in 
the nimbus log, so the end user never sees them.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160498347
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId, String executorId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,executorId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId, String executorId, String streamId){
+String metricName = metricName(name, context.getStormId(), 
componentId, streamId,executorId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = "localhost";
--- End diff --

I think this line is unused, or perhaps you actually wanted to default the 
hostName to "localhost" but ended up declaring a variable instead.
  


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160501820
  
--- Diff: storm-core/src/jvm/org/apache/storm/task/TopologyContext.java ---
@@ -386,4 +388,28 @@ public ReducedMetric registerMetric(String name, 
IReducer reducer, int timeBucke
 public CombinedMetric registerMetric(String name, ICombiner combiner, 
int timeBucketSizeInSecs) {
 return registerMetric(name, new CombinedMetric(combiner), 
timeBucketSizeInSecs);
 }
+
+public Timer registerTimer(String name){
+return StormMetricRegistry.registry().timer(metricName(name));
+}
+
+public Histogram registerHistogram(String name){
+return StormMetricRegistry.registry().histogram(metricName(name));
+}
+
+public Meter registerMeter(String name){
+return StormMetricRegistry.registry().meter(metricName(name));
+}
+
+public Counter registerCounter(String name){
+return StormMetricRegistry.registry().counter(metricName(name));
+}
+
+public Gauge registerGauge(String name, Gauge gauge){
+return StormMetricRegistry.registry().register(metricName(name), 
gauge);
+}
+
+private String metricName(String name){
+return String.format("storm.topology.%s.%s.%s-%s", getStormId(), 
getThisComponentId(), getThisWorkerPort(), name);
--- End diff --

Don't we need to replace the "."'s in getThisComponentId() with '_'?


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-09 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r160499395
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List> reporterList = (List>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+if(reporterList != null && reporterList.size() > 0) {
+for (Map reporterConfig : reporterList) {
+// only start those requested
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-03 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r159519245
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -65,7 +66,7 @@
 private static final Object INTERRUPT = new Object();
 private static final String PREFIX = "disruptor-";
 private static final FlusherPool FLUSHER = new FlusherPool();
-private static final Timer METRICS_TIMER = new 
Timer("disruptor-metrics-timer", true);
+private static final ScheduledThreadPoolExecutor 
METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1);
--- End diff --

You were right. Fixed.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-03 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r159515622
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -65,7 +66,7 @@
 private static final Object INTERRUPT = new Object();
 private static final String PREFIX = "disruptor-";
 private static final FlusherPool FLUSHER = new FlusherPool();
-private static final Timer METRICS_TIMER = new 
Timer("disruptor-metrics-timer", true);
+private static final ScheduledThreadPoolExecutor 
METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1);
--- End diff --

Good catch. I'll check and update as necessary.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2018-01-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r159514108
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -65,7 +66,7 @@
 private static final Object INTERRUPT = new Object();
 private static final String PREFIX = "disruptor-";
 private static final FlusherPool FLUSHER = new FlusherPool();
-private static final Timer METRICS_TIMER = new 
Timer("disruptor-metrics-timer", true);
+private static final ScheduledThreadPoolExecutor 
METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1);
--- End diff --

Thanks for replacing the Timer. If the threads for this still need to be 
daemon threads, you should use the constructor that takes a ThreadFactory; I 
believe the default factory produces non-daemon threads.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158555456
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -418,12 +430,24 @@ public DisruptorQueue(String queueName, ProducerType 
type, int size, long readTi
 _barrier = _buffer.newBarrier();
 _buffer.addGatingSequences(_consumer);
 _metrics = new QueueMetrics();
+_disruptorMetrics = 
StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port);
 //The batch size can be no larger than half the full queue size.
 //This is mostly to avoid contention issues.
 _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
 
 _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
 _flusher.start();
+try {
+METRICS_TIMER.schedule(new TimerTask() {
+@Override
+public void run() {
+_disruptorMetrics.set(_metrics);
+}
+}, 15000, 15000);
+} catch (IllegalStateException e){
--- End diff --

Thanks, but this seems like it could hide errors by accident. If we replace 
the Timer with a ScheduledThreadPoolExecutor 
(https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html),
we can check whether it is shut down instead of catching this exception, and 
log at debug or trace level if it is, so it's obvious why scheduling isn't 
happening in case someone has to debug this later.

I don't know why it isn't being shown in the current diff, but there's also 
this suggestion from earlier that doesn't appear to have a response: 
https://github.com/apache/storm/pull/2203/files/00a382b017c1e29863ac4d9a4449086ef79384e4#r128586571


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158554446
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.util.Map;
+
+public interface StormReporter extends Reporter {
--- End diff --

Thanks. I didn't realize that `ScheduledReporter.stop` and 
`ScheduledReporter.close` seem to do the same thing. I thought `stop` was more 
of a pause-so-we-can-resume-later thing.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158549262
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+public static TimeUnit getReportPeriodUnit(Map 
reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
+}
+
+private static TimeUnit getTimeUnitForConfig(Map reporterConf, String 
configName) {
+String rateUnitString = 
Utils.getString(reporterConf.get(configName), null);
+if (rateUnitString != null) {
+return TimeUnit.valueOf(rateUnitString);
+}
+return null;
+}
+
+public static long getReportPeriod(Map reporterConf) {
+return Utils.getInt(reporterConf.get(REPORT_PERIOD), 
10).longValue();
+}
+
+public static StormMetricsFilter getMetricsFilter(Map reporterConf){
+StormMetricsFilter filter = null;
+Map filterConf = (Map)reporterConf.get("filter");
+if(filterConf != null) {
+String clazz = (String) filterConf.get("class");
+if (clazz != null) {
+try {
+filter = (StormMetricsFilter) 
Metrics2Utils.instantiate(clazz);
+filter.prepare(filterConf);
+} catch (Exception e) {
+LOG.warn("Unable to instantiate StormMetricsFilter 
class: {}", clazz);
--- End diff --

Okay. I changed it to crash instead.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158549210
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
--- End diff --

Done.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158549192
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -418,12 +430,19 @@ public DisruptorQueue(String queueName, ProducerType 
type, int size, long readTi
 _barrier = _buffer.newBarrier();
 _buffer.addGatingSequences(_consumer);
 _metrics = new QueueMetrics();
+_disruptorMetrics = 
StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port);
 //The batch size can be no larger than half the full queue size.
 //This is mostly to avoid contention issues.
 _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
 
 _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
 _flusher.start();
+METRICS_TIMER.schedule(new TimerTask(){
--- End diff --

Addressed in latest commit.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158545079
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.util.Map;
+
+public interface StormReporter extends Reporter {
--- End diff --

If I'm interpreting your question correctly, that's what the `stop()` 
method is for.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158539020
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+public static TimeUnit getReportPeriodUnit(Map 
reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
+}
+
+private static TimeUnit getTimeUnitForConfig(Map reporterConf, String 
configName) {
+String rateUnitString = 
Utils.getString(reporterConf.get(configName), null);
+if (rateUnitString != null) {
+return TimeUnit.valueOf(rateUnitString);
+}
+return null;
+}
+
+public static long getReportPeriod(Map reporterConf) {
+return Utils.getInt(reporterConf.get(REPORT_PERIOD), 
10).longValue();
+}
+
+public static StormMetricsFilter getMetricsFilter(Map reporterConf){
+StormMetricsFilter filter = null;
+Map filterConf = (Map)reporterConf.get("filter");
+if(filterConf != null) {
+String clazz = (String) filterConf.get("class");
+if (clazz != null) {
+try {
+filter = (StormMetricsFilter) 
Metrics2Utils.instantiate(clazz);
+filter.prepare(filterConf);
+} catch (Exception e) {
+LOG.warn("Unable to instantiate StormMetricsFilter 
class: {}", clazz);
--- End diff --

Sorry, forgot to respond to this. Don't we usually crash if Storm is 
misconfigured in some way? I'm worried it will be less visible to users if we 
quietly (well, quietly-ish; there's a warning in the log) do the wrong thing, 
rather than crashing.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158538088
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
--- End diff --

Makes sense, thanks.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-22 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r158538124
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+public static TimeUnit getReportPeriodUnit(Map 
reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
--- End diff --

Thanks for explaining.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155864521
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+public static TimeUnit getReportPeriodUnit(Map 
reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
+}
+
+private static TimeUnit getTimeUnitForConfig(Map reporterConf, String 
configName) {
+String rateUnitString = 
Utils.getString(reporterConf.get(configName), null);
+if (rateUnitString != null) {
+return TimeUnit.valueOf(rateUnitString);
+}
+return null;
+}
+
+public static long getReportPeriod(Map reporterConf) {
+return Utils.getInt(reporterConf.get(REPORT_PERIOD), 
10).longValue();
+}
+
+public static StormMetricsFilter getMetricsFilter(Map reporterConf){
+StormMetricsFilter filter = null;
+Map filterConf = (Map)reporterConf.get("filter");
+if(filterConf != null) {
+String clazz = (String) filterConf.get("class");
+if (clazz != null) {
+try {
+filter = (StormMetricsFilter) 
Metrics2Utils.instantiate(clazz);
+filter.prepare(filterConf);
+} catch (Exception e) {
+LOG.warn("Unable to instantiate StormMetricsFilter 
class: {}", clazz);
--- End diff --

I could go either way... But it didn't seem right to crash just because of 
a misconfigured filter.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155863202
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
+throw new IllegalArgumentException( "Field " + name + " 
must have map entry with key: class");
+}
+if(!((Map) o).containsKey("daemons") ) {
+throw new IllegalArgumentException("Field " + name + " 
must have map entry with key: daemons");
+} else {
+// daemons can only be 'nimbus', 'supervisor', or 'worker'
+Object list = ((Map)o).get("daemons");
+if(list == null || !(list instanceof List)){
+throw new IllegalArgumentException("Field 'daemons' 
must be a non-null list.");
+}
+List daemonList = (List)list;
+for(Object string : daemonList){
+if (string instanceof String &&
+(((String) string).equals("nimbus") ||
+((String) string).equals("supervisor") 
||
+((String) string).equals("worker"))) {
+return;
+}
+throw new IllegalArgumentException("Field daemons must 
contain at least one of \"nimbus\", \"supervisor\", or \"worker\"");
+}
+
+}
+if(((Map)o).containsKey("filter")){
+Map filterMap = (Map)((Map)o).get("filter");
+SimpleTypeValidator.validateField("filter", String.class, 
filterMap.get("class"));
--- End diff --

Yes. Good catch.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155863192
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
+throw new IllegalArgumentException( "Field " + name + " 
must have map entry with key: class");
+}
+if(!((Map) o).containsKey("daemons") ) {
+throw new IllegalArgumentException("Field " + name + " 
must have map entry with key: daemons");
+} else {
+// daemons can only be 'nimbus', 'supervisor', or 'worker'
+Object list = ((Map)o).get("daemons");
+if(list == null || !(list instanceof List)){
+throw new IllegalArgumentException("Field 'daemons' 
must be a non-null list.");
+}
+List daemonList = (List)list;
+for(Object string : daemonList){
+if (string instanceof String &&
+(((String) string).equals("nimbus") ||
+((String) string).equals("supervisor") 
||
+((String) string).equals("worker"))) {
+return;
+}
+throw new IllegalArgumentException("Field daemons must 
contain at least one of \"nimbus\", \"supervisor\", or \"worker\"");
+}
+
+}
+if(((Map)o).containsKey("filter")){
+Map filterMap = (Map)((Map)o).get("filter");
+SimpleTypeValidator.validateField("filter", String.class, 
filterMap.get("class"));
+}
+SimpleTypeValidator.validateField(name, String.class, ((Map) 
o).get("class"));
--- End diff --

Not here. The `name` variable should contain "class" here.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155856433
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
+throw new IllegalArgumentException( "Field " + name + " 
must have map entry with key: class");
+}
+if(!((Map) o).containsKey("daemons") ) {
+throw new IllegalArgumentException("Field " + name + " 
must have map entry with key: daemons");
+} else {
+// daemons can only be 'nimbus', 'supervisor', or 'worker'
+Object list = ((Map)o).get("daemons");
+if(list == null || !(list instanceof List)){
+throw new IllegalArgumentException("Field 'daemons' 
must be a non-null list.");
+}
+List daemonList = (List)list;
+for(Object string : daemonList){
+if (string instanceof String &&
+(((String) string).equals("nimbus") ||
+((String) string).equals("supervisor") 
||
+((String) string).equals("worker"))) {
+return;
--- End diff --

Good catch.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155855673
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+public static TimeUnit getReportPeriodUnit(Map 
reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
--- End diff --

We can handle this in documentation. Documentation is pending the 
finalization of the approach to metrics naming, paths, metadata, etc.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155855300
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
--- End diff --

Yes, but it gives additional information about what went wrong as opposed 
to a "naked" NPE.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-08 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155854567
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
 ---
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ganglia.GangliaReporter;
+import com.codahale.metrics.MetricRegistry;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class GangliaStormReporter extends ScheduledStormReporter {
+private final static Logger LOG = 
LoggerFactory.getLogger(GangliaStormReporter.class);
+
+public static final String GANGLIA_HOST = "ganglia.host";
+public static final String GANGLIA_PORT = "ganglia.port";
+public static final String GANGLIA_PREFIXED_WITH = 
"ganglia.prefixed.with";
+public static final String GANGLIA_DMAX = "ganglia.dmax";
+public static final String GANGLIA_TMAX = "ganglia.tmax";
+public static final String GANGLIA_UDP_ADDRESSING_MODE = 
"ganglia.udp.addressing.mode";
+public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit";
+public static final String GANGLIA_DURATION_UNIT = 
"ganglia.duration.unit";
+public static final String GANGLIA_TTL = "ganglia.ttl";
+public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group";
+
+@Override
+public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map 
reporterConf) {
+LOG.debug("Preparing...");
+GangliaReporter.Builder builder = 
GangliaReporter.forRegistry(metricsRegistry);
+
+TimeUnit durationUnit = 
MetricsUtils.getMetricsDurationUnit(reporterConf);
+if (durationUnit != null) {
+builder.convertDurationsTo(durationUnit);
+}
+
+TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+if (rateUnit != null) {
+builder.convertRatesTo(rateUnit);
+}
+
+StormMetricsFilter filter = getMetricsFilter(reporterConf);
+if(filter != null){
+builder.filter(filter);
+}
+String prefix = getMetricsPrefixedWith(reporterConf);
+if (prefix != null) {
+builder.prefixedWith(prefix);
+}
+
+Integer dmax = getGangliaDMax(reporterConf);
+if (prefix != null) {
+builder.withDMax(dmax);
+}
+
+Integer tmax = getGangliaTMax(reporterConf);
+if (prefix != null) {
+builder.withTMax(tmax);
+}
+
+//defaults to 10
+reportingPeriod = getReportPeriod(reporterConf);
+
+//defaults to seconds
+reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+// Not exposed:
--- End diff --

It means the `GangliaReporter.Builder.withClock` method is not 
exposed/settable by the Storm configuration. I'll remove it.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155645499
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+public static TimeUnit getReportPeriodUnit(Map 
reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
+}
+
+private static TimeUnit getTimeUnitForConfig(Map reporterConf, String 
configName) {
+String rateUnitString = 
Utils.getString(reporterConf.get(configName), null);
+if (rateUnitString != null) {
+return TimeUnit.valueOf(rateUnitString);
+}
+return null;
+}
+
+public static long getReportPeriod(Map reporterConf) {
+return Utils.getInt(reporterConf.get(REPORT_PERIOD), 
10).longValue();
+}
+
+public static StormMetricsFilter getMetricsFilter(Map reporterConf){
+StormMetricsFilter filter = null;
+Map filterConf = (Map)reporterConf.get("filter");
+if(filterConf != null) {
+String clazz = (String) filterConf.get("class");
+if (clazz != null) {
+try {
+filter = (StormMetricsFilter) 
Metrics2Utils.instantiate(clazz);
+filter.prepare(filterConf);
+} catch (Exception e) {
+LOG.warn("Unable to instantiate StormMetricsFilter 
class: {}", clazz);
--- End diff --

Same question as the other use of Metrics2Utils, why don't we want to crash?


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155645156
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List> reporterList = (List>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+if(reporterList != null && reporterList.size() > 0) {
+for (Map reporterConfig : reporterList) {
+// only start those requested
+

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155639127
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+public static TimeUnit getReportPeriodUnit(Map 
reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
+}
+
+private static TimeUnit getTimeUnitForConfig(Map reporterConf, String 
configName) {
+String rateUnitString = 
Utils.getString(reporterConf.get(configName), null);
+if (rateUnitString != null) {
+return TimeUnit.valueOf(rateUnitString);
+}
+return null;
+}
+
+public static long getReportPeriod(Map reporterConf) {
+return Utils.getInt(reporterConf.get(REPORT_PERIOD), 
10).longValue();
--- End diff --

Same comment about noting the default somewhere visible to users.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155636033
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
 ---
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ganglia.GangliaReporter;
+import com.codahale.metrics.MetricRegistry;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class GangliaStormReporter extends ScheduledStormReporter {
+private final static Logger LOG = 
LoggerFactory.getLogger(GangliaStormReporter.class);
+
+public static final String GANGLIA_HOST = "ganglia.host";
+public static final String GANGLIA_PORT = "ganglia.port";
+public static final String GANGLIA_PREFIXED_WITH = 
"ganglia.prefixed.with";
+public static final String GANGLIA_DMAX = "ganglia.dmax";
+public static final String GANGLIA_TMAX = "ganglia.tmax";
+public static final String GANGLIA_UDP_ADDRESSING_MODE = 
"ganglia.udp.addressing.mode";
+public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit";
+public static final String GANGLIA_DURATION_UNIT = 
"ganglia.duration.unit";
+public static final String GANGLIA_TTL = "ganglia.ttl";
+public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group";
+
+@Override
+public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map 
reporterConf) {
+LOG.debug("Preparing...");
+GangliaReporter.Builder builder = 
GangliaReporter.forRegistry(metricsRegistry);
+
+TimeUnit durationUnit = 
MetricsUtils.getMetricsDurationUnit(reporterConf);
+if (durationUnit != null) {
+builder.convertDurationsTo(durationUnit);
+}
+
+TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+if (rateUnit != null) {
+builder.convertRatesTo(rateUnit);
+}
+
+StormMetricsFilter filter = getMetricsFilter(reporterConf);
+if(filter != null){
+builder.filter(filter);
+}
+String prefix = getMetricsPrefixedWith(reporterConf);
+if (prefix != null) {
+builder.prefixedWith(prefix);
+}
+
+Integer dmax = getGangliaDMax(reporterConf);
+if (prefix != null) {
+builder.withDMax(dmax);
+}
+
+Integer tmax = getGangliaTMax(reporterConf);
+if (prefix != null) {
+builder.withTMax(tmax);
+}
+
+//defaults to 10
+reportingPeriod = getReportPeriod(reporterConf);
+
+//defaults to seconds
+reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+// Not exposed:
--- End diff --

What does this comment mean?


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155642325
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -418,12 +430,19 @@ public DisruptorQueue(String queueName, ProducerType 
type, int size, long readTi
 _barrier = _buffer.newBarrier();
 _buffer.addGatingSequences(_consumer);
 _metrics = new QueueMetrics();
+_disruptorMetrics = 
StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port);
 //The batch size can be no larger than half the full queue size.
 //This is mostly to avoid contention issues.
 _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
 
 _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
 _flusher.start();
+METRICS_TIMER.schedule(new TimerTask(){
--- End diff --

Won't this leak the task if the queue is shut down?


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155644347
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
+throw new IllegalArgumentException( "Field " + name + " 
must have map entry with key: class");
+}
+if(!((Map) o).containsKey("daemons") ) {
+throw new IllegalArgumentException("Field " + name + " 
must have map entry with key: daemons");
+} else {
+// daemons can only be 'nimbus', 'supervisor', or 'worker'
+Object list = ((Map)o).get("daemons");
+if(list == null || !(list instanceof List)){
+throw new IllegalArgumentException("Field 'daemons' 
must be a non-null list.");
+}
+List daemonList = (List)list;
+for(Object string : daemonList){
+if (string instanceof String &&
+(((String) string).equals("nimbus") ||
+((String) string).equals("supervisor") 
||
+((String) string).equals("worker"))) {
+return;
+}
+throw new IllegalArgumentException("Field daemons must 
contain at least one of \"nimbus\", \"supervisor\", or \"worker\"");
+}
+
+}
+if(((Map)o).containsKey("filter")){
+Map filterMap = (Map)((Map)o).get("filter");
+SimpleTypeValidator.validateField("filter", String.class, 
filterMap.get("class"));
--- End diff --

Shouldn't the first parameter be "class"?


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155638805
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+public static TimeUnit getReportPeriodUnit(Map 
reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
--- End diff --

Does it say anywhere in the documentation that we'll default to seconds? If 
not we should add a note. It would also be good to note somewhere that the 
units users can use are the ones defined by TimeUnit.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155637510
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.util.Map;
+
+public interface StormReporter extends Reporter {
--- End diff --

Most of the implementations start some kind of client in prepare. Shouldn't 
there be a close() method on this interface to allow for closing e.g. the 
reporters?


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155631053
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java ---
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import org.apache.storm.utils.DisruptorQueue;
+
+public class DisruptorMetrics {
+private SimpleGauge capacity;
--- End diff --

Nit: These seem like they can be final


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155644602
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
+throw new IllegalArgumentException( "Field " + name + " 
must have map entry with key: class");
+}
+if(!((Map) o).containsKey("daemons") ) {
+throw new IllegalArgumentException("Field " + name + " 
must have map entry with key: daemons");
+} else {
+// daemons can only be 'nimbus', 'supervisor', or 'worker'
+Object list = ((Map)o).get("daemons");
+if(list == null || !(list instanceof List)){
+throw new IllegalArgumentException("Field 'daemons' 
must be a non-null list.");
+}
+List daemonList = (List)list;
+for(Object string : daemonList){
+if (string instanceof String &&
+(((String) string).equals("nimbus") ||
+((String) string).equals("supervisor") 
||
+((String) string).equals("worker"))) {
+return;
+}
+throw new IllegalArgumentException("Field daemons must 
contain at least one of \"nimbus\", \"supervisor\", or \"worker\"");
+}
+
+}
+if(((Map)o).containsKey("filter")){
+Map filterMap = (Map)((Map)o).get("filter");
+SimpleTypeValidator.validateField("filter", String.class, 
filterMap.get("class"));
+}
+SimpleTypeValidator.validateField(name, String.class, ((Map) 
o).get("class"));
--- End diff --

Same about the first parameter.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155640453
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.filters;
+
+import com.codahale.metrics.MetricFilter;
+
+import java.util.Map;
+
+public interface StormMetricsFilter extends MetricFilter {
+
+/**
+ * Called after the filter is instantiated.
+ * @param config an arbitrary configuration map pulled from the yaml 
configuration.
--- End diff --

Nit: I don't feel like this comment really explains what'll be in the config 
map. How about "A map of the properties from the 'filter' section of the 
reporter configuration" or something like that?


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155638180
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+protected long reportingPeriod;
+protected TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
--- End diff --

Nit: Isn't this check just renaming an NPE? I'm wondering why it's needed.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155632044
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static Counter counter(String name, WorkerTopologyContext 
context, String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.counter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = "localhost";
+try {
+hostName = Utils.localHostname();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
--- End diff --

Nit: ill -> will


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155640205
  
--- Diff: conf/defaults.yaml ---
@@ -293,3 +293,28 @@ storm.daemon.metrics.reporter.plugins:
 
 # configuration of cluster metrics consumer
 storm.cluster.metrics.consumer.publish.interval.secs: 60
+
+
+# Metrics v2 configuration (optional)
+#storm.metrics.reporters:
+#  # Graphite Reporter
+#  - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter"
+#daemons:
+#- "supervisor"
+#- "nimbus"
+#- "worker"
+#report.period: 60
+#report.period.units: "SECONDS"
+#graphite.host: "localhost"
+#graphite.port: 2003
+#
+#  # Console Reporter
+#  - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
+#daemons:
+#- "worker"
+#report.period: 10
+#report.period.units: "SECONDS"
+#
+#filter:
--- End diff --

Nit: Move this up a line. I wasn't sure this belonged to the console 
reporter at first.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155643706
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
--- End diff --

Nit: Can we put some of these strings ('class', 'daemons', etc.) in 
constants somewhere?


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155643146
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
+throw new IllegalArgumentException( "Field " + name + " 
must have map entry with key: class");
+}
+if(!((Map) o).containsKey("daemons") ) {
+throw new IllegalArgumentException("Field " + name + " 
must have map entry with key: daemons");
+} else {
+// daemons can only be 'nimbus', 'supervisor', or 'worker'
+Object list = ((Map)o).get("daemons");
+if(list == null || !(list instanceof List)){
+throw new IllegalArgumentException("Field 'daemons' 
must be a non-null list.");
+}
+List daemonList = (List)list;
+for(Object string : daemonList){
+if (string instanceof String &&
+(((String) string).equals("nimbus") ||
+((String) string).equals("supervisor") 
||
+((String) string).equals("worker"))) {
+return;
+}
+throw new IllegalArgumentException("Field daemons must 
contain at least one of \"nimbus\", \"supervisor\", or \"worker\"");
--- End diff --

Message is misleading. It's printed if the list contains anything except 
the three allowed strings.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155642731
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
+throw new IllegalArgumentException( "Field " + name + " 
must have map entry with key: class");
+}
+if(!((Map) o).containsKey("daemons") ) {
+throw new IllegalArgumentException("Field " + name + " 
must have map entry with key: daemons");
+} else {
+// daemons can only be 'nimbus', 'supervisor', or 'worker'
+Object list = ((Map)o).get("daemons");
+if(list == null || !(list instanceof List)){
--- End diff --

Nit: Redundant null check. null instanceof List is false.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155643436
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -493,6 +493,46 @@ public void validateField(String name, Object o) {
 }
 }
 
+public static class MetricReportersValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if(o == null) {
+return;
+}
+SimpleTypeValidator.validateField(name, Map.class, o);
+if(!((Map) o).containsKey("class") ) {
+throw new IllegalArgumentException( "Field " + name + " 
must have map entry with key: class");
+}
+if(!((Map) o).containsKey("daemons") ) {
+throw new IllegalArgumentException("Field " + name + " 
must have map entry with key: daemons");
+} else {
+// daemons can only be 'nimbus', 'supervisor', or 'worker'
+Object list = ((Map)o).get("daemons");
+if(list == null || !(list instanceof List)){
+throw new IllegalArgumentException("Field 'daemons' 
must be a non-null list.");
+}
+List daemonList = (List)list;
+for(Object string : daemonList){
+if (string instanceof String &&
+(((String) string).equals("nimbus") ||
+((String) string).equals("supervisor") 
||
+((String) string).equals("worker"))) {
+return;
--- End diff --

Is this return intentional? I think continue makes more sense, otherwise we 
skip the filter validation.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r155636648
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List> reporterList = (List>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+for(Map reporterConfig : reporterList){
+// only start those requested
+List daemons = 
(List)reporterConfig.get("daemons");
+

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141932720
  
--- Diff: storm-core/src/jvm/org/apache/storm/Config.java ---
@@ -139,6 +139,9 @@
 @isString
 public static final String STORM_META_SERIALIZATION_DELEGATE = 
"storm.meta.serialization.delegate";
 
+@isType(type=List.class)
--- End diff --

My question was mostly about whether we can make the config checks more 
specific instead of just a list of who knows what.

And at a minimum have some documentation about what should be stored in 
this config.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141933736
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List> reporterList = (List>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+for(Map reporterConfig : reporterList){
+// only start those requested
+List daemons = 
(List)reporterConfig.get("daemons");
+

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-29 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141924200
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java 
---
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+public class Metrics2Utils {
--- End diff --

The `Utils.newInstance()` method throws `RuntimeException` if the class 
instantiation fails. I don't think we want that in this case.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-29 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141922507
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List> reporterList = (List>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+if(reporterList != null && reporterList.size() > 0) {
+for (Map reporterConfig : reporterList) {
+// only start those requested
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141137280
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
--- End diff --

Yes it is thread-safe. Will double-check on the multiple registrations and 
update as appropriate.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141137596
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
--- End diff --

> Given that all the values are calculated from outside, isn't Gauge right 
on all fields?

Yes. I removed the comment and left it as a Gauge. Not sure why that 
comment is still showing up in the GitHub diff.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r141131796
  
--- Diff: 
examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java 
---
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+
+public class AnchoredWordCount {
+public static class RandomSentenceSpout extends BaseRichSpout {
+private static final Logger LOG = 
LoggerFactory.getLogger(RandomSentenceSpout.class);
+
+SpoutOutputCollector _collector;
+Random _rand;
--- End diff --

Absolutely (I'm personally not a big fan of the "_" convention). In general 
(outside the main branch) I try to follow the conventions that exist in the 
current source for consistency.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140987102
  
--- Diff: storm-core/src/jvm/org/apache/storm/task/TopologyContext.java ---
@@ -386,4 +388,28 @@ public ReducedMetric registerMetric(String name, 
IReducer reducer, int timeBucke
 public CombinedMetric registerMetric(String name, ICombiner combiner, 
int timeBucketSizeInSecs) {
--- End diff --

+1. Let's guide users toward the new metrics API where possible.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140971831
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
--- End diff --

nit: broken indentation, and style nit: space


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140985025
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter 
implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+long reportingPeriod;
--- End diff --

I would love to see reportingPeriod and reportingPeriodUnit `protected` 
given that these fields are used in derived classes.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140984424
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
 ---
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.graphite.GraphiteReporter;
+import com.codahale.metrics.graphite.GraphiteSender;
+import com.codahale.metrics.graphite.GraphiteUDP;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class GraphiteStormReporter extends 
ScheduledStormReporter {
+private final static Logger LOG = 
LoggerFactory.getLogger(GraphiteStormReporter.class);
+
+public static final String GRAPHITE_PREFIXED_WITH = 
"graphite.prefixed.with";
+public static final String GRAPHITE_HOST = "graphite.host";
+public static final String GRAPHITE_PORT = "graphite.port";
+public static final String GRAPHITE_TRANSPORT = "graphite.transport";
+
+@Override
+public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map 
reporterConf) {
+LOG.debug("Preparing...");
+GraphiteReporter.Builder builder = 
GraphiteReporter.forRegistry(metricsRegistry);
+
+TimeUnit durationUnit = 
MetricsUtils.getMetricsDurationUnit(reporterConf);
+if (durationUnit != null) {
+builder.convertDurationsTo(durationUnit);
+}
+
+TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+if (rateUnit != null) {
+builder.convertRatesTo(rateUnit);
+}
+
+StormMetricsFilter filter = getMetricsFilter(reporterConf);
+if(filter != null){
+builder.filter(filter);
+}
+String prefix = getMetricsPrefixedWith(reporterConf);
+if (prefix != null) {
+builder.prefixedWith(prefix);
+}
+
+//defaults to 10
+reportingPeriod = getReportPeriod(reporterConf);
+
+//defaults to seconds
+reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+// Not exposed:
+// * withClock(Clock)
+
+String host = getMetricsTargetHost(reporterConf);
+Integer port = getMetricsTargetPort(reporterConf);
+String transport = getMetricsTargetTransport(reporterConf);
+GraphiteSender sender = null;
+//TODO: error checking
--- End diff --

Let's either address the TODO or remove the line.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140985850
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter 
implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+long reportingPeriod;
+TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+static TimeUnit getReportPeriodUnit(Map reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
+}
+
+private static TimeUnit getTimeUnitForConfig(Map reporterConf, String 
configName) {
+String rateUnitString = 
Utils.getString(reporterConf.get(configName), null);
+if (rateUnitString != null) {
+return TimeUnit.valueOf(rateUnitString);
+}
+return null;
+}
+
+static long getReportPeriod(Map reporterConf) {
+return Utils.getInt(reporterConf.get(REPORT_PERIOD), 
10).longValue();
+}
+
+static StormMetricsFilter getMetricsFilter(Map reporterConf){
--- End diff --

Same here too.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140985795
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter 
implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+long reportingPeriod;
+TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+static TimeUnit getReportPeriodUnit(Map reporterConf) {
+TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+return unit == null ? TimeUnit.SECONDS : unit;
+}
+
+private static TimeUnit getTimeUnitForConfig(Map reporterConf, String 
configName) {
+String rateUnitString = 
Utils.getString(reporterConf.get(configName), null);
+if (rateUnitString != null) {
+return TimeUnit.valueOf(rateUnitString);
+}
+return null;
+}
+
+static long getReportPeriod(Map reporterConf) {
--- End diff --

Same here.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140971387
  
--- Diff: storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java 
---
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+public class Metrics2Utils {
--- End diff --

Maybe this class can be generalized. Btw, doesn't Storm already have such a 
method? It looks like a commonly used pattern.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140971053
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java ---
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import org.apache.storm.utils.DisruptorQueue;
+
+public class DisruptorMetrics {
+private SimpleGauge capacity;
+private SimpleGauge population;
+private SimpleGauge writePosition;
+private SimpleGauge readPosition;
+private SimpleGauge arrivalRate; // TODO: Change to meter
--- End diff --

Given that all the values are calculated from outside, isn't Gauge right on 
all fields?


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140972146
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
--- End diff --

Is it thread-safe? Given that the metric name doesn't include an 
executor/task ID, the same metric name may be accessed from multiple threads.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140973295
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
--- End diff --

We can replace it with `Utils.hostname()`.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140984053
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
 ---
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ganglia.GangliaReporter;
+import com.codahale.metrics.MetricRegistry;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class GangliaStormReporter extends 
ScheduledStormReporter {
+private final static Logger LOG = 
LoggerFactory.getLogger(GangliaStormReporter.class);
+
+public static final String GANGLIA_HOST = "ganglia.host";
+public static final String GANGLIA_PORT = "ganglia.port";
+public static final String GANGLIA_PREFIXED_WITH = 
"ganglia.prefixed.with";
+public static final String GANGLIA_DMAX = "ganglia.dmax";
+public static final String GANGLIA_TMAX = "ganglia.tmax";
+public static final String GANGLIA_UDP_ADDRESSING_MODE = 
"ganglia.udp.addressing.mode";
+public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit";
+public static final String GANGLIA_DURATION_UNIT = 
"ganglia.duration.unit";
+public static final String GANGLIA_TTL = "ganglia.ttl";
+public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group";
+
+@Override
+public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map 
reporterConf) {
+LOG.debug("Preparing...");
+GangliaReporter.Builder builder = 
GangliaReporter.forRegistry(metricsRegistry);
+
+TimeUnit durationUnit = 
MetricsUtils.getMetricsDurationUnit(reporterConf);
+if (durationUnit != null) {
+builder.convertDurationsTo(durationUnit);
+}
+
+TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+if (rateUnit != null) {
+builder.convertRatesTo(rateUnit);
+}
+
+StormMetricsFilter filter = getMetricsFilter(reporterConf);
+if(filter != null){
+builder.filter(filter);
+}
+String prefix = getMetricsPrefixedWith(reporterConf);
+if (prefix != null) {
+builder.prefixedWith(prefix);
+}
+
+Integer dmax = getGangliaDMax(reporterConf);
+if (prefix != null) {
+builder.withDMax(dmax);
+}
+
+Integer tmax = getGangliaTMax(reporterConf);
+if (prefix != null) {
+builder.withTMax(tmax);
+}
+
+//defaults to 10
+reportingPeriod = getReportPeriod(reporterConf);
+
+//defaults to seconds
+reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+// Not exposed:
+// * withClock(Clock)
+
+String group = getMetricsTargetUDPGroup(reporterConf);
+Integer port = getMetricsTargetPort(reporterConf);
+String udpAddressingMode = 
getMetricsTargetUDPAddressingMode(reporterConf);
+Integer ttl = getMetricsTargetTtl(reporterConf);
+
+GMetric.UDPAddressingMode mode = 
udpAddressingMode.equalsIgnoreCase("multicast") ?
+GMetric.UDPAddressingMode.MULTICAST : 
GMetric.UDPAddressingMode.UNICAST;
+
+try {
+GMetric sender = new GMetric(group, port, mode, ttl);
+reporter = builder.build(sender);
+}catch (IOException ioe){
+//TODO
--- End diff --

Let's either address the TODO or remove the line.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140984437
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
 ---
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.graphite.GraphiteReporter;
+import com.codahale.metrics.graphite.GraphiteSender;
+import com.codahale.metrics.graphite.GraphiteUDP;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class GraphiteStormReporter extends 
ScheduledStormReporter {
+private final static Logger LOG = 
LoggerFactory.getLogger(GraphiteStormReporter.class);
+
+public static final String GRAPHITE_PREFIXED_WITH = 
"graphite.prefixed.with";
+public static final String GRAPHITE_HOST = "graphite.host";
+public static final String GRAPHITE_PORT = "graphite.port";
+public static final String GRAPHITE_TRANSPORT = "graphite.transport";
+
+@Override
+public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map 
reporterConf) {
+LOG.debug("Preparing...");
+GraphiteReporter.Builder builder = 
GraphiteReporter.forRegistry(metricsRegistry);
+
+TimeUnit durationUnit = 
MetricsUtils.getMetricsDurationUnit(reporterConf);
+if (durationUnit != null) {
+builder.convertDurationsTo(durationUnit);
+}
+
+TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+if (rateUnit != null) {
+builder.convertRatesTo(rateUnit);
+}
+
+StormMetricsFilter filter = getMetricsFilter(reporterConf);
+if(filter != null){
+builder.filter(filter);
+}
+String prefix = getMetricsPrefixedWith(reporterConf);
+if (prefix != null) {
+builder.prefixedWith(prefix);
+}
+
+//defaults to 10
+reportingPeriod = getReportPeriod(reporterConf);
+
+//defaults to seconds
+reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+// Not exposed:
+// * withClock(Clock)
+
+String host = getMetricsTargetHost(reporterConf);
+Integer port = getMetricsTargetPort(reporterConf);
+String transport = getMetricsTargetTransport(reporterConf);
+GraphiteSender sender = null;
+//TODO: error checking
+if (transport.equalsIgnoreCase("udp")) {
+sender = new GraphiteUDP(host, port);
+} else {
+//TODO: pickled support
--- End diff --

Let's either address the TODO or remove the line.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140972434
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
--- End diff --

If REGISTRY is OK with registering the same metric name multiple times (that 
looks like the assumption in `meter()`), we can just make this the same as `meter()`.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140973733
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List> reporterList = (List>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+if(reporterList != null && reporterList.size() > 0) {
+for (Map reporterConfig : reporterList) {
+// only start those requested
+ 

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140982289
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List> reporterList = (List>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+for(Map reporterConfig : reporterList){
+// only start those requested
+List daemons = 
(List)reporterConfig.get("daemons");
+

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140985766
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter 
implements StormReporter{
+private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledStormReporter.class);
+protected ScheduledReporter reporter;
+long reportingPeriod;
+TimeUnit reportingPeriodUnit;
+
+@Override
+public void start() {
+if (reporter != null) {
+LOG.debug("Starting...");
+reporter.start(reportingPeriod, reportingPeriodUnit);
+} else {
+throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+}
+}
+
+@Override
+public void stop() {
+if (reporter != null) {
+LOG.debug("Stopping...");
+reporter.stop();
+} else {
+throw new IllegalStateException("Attempt to stop without 
preparing " + getClass().getSimpleName());
+}
+}
+
+
+static TimeUnit getReportPeriodUnit(Map reporterConf) {
--- End diff --

We may want to add `public` to support custom reporter implementations 
based on ScheduledStormReporter.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140967211
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 ---
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter 
implements StormReporter{
--- End diff --

Type param T does not seem to be used in the class. 
The `implements StormReporter` clause should have some type param.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-26 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r140966853
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.util.Map;
+
+public interface StormReporter {
--- End diff --

Where is type T used in this interface?


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-09-08 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r137805760
  
--- Diff: 
examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java 
---
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+
+public class AnchoredWordCount {
+public static class RandomSentenceSpout extends BaseRichSpout {
+private static final Logger LOG = 
LoggerFactory.getLogger(RandomSentenceSpout.class);
+
+SpoutOutputCollector _collector;
+Random _rand;
--- End diff --

Can we try and follow the style conventions in place in 2.x?  It would make 
the upmerge a lot simpler.


---


[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-08-13 Thread abellina
Github user abellina commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r132852928
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List> reporterList = (List>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+for(Map reporterConfig : reporterList){
+// only start those requested
+List daemons = 
(List)reporterConfig.get("daemons");
+   

[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API

2017-08-13 Thread abellina
Github user abellina commented on a diff in the pull request:

https://github.com/apache/storm/pull/2203#discussion_r132853157
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+private static final List REPORTERS = new ArrayList<>();
+
+private static String hostName = null;
+
+public static  SimpleGauge  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
+SimpleGauge gauge = new SimpleGauge<>(initialValue);
+String metricName = metricName(name, topologyId, componentId, 
port);
+if(REGISTRY.getGauges().containsKey(metricName)){
+return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+} else {
+return REGISTRY.register(metricName, gauge);
+}
+}
+
+public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
+return new DisruptorMetrics(
+StormMetricRegistry.gauge(0L, name + "-capacity", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0L, name + "-overflow", 
topologyId, componentId, port),
+StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
+);
+}
+
+public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+String metricName = metricName(name, context.getStormId(), 
componentId, context.getThisWorkerPort());
+return REGISTRY.meter(metricName);
+}
+
+public static void start(Map stormConfig, DaemonType 
type){
+String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+if(localHost != null){
+hostName = localHost;
+} else {
+try {
+hostName = 
InetAddress.getLocalHost().getCanonicalHostName();
+} catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
+}
+}
+
+LOG.info("Starting metrics reporters...");
+List> reporterList = (List>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+for(Map reporterConfig : reporterList){
+// only start those requested
+List daemons = 
(List)reporterConfig.get("daemons");
+   

  1   2   >