[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2203 ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161356664 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161326958 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161324136 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161306615 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161303474 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161301526 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161292783 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161291952 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161277608 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161277889 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161277857 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ +String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161139613 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java --- @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import org.apache.storm.task.WorkerTopologyContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class TaskMetrics { +ConcurrentMapackedByStream = new ConcurrentHashMap<>(); +ConcurrentMap failedByStream = new ConcurrentHashMap<>(); +ConcurrentMap emittedByStream = new ConcurrentHashMap<>(); +ConcurrentMap transferredByStream = new ConcurrentHashMap<>(); + +private String topologyId; +private String componentId; +private Integer taskId; +private Integer workerPort; + +public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid){ +this.topologyId = context.getStormId(); +this.componentId = componentId; +this.taskId = taskid; +this.workerPort = context.getThisWorkerPort(); +} + +public Counter getAcked(String streamId) { +Counter c = this.ackedByStream.get(streamId); +if (c == null) { +c = StormMetricRegistry.counter("acked", this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); +this.ackedByStream.put(streamId, c); +} +return c; +} + +public Counter getFailed(String streamId) { +Counter c = this.ackedByStream.get(streamId); --- End diff -- Good catch. Thanks for the fix. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161135637 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java --- @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import org.apache.storm.task.WorkerTopologyContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class TaskMetrics { +ConcurrentMapackedByStream = new ConcurrentHashMap<>(); +ConcurrentMap failedByStream = new ConcurrentHashMap<>(); +ConcurrentMap emittedByStream = new ConcurrentHashMap<>(); +ConcurrentMap transferredByStream = new ConcurrentHashMap<>(); + +private String topologyId; +private String componentId; +private Integer taskId; +private Integer workerPort; + +public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid){ +this.topologyId = context.getStormId(); +this.componentId = componentId; +this.taskId = taskid; +this.workerPort = context.getThisWorkerPort(); +} + +public Counter getAcked(String streamId) { +Counter c = this.ackedByStream.get(streamId); +if (c == null) { +c = StormMetricRegistry.counter("acked", this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); +this.ackedByStream.put(streamId, c); +} +return c; +} + +public Counter getFailed(String streamId) { +Counter c = this.ackedByStream.get(streamId); --- End diff -- missed a spot: it should be `failedByStream` ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161087710 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -265,6 +265,7 @@ :stats (mk-executor-stats <> (sampling-rate storm-conf)) :interval->task->metric-registry (HashMap.) :task->component (:task->component worker) + :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last task-ids) worker-context component-id) --- End diff -- @HeartSaVioR Nice, thanks! ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161086016 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -265,6 +265,7 @@ :stats (mk-executor-stats <> (sampling-rate storm-conf)) :interval->task->metric-registry (HashMap.) :task->component (:task->component worker) + :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last task-ids) worker-context component-id) --- End diff -- Never mind. Just fixed and pushed new commit. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r161080512 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -265,6 +265,7 @@ :stats (mk-executor-stats <> (sampling-rate storm-conf)) :interval->task->metric-registry (HashMap.) :task->component (:task->component worker) + :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last task-ids) worker-context component-id) --- End diff -- We can create TaskMetrics instance in task-data and even get rid of the map lookup. I guess TaskMetrics doesn't have any executor specific metrics. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160805811 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -446,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta + (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta --- End diff -- That makes a lot more sense now. Thanks for clarifying. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160801985 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -446,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta + (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta --- End diff -- I was thinking that the map would have just the stream id as the key, but the name of the metric could be anything. i.e. ``` public class TaskStats { ConcurrentMapackByStream = ...; TaskStats(String topoId, String componentId, int taskid, int workerPort) { ///Save these away... } public Counter getAck(String streamId) { Counter c = ackByStream.get(streamId); if (c == null) { c = StormMetricRegistry.counter("acked", topoId, componentId, taskId, streamId); ackByStream.put(streamId, c); } return c; } } ``` We could then have a map of TaskStats in the executor-data, one for each executorId the executor is over. Or like I said if you want to go crazy you could take the list of known streams (inbound and outbound depending on the stat) and populate the metrics in the constructor, but it is not necessary. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160777488 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -446,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta + (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta --- End diff -- @revans2 regarding map lookup, wouldn't we have to concatenate in order to create the lookup key? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160544887 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -446,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta + (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta --- End diff -- I think we can turn it into just a map lookup, although I don't know how much work that will be. We should know the stream names ahead of time, from the StormTopology. As such we can create all of the metrics ahead of time for each bolt/spout instance, and cache them in a map keyed by the stream id. Then when a tuple is Emitted/Acked/etc. we look up the Counter based off of the stream id and use it. No object allocation and no string concatenation involved. If we don't want to bother with parsing the StormTopology and prepopulating it, we could do lazy initialization like we are doing now. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160513274 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -446,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta + (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta --- End diff -- It's not ideal. Here's some background from earlier review: @HeartSaVioR: >> Looks like we compose metric name and lookup from REGISTRY every time, even without executor ID and stream ID. I can see more calculation should be done after addressing, but not sure how much it affects performance. If we could also memorize metric name per combination of parameters it might help, but I'm also not sure how much it will help. @ptgoetz: >Prior to adding stream ID, we could store the metric as a variable and reuse it without having to do a lookup on each use. Adding stream ID required (unless I'm missing something) doing the lookup on every use. There might be additional optimizations, but because metrics names are composed of several fields, some level of string concatenation is unavoidable. For example, we could try to optimize lookups by caching metrics with metric name as the key, but we would still have to do concatenation to create the lookup key. >I suppose we could create a MetricName class to serve as the cache key, but we'd be trading string concatenation for object creation (unless we name MetricName mutable with setX methods). ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160509152 --- Diff: storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java --- @@ -17,15 +17,49 @@ */ package org.apache.storm.nimbus; +import org.apache.storm.generated.Bolt; import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StormTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Map; public class DefaultTopologyValidator implements ITopologyValidator { +private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyValidator.class); @Override public void prepare(Map StormConf){ } @Override -public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { +public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { +if(topologyName.contains(".")){ +LOG.warn("Metrics for topology name '{}' will be reported as '{}'.", topologyName, topologyName.replace('.', '_') ); +} +Mapspouts = topology.get_spouts(); +for(String spoutName : spouts.keySet()){ +if(spoutName.contains(".")){ +LOG.warn("Metrics for spout name '{}' will be reported as '{}'.", spoutName, spoutName.replace('.', '_') ); +} +SpoutSpec spoutSpec = spouts.get(spoutName); +for(String streamName : spoutSpec.get_common().get_streams().keySet()){ +if(streamName.contains(".")){ +LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_') ); +} +} +} + +Map bolts = topology.get_bolts(); +for(String boltName : bolts.keySet()){ +if(boltName.contains(".")){ +LOG.warn("Metrics for bolt name '{}' will be reported as '{}'.", boltName, boltName.replace('.', '_') ); +} +Bolt bolt = bolts.get(boltName); +for(String streamName : bolt.get_common().get_streams().keySet()){ +if(streamName.contains(".")){ +LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_') ); +} +} +} --- End diff -- I'll give you that it is better than nothing. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160508705 --- Diff: storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java --- @@ -17,15 +17,49 @@ */ package org.apache.storm.nimbus; +import org.apache.storm.generated.Bolt; import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StormTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Map; public class DefaultTopologyValidator implements ITopologyValidator { +private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyValidator.class); @Override public void prepare(Map StormConf){ } @Override -public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { +public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { +if(topologyName.contains(".")){ +LOG.warn("Metrics for topology name '{}' will be reported as '{}'.", topologyName, topologyName.replace('.', '_') ); +} +Mapspouts = topology.get_spouts(); +for(String spoutName : spouts.keySet()){ +if(spoutName.contains(".")){ +LOG.warn("Metrics for spout name '{}' will be reported as '{}'.", spoutName, spoutName.replace('.', '_') ); +} +SpoutSpec spoutSpec = spouts.get(spoutName); +for(String streamName : spoutSpec.get_common().get_streams().keySet()){ +if(streamName.contains(".")){ +LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_') ); +} +} +} + +Map bolts = topology.get_bolts(); +for(String boltName : bolts.keySet()){ +if(boltName.contains(".")){ +LOG.warn("Metrics for bolt name '{}' will be reported as '{}'.", boltName, boltName.replace('.', '_') ); +} +Bolt bolt = bolts.get(boltName); +for(String streamName : bolt.get_common().get_streams().keySet()){ +if(streamName.contains(".")){ +LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_') ); +} +} +} --- End diff -- True. The user will have to hunt, but I suppose it's better than silence. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160506051 --- Diff: storm-core/src/jvm/org/apache/storm/task/TopologyContext.java --- @@ -386,4 +388,28 @@ public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucke public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); } + +public Timer registerTimer(String name){ +return StormMetricRegistry.registry().timer(metricName(name)); +} + +public Histogram registerHistogram(String name){ +return StormMetricRegistry.registry().histogram(metricName(name)); +} + +public Meter registerMeter(String name){ +return StormMetricRegistry.registry().meter(metricName(name)); +} + +public Counter registerCounter(String name){ +return StormMetricRegistry.registry().counter(metricName(name)); +} + +public Gauge registerGauge(String name, Gauge gauge){ +return StormMetricRegistry.registry().register(metricName(name), gauge); +} + +private String metricName(String name){ +return String.format("storm.topology.%s.%s.%s-%s", getStormId(), getThisComponentId(), getThisWorkerPort(), name); --- End diff -- Yes. Good catch. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160505473 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = "localhost"; +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname will be reported" + + " as 'localhost'."); +} + +LOG.info("Starting metrics reporters..."); +List
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160496988 --- Diff: storm-core/src/clj/org/apache/storm/daemon/executor.clj --- @@ -446,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta + (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta --- End diff -- nit: how expensive is it to get the counter? Building up the metric name does not sound ideal, and this is on the critical path. This goes for a lot of places. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160499046 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160499205 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = "localhost"; +try { +hostName = dotToUnderScore(Utils.localHostname()); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname will be reported" + + " as 'localhost'."); +} + +LOG.info("Starting metrics reporters..."); +List >
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160498091 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); --- End diff -- nit: shouldn't this move down to line 54? where it is actually used? Or perhaps it can be inlined. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160501227 --- Diff: storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java --- @@ -17,15 +17,49 @@ */ package org.apache.storm.nimbus; +import org.apache.storm.generated.Bolt; import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StormTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Map; public class DefaultTopologyValidator implements ITopologyValidator { +private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyValidator.class); @Override public void prepare(Map StormConf){ } @Override -public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { +public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { +if(topologyName.contains(".")){ +LOG.warn("Metrics for topology name '{}' will be reported as '{}'.", topologyName, topologyName.replace('.', '_') ); +} +Mapspouts = topology.get_spouts(); +for(String spoutName : spouts.keySet()){ +if(spoutName.contains(".")){ +LOG.warn("Metrics for spout name '{}' will be reported as '{}'.", spoutName, spoutName.replace('.', '_') ); +} +SpoutSpec spoutSpec = spouts.get(spoutName); +for(String streamName : spoutSpec.get_common().get_streams().keySet()){ +if(streamName.contains(".")){ +LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_') ); +} +} +} + +Map bolts = topology.get_bolts(); +for(String boltName : bolts.keySet()){ +if(boltName.contains(".")){ +LOG.warn("Metrics for bolt name '{}' will be reported as '{}'.", boltName, boltName.replace('.', '_') ); +} +Bolt bolt = bolts.get(boltName); +for(String streamName : bolt.get_common().get_streams().keySet()){ +if(streamName.contains(".")){ +LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_') ); +} +} +} --- End diff -- This is fine, but kind of useless. The warnings are going to show up in the number log, so the end user never sees them. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160498347 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ +String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = "localhost"; --- End diff -- I think this line is unused, or perhaps you actually wanted to default the hostName to "localhost" but ended up declaring a variable instead. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160501820 --- Diff: storm-core/src/jvm/org/apache/storm/task/TopologyContext.java --- @@ -386,4 +388,28 @@ public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucke public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); } + +public Timer registerTimer(String name){ +return StormMetricRegistry.registry().timer(metricName(name)); +} + +public Histogram registerHistogram(String name){ +return StormMetricRegistry.registry().histogram(metricName(name)); +} + +public Meter registerMeter(String name){ +return StormMetricRegistry.registry().meter(metricName(name)); +} + +public Counter registerCounter(String name){ +return StormMetricRegistry.registry().counter(metricName(name)); +} + +public Gauge registerGauge(String name, Gauge gauge){ +return StormMetricRegistry.registry().register(metricName(name), gauge); +} + +private String metricName(String name){ +return String.format("storm.topology.%s.%s.%s-%s", getStormId(), getThisComponentId(), getThisWorkerPort(), name); --- End diff -- Don't we need to replace the "."'s in getThidComponentId() with '_'? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r160499395 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List > reporterList = (List >)stormConfig.get(Config.STORM_METRICS_REPORTERS); +if(reporterList != null && reporterList.size() > 0) { +for (Map reporterConfig : reporterList) { +// only start those requested +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r159519245 --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --- @@ -65,7 +66,7 @@ private static final Object INTERRUPT = new Object(); private static final String PREFIX = "disruptor-"; private static final FlusherPool FLUSHER = new FlusherPool(); -private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true); +private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1); --- End diff -- You were right. Fixed. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r159515622 --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --- @@ -65,7 +66,7 @@ private static final Object INTERRUPT = new Object(); private static final String PREFIX = "disruptor-"; private static final FlusherPool FLUSHER = new FlusherPool(); -private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true); +private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1); --- End diff -- Good catch. I'll check and update as necessary. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r159514108 --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --- @@ -65,7 +66,7 @@ private static final Object INTERRUPT = new Object(); private static final String PREFIX = "disruptor-"; private static final FlusherPool FLUSHER = new FlusherPool(); -private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true); +private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1); --- End diff -- Thanks for replacing the Timer. If the threads for this still need to be daemon threads, you should use the constructor that takes a ThreadFactory, I believe the default factory produces non-daemon threads. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158555456 --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --- @@ -418,12 +430,24 @@ public DisruptorQueue(String queueName, ProducerType type, int size, long readTi _barrier = _buffer.newBarrier(); _buffer.addGatingSequences(_consumer); _metrics = new QueueMetrics(); +_disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port); //The batch size can be no larger than half the full queue size. //This is mostly to avoid contention issues. _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2)); _flusher = new Flusher(Math.max(flushInterval, 1), _queueName); _flusher.start(); +try { +METRICS_TIMER.schedule(new TimerTask() { +@Override +public void run() { +_disruptorMetrics.set(_metrics); +} +}, 15000, 15000); +} catch (IllegalStateException e){ --- End diff -- Thanks, but this seems like it could hide errors by accident. If we replace the Timer with a https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html we can check if it is shut down instead of catching this exception, and do a debug or trace log if it is, so it's obvious why scheduling isn't happening in case someone has to debug this later. I don't know why it isn't being shown in the current diff, but there's also this suggestion from earlier that doesn't appear to have a response: https://github.com/apache/storm/pull/2203/files/00a382b017c1e29863ac4d9a4449086ef79384e4#r128586571 ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158554446 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java --- @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; + +import java.util.Map; + +public interface StormReporter extends Reporter { --- End diff -- Thanks. I didn't realize that `ScheduledReporter.stop` and `ScheduledReporter.close` seem to do the same thing. I thought `stop` was more of a pause-so-we-can-resume-later thing. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158549262 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +public static TimeUnit getReportPeriodUnit(MapreporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; +} + +private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { +String rateUnitString = Utils.getString(reporterConf.get(configName), null); +if (rateUnitString != null) { +return TimeUnit.valueOf(rateUnitString); +} +return null; +} + +public static long getReportPeriod(Map reporterConf) { +return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); +} + +public static StormMetricsFilter getMetricsFilter(Map reporterConf){ +StormMetricsFilter filter = null; +Map filterConf = (Map)reporterConf.get("filter"); +if(filterConf != null) { +String clazz = (String) filterConf.get("class"); +if (clazz != null) { +try { +filter = (StormMetricsFilter) Metrics2Utils.instantiate(clazz); +filter.prepare(filterConf); +} catch (Exception e) { +LOG.warn("Unable to instantiate StormMetricsFilter class: {}", clazz); --- End diff -- Okay. I changed it to crash instead. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158549210 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { --- End diff -- Done. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158549192 --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --- @@ -418,12 +430,19 @@ public DisruptorQueue(String queueName, ProducerType type, int size, long readTi _barrier = _buffer.newBarrier(); _buffer.addGatingSequences(_consumer); _metrics = new QueueMetrics(); +_disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port); //The batch size can be no larger than half the full queue size. //This is mostly to avoid contention issues. _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2)); _flusher = new Flusher(Math.max(flushInterval, 1), _queueName); _flusher.start(); +METRICS_TIMER.schedule(new TimerTask(){ --- End diff -- Addressed in latest commit. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158545079 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java --- @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; + +import java.util.Map; + +public interface StormReporter extends Reporter { --- End diff -- If I'm interpreting your question correctly, that's what the `stop()` method is for. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158539020 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +public static TimeUnit getReportPeriodUnit(MapreporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; +} + +private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { +String rateUnitString = Utils.getString(reporterConf.get(configName), null); +if (rateUnitString != null) { +return TimeUnit.valueOf(rateUnitString); +} +return null; +} + +public static long getReportPeriod(Map reporterConf) { +return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); +} + +public static StormMetricsFilter getMetricsFilter(Map reporterConf){ +StormMetricsFilter filter = null; +Map filterConf = (Map)reporterConf.get("filter"); +if(filterConf != null) { +String clazz = (String) filterConf.get("class"); +if (clazz != null) { +try { +filter = (StormMetricsFilter) Metrics2Utils.instantiate(clazz); +filter.prepare(filterConf); +} catch (Exception e) { +LOG.warn("Unable to instantiate StormMetricsFilter class: {}", clazz); --- End diff -- Sorry, forgot to respond to this. Don't we usually crash if Storm is misconfigured in some way? I'm worried it will be less visible to users if we quietly(-ish, there's a warning in the log) do the wrong thing, rather than crashing. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158538088 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { --- End diff -- Makes sense, thanks. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r158538124 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +public static TimeUnit getReportPeriodUnit(MapreporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; --- End diff -- Thanks for explaining. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155864521 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +public static TimeUnit getReportPeriodUnit(MapreporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; +} + +private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { +String rateUnitString = Utils.getString(reporterConf.get(configName), null); +if (rateUnitString != null) { +return TimeUnit.valueOf(rateUnitString); +} +return null; +} + +public static long getReportPeriod(Map reporterConf) { +return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); +} + +public static StormMetricsFilter getMetricsFilter(Map reporterConf){ +StormMetricsFilter filter = null; +Map filterConf = (Map)reporterConf.get("filter"); +if(filterConf != null) { +String clazz = (String) filterConf.get("class"); +if (clazz != null) { +try { +filter = (StormMetricsFilter) Metrics2Utils.instantiate(clazz); +filter.prepare(filterConf); +} catch (Exception e) { +LOG.warn("Unable to instantiate StormMetricsFilter class: {}", clazz); --- End diff -- I could go either way... But it didn't seem right to crash just because of a misconfigured filter. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155863202 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { +throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); +} +if(!((Map) o).containsKey("daemons") ) { +throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); +} else { +// daemons can only be 'nimbus', 'supervisor', or 'worker' +Object list = ((Map)o).get("daemons"); +if(list == null || !(list instanceof List)){ +throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); +} +List daemonList = (List)list; +for(Object string : daemonList){ +if (string instanceof String && +(((String) string).equals("nimbus") || +((String) string).equals("supervisor") || +((String) string).equals("worker"))) { +return; +} +throw new IllegalArgumentException("Field daemons must contain at least one of \"nimbus\", \"supervisor\", or \"worker\""); +} + +} +if(((Map)o).containsKey("filter")){ +Map filterMap = (Map)((Map)o).get("filter"); +SimpleTypeValidator.validateField("filter", String.class, filterMap.get("class")); --- End diff -- Yes. Good catch. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155863192 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { +throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); +} +if(!((Map) o).containsKey("daemons") ) { +throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); +} else { +// daemons can only be 'nimbus', 'supervisor', or 'worker' +Object list = ((Map)o).get("daemons"); +if(list == null || !(list instanceof List)){ +throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); +} +List daemonList = (List)list; +for(Object string : daemonList){ +if (string instanceof String && +(((String) string).equals("nimbus") || +((String) string).equals("supervisor") || +((String) string).equals("worker"))) { +return; +} +throw new IllegalArgumentException("Field daemons must contain at least one of \"nimbus\", \"supervisor\", or \"worker\""); +} + +} +if(((Map)o).containsKey("filter")){ +Map filterMap = (Map)((Map)o).get("filter"); +SimpleTypeValidator.validateField("filter", String.class, filterMap.get("class")); +} +SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class")); --- End diff -- Not here. The `name` variable should contain "class" here. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155856433 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { +throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); +} +if(!((Map) o).containsKey("daemons") ) { +throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); +} else { +// daemons can only be 'nimbus', 'supervisor', or 'worker' +Object list = ((Map)o).get("daemons"); +if(list == null || !(list instanceof List)){ +throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); +} +List daemonList = (List)list; +for(Object string : daemonList){ +if (string instanceof String && +(((String) string).equals("nimbus") || +((String) string).equals("supervisor") || +((String) string).equals("worker"))) { +return; --- End diff -- Good catch. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155855673 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +public static TimeUnit getReportPeriodUnit(MapreporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; --- End diff -- We can handle this in documentation. Documentation is pending the finalization of the approach to metrics naming, paths, metadata, etc. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155855300 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { --- End diff -- Yes, but it gives additional information about what went wrong as opposed to a "naked" NPE. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155854567 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ganglia.GangliaReporter; +import com.codahale.metrics.MetricRegistry; +import info.ganglia.gmetric4j.gmetric.GMetric; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class GangliaStormReporter extends ScheduledStormReporter { +private final static Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class); + +public static final String GANGLIA_HOST = "ganglia.host"; +public static final String GANGLIA_PORT = "ganglia.port"; +public static final String GANGLIA_PREFIXED_WITH = "ganglia.prefixed.with"; +public static final String GANGLIA_DMAX = "ganglia.dmax"; +public static final String GANGLIA_TMAX = "ganglia.tmax"; +public static final String GANGLIA_UDP_ADDRESSING_MODE = "ganglia.udp.addressing.mode"; +public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit"; +public static final String GANGLIA_DURATION_UNIT = "ganglia.duration.unit"; +public static final String GANGLIA_TTL = "ganglia.ttl"; +public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group"; + +@Override +public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { +LOG.debug("Preparing..."); +GangliaReporter.Builder builder = GangliaReporter.forRegistry(metricsRegistry); + +TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf); +if (durationUnit != null) { +builder.convertDurationsTo(durationUnit); +} + +TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf); +if (rateUnit != null) { +builder.convertRatesTo(rateUnit); +} + +StormMetricsFilter filter = getMetricsFilter(reporterConf); +if(filter != null){ +builder.filter(filter); +} +String prefix = getMetricsPrefixedWith(reporterConf); +if (prefix != null) { +builder.prefixedWith(prefix); +} + +Integer dmax = getGangliaDMax(reporterConf); +if (prefix != null) { +builder.withDMax(dmax); +} + +Integer tmax = getGangliaTMax(reporterConf); +if (prefix != null) { +builder.withTMax(tmax); +} + +//defaults to 10 +reportingPeriod = getReportPeriod(reporterConf); + +//defaults to seconds +reportingPeriodUnit = getReportPeriodUnit(reporterConf); + +// Not exposed: --- End diff -- It means the `GangliaReporter.Builder.withClock` method is not exposed/setable by the storm configuration. I'll remove it. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155645499 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +public static TimeUnit getReportPeriodUnit(MapreporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; +} + +private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { +String rateUnitString = Utils.getString(reporterConf.get(configName), null); +if (rateUnitString != null) { +return TimeUnit.valueOf(rateUnitString); +} +return null; +} + +public static long getReportPeriod(Map reporterConf) { +return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); +} + +public static StormMetricsFilter getMetricsFilter(Map reporterConf){ +StormMetricsFilter filter = null; +Map filterConf = (Map)reporterConf.get("filter"); +if(filterConf != null) { +String clazz = (String) filterConf.get("class"); +if (clazz != null) { +try { +filter = (StormMetricsFilter) Metrics2Utils.instantiate(clazz); +filter.prepare(filterConf); +} catch (Exception e) { +LOG.warn("Unable to instantiate StormMetricsFilter class: {}", clazz); --- End diff -- Same question as the other use of Metrics2Utils, why don't we want to crash? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155645156 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List > reporterList = (List >)stormConfig.get(Config.STORM_METRICS_REPORTERS); +if(reporterList != null && reporterList.size() > 0) { +for (Map reporterConfig : reporterList) { +// only start those requested +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155639127 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +public static TimeUnit getReportPeriodUnit(MapreporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; +} + +private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { +String rateUnitString = Utils.getString(reporterConf.get(configName), null); +if (rateUnitString != null) { +return TimeUnit.valueOf(rateUnitString); +} +return null; +} + +public static long getReportPeriod(Map reporterConf) { +return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); --- End diff -- Same comment about noting the default somewhere visible to users. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155636033 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ganglia.GangliaReporter; +import com.codahale.metrics.MetricRegistry; +import info.ganglia.gmetric4j.gmetric.GMetric; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class GangliaStormReporter extends ScheduledStormReporter { +private final static Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class); + +public static final String GANGLIA_HOST = "ganglia.host"; +public static final String GANGLIA_PORT = "ganglia.port"; +public static final String GANGLIA_PREFIXED_WITH = "ganglia.prefixed.with"; +public static final String GANGLIA_DMAX = "ganglia.dmax"; +public static final String GANGLIA_TMAX = "ganglia.tmax"; +public static final String GANGLIA_UDP_ADDRESSING_MODE = "ganglia.udp.addressing.mode"; +public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit"; +public static final String GANGLIA_DURATION_UNIT = "ganglia.duration.unit"; +public static final String GANGLIA_TTL = "ganglia.ttl"; +public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group"; + +@Override +public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { +LOG.debug("Preparing..."); +GangliaReporter.Builder builder = GangliaReporter.forRegistry(metricsRegistry); + +TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf); +if (durationUnit != null) { +builder.convertDurationsTo(durationUnit); +} + +TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf); +if (rateUnit != null) { +builder.convertRatesTo(rateUnit); +} + +StormMetricsFilter filter = getMetricsFilter(reporterConf); +if(filter != null){ +builder.filter(filter); +} +String prefix = getMetricsPrefixedWith(reporterConf); +if (prefix != null) { +builder.prefixedWith(prefix); +} + +Integer dmax = getGangliaDMax(reporterConf); +if (prefix != null) { +builder.withDMax(dmax); +} + +Integer tmax = getGangliaTMax(reporterConf); +if (prefix != null) { +builder.withTMax(tmax); +} + +//defaults to 10 +reportingPeriod = getReportPeriod(reporterConf); + +//defaults to seconds +reportingPeriodUnit = getReportPeriodUnit(reporterConf); + +// Not exposed: --- End diff -- What does this comment mean? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155642325 --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --- @@ -418,12 +430,19 @@ public DisruptorQueue(String queueName, ProducerType type, int size, long readTi _barrier = _buffer.newBarrier(); _buffer.addGatingSequences(_consumer); _metrics = new QueueMetrics(); +_disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port); //The batch size can be no larger than half the full queue size. //This is mostly to avoid contention issues. _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2)); _flusher = new Flusher(Math.max(flushInterval, 1), _queueName); _flusher.start(); +METRICS_TIMER.schedule(new TimerTask(){ --- End diff -- Won't this leak the task if the queue is shut down? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155644347 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { +throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); +} +if(!((Map) o).containsKey("daemons") ) { +throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); +} else { +// daemons can only be 'nimbus', 'supervisor', or 'worker' +Object list = ((Map)o).get("daemons"); +if(list == null || !(list instanceof List)){ +throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); +} +List daemonList = (List)list; +for(Object string : daemonList){ +if (string instanceof String && +(((String) string).equals("nimbus") || +((String) string).equals("supervisor") || +((String) string).equals("worker"))) { +return; +} +throw new IllegalArgumentException("Field daemons must contain at least one of \"nimbus\", \"supervisor\", or \"worker\""); +} + +} +if(((Map)o).containsKey("filter")){ +Map filterMap = (Map)((Map)o).get("filter"); +SimpleTypeValidator.validateField("filter", String.class, filterMap.get("class")); --- End diff -- Shouldn't the first parameter be "class"? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155638805 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +public static TimeUnit getReportPeriodUnit(MapreporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; --- End diff -- Does it say anywhere in the documentation that we'll default to seconds? If not we should add a note. It would also be good to note somewhere that the units users can use are the ones defined by TimeUnit. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155637510 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java --- @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; + +import java.util.Map; + +public interface StormReporter extends Reporter { --- End diff -- Most of the implementation start some kind of client in prepare. Shouldn't there be a close() method on this interface to allow for closing e.g. the reporters? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155631053 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java --- @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import org.apache.storm.utils.DisruptorQueue; + +public class DisruptorMetrics { +private SimpleGauge capacity; --- End diff -- Nit: These seem like they can be final ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155644602 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { +throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); +} +if(!((Map) o).containsKey("daemons") ) { +throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); +} else { +// daemons can only be 'nimbus', 'supervisor', or 'worker' +Object list = ((Map)o).get("daemons"); +if(list == null || !(list instanceof List)){ +throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); +} +List daemonList = (List)list; +for(Object string : daemonList){ +if (string instanceof String && +(((String) string).equals("nimbus") || +((String) string).equals("supervisor") || +((String) string).equals("worker"))) { +return; +} +throw new IllegalArgumentException("Field daemons must contain at least one of \"nimbus\", \"supervisor\", or \"worker\""); +} + +} +if(((Map)o).containsKey("filter")){ +Map filterMap = (Map)((Map)o).get("filter"); +SimpleTypeValidator.validateField("filter", String.class, filterMap.get("class")); +} +SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class")); --- End diff -- Same about the first parameter. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155640453 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java --- @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.filters; + +import com.codahale.metrics.MetricFilter; + +import java.util.Map; + +public interface StormMetricsFilter extends MetricFilter { + +/** + * Called after the filter is instantiated. + * @param config an arbitrary configuration map pulled from the yaml configuration. --- End diff -- Nit: I don't feel like this comment really explain what'll be in the config map. How about "A map of the properties from the 'filter' section of the reporter configuration" or something like that? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155638180 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +protected long reportingPeriod; +protected TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { --- End diff -- Nit: Isn't this check just renaming an NPE? I'm wondering why it's needed. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155632044 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static Counter counter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.counter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = "localhost"; +try { +hostName = Utils.localHostname(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + --- End diff -- Nit: ill -> will ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155640205 --- Diff: conf/defaults.yaml --- @@ -293,3 +293,28 @@ storm.daemon.metrics.reporter.plugins: # configuration of cluster metrics consumer storm.cluster.metrics.consumer.publish.interval.secs: 60 + + +# Metrics v2 configuration (optional) +#storm.metrics.reporters: +# # Graphite Reporter +# - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter" +#daemons: +#- "supervisor" +#- "nimbus" +#- "worker" +#report.period: 60 +#report.period.units: "SECONDS" +#graphite.host: "localhost" +#graphite.port: 2003 +# +# # Console Reporter +# - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter" +#daemons: +#- "worker" +#report.period: 10 +#report.period.units: "SECONDS" +# +#filter: --- End diff -- Nit: Move this up a line. I wasn't sure this belonged to the console reporter at first. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155643706 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { --- End diff -- Nit: Can we put some of these strings ('class', ''daemons' etc.) in constants somewhere? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155643146 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { +throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); +} +if(!((Map) o).containsKey("daemons") ) { +throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); +} else { +// daemons can only be 'nimbus', 'supervisor', or 'worker' +Object list = ((Map)o).get("daemons"); +if(list == null || !(list instanceof List)){ +throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); +} +List daemonList = (List)list; +for(Object string : daemonList){ +if (string instanceof String && +(((String) string).equals("nimbus") || +((String) string).equals("supervisor") || +((String) string).equals("worker"))) { +return; +} +throw new IllegalArgumentException("Field daemons must contain at least one of \"nimbus\", \"supervisor\", or \"worker\""); --- End diff -- Message is misleading. It's printed if the list contains anything except the three allowed strings. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155642731 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { +throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); +} +if(!((Map) o).containsKey("daemons") ) { +throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); +} else { +// daemons can only be 'nimbus', 'supervisor', or 'worker' +Object list = ((Map)o).get("daemons"); +if(list == null || !(list instanceof List)){ --- End diff -- Nit: Redundant null check. null instanceof List is false. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155643436 --- Diff: storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -493,6 +493,46 @@ public void validateField(String name, Object o) { } } +public static class MetricReportersValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if(o == null) { +return; +} +SimpleTypeValidator.validateField(name, Map.class, o); +if(!((Map) o).containsKey("class") ) { +throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); +} +if(!((Map) o).containsKey("daemons") ) { +throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); +} else { +// daemons can only be 'nimbus', 'supervisor', or 'worker' +Object list = ((Map)o).get("daemons"); +if(list == null || !(list instanceof List)){ +throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); +} +List daemonList = (List)list; +for(Object string : daemonList){ +if (string instanceof String && +(((String) string).equals("nimbus") || +((String) string).equals("supervisor") || +((String) string).equals("worker"))) { +return; --- End diff -- Is this return intentional? I think continue makes more sense, otherwise we skip the filter validation. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r155636648 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List > reporterList = (List >)stormConfig.get(Config.STORM_METRICS_REPORTERS); +for(Map reporterConfig : reporterList){ +// only start those requested +List daemons = (List)reporterConfig.get("daemons"); +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141932720 --- Diff: storm-core/src/jvm/org/apache/storm/Config.java --- @@ -139,6 +139,9 @@ @isString public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate"; +@isType(type=List.class) --- End diff -- My question was mostly around can we make the config checks more specific instead of just a list of who knows what. And at a minimum have some documentation about what should be stored in this config. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141933736 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List > reporterList = (List >)stormConfig.get(Config.STORM_METRICS_REPORTERS); +for(Map reporterConfig : reporterList){ +// only start those requested +List daemons = (List)reporterConfig.get("daemons"); +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141924200 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java --- @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +public class Metrics2Utils { --- End diff -- The `Utils.newInstance()` method throws `RuntimeException` if the class instantiation fails. I don't think we want that in this case. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141922507 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List > reporterList = (List >)stormConfig.get(Config.STORM_METRICS_REPORTERS); +if(reporterList != null && reporterList.size() > 0) { +for (Map reporterConfig : reporterList) { +// only start those requested +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141137280 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ --- End diff -- Yes it is thread-safe. Will double-check on the multiple registrations and update as appropriate. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141137596 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ --- End diff -- > Given that all the values are calculated from outside, isn't Gauge right on all fields? Yes. I removed the comment and left it as a Guage. Not sure why that comment is still showing up in the GitHub diff. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r141131796 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java --- @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + + +public class AnchoredWordCount { +public static class RandomSentenceSpout extends BaseRichSpout { +private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); + +SpoutOutputCollector _collector; +Random _rand; --- End diff -- Absolutely (I'm personally not a big fan of the "_" convention). In general (outside the main branch) I try to follow the conventions that exist in the current source for consistency. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140987102 --- Diff: storm-core/src/jvm/org/apache/storm/task/TopologyContext.java --- @@ -386,4 +388,28 @@ public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucke public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { --- End diff -- +1 Let's guide using new metrics if possible. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140971831 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ --- End diff -- nit: broken indentation, and style nit: space ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140985025 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +long reportingPeriod; --- End diff -- I would love to see reportingPeriod and reportingPeriodUnit `protected` given that these fields are used in derived classes. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140984424 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java --- @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.graphite.GraphiteReporter; +import com.codahale.metrics.graphite.GraphiteSender; +import com.codahale.metrics.graphite.GraphiteUDP; +import com.codahale.metrics.graphite.Graphite; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class GraphiteStormReporter extends ScheduledStormReporter { +private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class); + +public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with"; +public static final String GRAPHITE_HOST = "graphite.host"; +public static final String GRAPHITE_PORT = "graphite.port"; +public static final String GRAPHITE_TRANSPORT = "graphite.transport"; + +@Override +public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { +LOG.debug("Preparing..."); +GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry); + +TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf); +if (durationUnit != null) { +builder.convertDurationsTo(durationUnit); +} + +TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf); +if (rateUnit != null) { +builder.convertRatesTo(rateUnit); +} + +StormMetricsFilter filter = getMetricsFilter(reporterConf); +if(filter != null){ +builder.filter(filter); +} +String prefix = getMetricsPrefixedWith(reporterConf); +if (prefix != null) { +builder.prefixedWith(prefix); +} + +//defaults to 10 +reportingPeriod = getReportPeriod(reporterConf); + +//defaults to seconds +reportingPeriodUnit = getReportPeriodUnit(reporterConf); + +// Not exposed: +// * withClock(Clock) + +String host = getMetricsTargetHost(reporterConf); +Integer port = getMetricsTargetPort(reporterConf); +String transport = getMetricsTargetTransport(reporterConf); +GraphiteSender sender = null; +//TODO: error checking --- End diff -- Let's either address or remove the line. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140985850 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +long reportingPeriod; +TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +static TimeUnit getReportPeriodUnit(MapreporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; +} + +private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { +String rateUnitString = Utils.getString(reporterConf.get(configName), null); +if (rateUnitString != null) { +return TimeUnit.valueOf(rateUnitString); +} +return null; +} + +static long getReportPeriod(Map reporterConf) { +return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); +} + +static StormMetricsFilter getMetricsFilter(Map reporterConf){ --- End diff -- Same here too. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140985795 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +long reportingPeriod; +TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +static TimeUnit getReportPeriodUnit(MapreporterConf) { +TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); +return unit == null ? TimeUnit.SECONDS : unit; +} + +private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { +String rateUnitString = Utils.getString(reporterConf.get(configName), null); +if (rateUnitString != null) { +return TimeUnit.valueOf(rateUnitString); +} +return null; +} + +static long getReportPeriod(Map reporterConf) { --- End diff -- Same here. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140971387 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java --- @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +public class Metrics2Utils { --- End diff -- Maybe this class can be generalized. Btw doesn't Storm have such a method? It looks like used pattern. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140971053 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java --- @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import org.apache.storm.utils.DisruptorQueue; + +public class DisruptorMetrics { +private SimpleGauge capacity; +private SimpleGauge population; +private SimpleGauge writePosition; +private SimpleGauge readPosition; +private SimpleGauge arrivalRate; // TODO: Change to meter --- End diff -- Given that all the values are calculated from outside, isn't Gauge right on all fields? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140972146 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ --- End diff -- Is it thread-safe? Given that metrics name doesn't have executor/task ID, same metrics name may be accessed with multiple threads. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140973295 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); --- End diff -- We can replace it with `Utils.hostname()`. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140984053 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java --- @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ganglia.GangliaReporter; +import com.codahale.metrics.MetricRegistry; +import info.ganglia.gmetric4j.gmetric.GMetric; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class GangliaStormReporter extends ScheduledStormReporter { +private final static Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class); + +public static final String GANGLIA_HOST = "ganglia.host"; +public static final String GANGLIA_PORT = "ganglia.port"; +public static final String GANGLIA_PREFIXED_WITH = "ganglia.prefixed.with"; +public static final String GANGLIA_DMAX = "ganglia.dmax"; +public static final String GANGLIA_TMAX = "ganglia.tmax"; +public static final String GANGLIA_UDP_ADDRESSING_MODE = "ganglia.udp.addressing.mode"; +public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit"; +public static final String GANGLIA_DURATION_UNIT = "ganglia.duration.unit"; +public static final String GANGLIA_TTL = "ganglia.ttl"; +public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group"; + +@Override +public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { +LOG.debug("Preparing..."); +GangliaReporter.Builder builder = GangliaReporter.forRegistry(metricsRegistry); + +TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf); +if (durationUnit != null) { +builder.convertDurationsTo(durationUnit); +} + +TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf); +if (rateUnit != null) { +builder.convertRatesTo(rateUnit); +} + +StormMetricsFilter filter = getMetricsFilter(reporterConf); +if(filter != null){ +builder.filter(filter); +} +String prefix = getMetricsPrefixedWith(reporterConf); +if (prefix != null) { +builder.prefixedWith(prefix); +} + +Integer dmax = getGangliaDMax(reporterConf); +if (prefix != null) { +builder.withDMax(dmax); +} + +Integer tmax = getGangliaTMax(reporterConf); +if (prefix != null) { +builder.withTMax(tmax); +} + +//defaults to 10 +reportingPeriod = getReportPeriod(reporterConf); + +//defaults to seconds +reportingPeriodUnit = getReportPeriodUnit(reporterConf); + +// Not exposed: +// * withClock(Clock) + +String group = getMetricsTargetUDPGroup(reporterConf); +Integer port = getMetricsTargetPort(reporterConf); +String udpAddressingMode = getMetricsTargetUDPAddressingMode(reporterConf); +Integer ttl = getMetricsTargetTtl(reporterConf); + +GMetric.UDPAddressingMode mode = udpAddressingMode.equalsIgnoreCase("multicast") ? +GMetric.UDPAddressingMode.MULTICAST : GMetric.UDPAddressingMode.UNICAST; + +try { +GMetric sender = new GMetric(group, port, mode, ttl); +reporter = builder.build(sender); +}catch (IOException ioe){ +//TODO --- End diff -- Let's either address or remove the line. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140984437 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java --- @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.graphite.GraphiteReporter; +import com.codahale.metrics.graphite.GraphiteSender; +import com.codahale.metrics.graphite.GraphiteUDP; +import com.codahale.metrics.graphite.Graphite; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class GraphiteStormReporter extends ScheduledStormReporter { +private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class); + +public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with"; +public static final String GRAPHITE_HOST = "graphite.host"; +public static final String GRAPHITE_PORT = "graphite.port"; +public static final String GRAPHITE_TRANSPORT = "graphite.transport"; + +@Override +public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { +LOG.debug("Preparing..."); +GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry); + +TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf); +if (durationUnit != null) { +builder.convertDurationsTo(durationUnit); +} + +TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf); +if (rateUnit != null) { +builder.convertRatesTo(rateUnit); +} + +StormMetricsFilter filter = getMetricsFilter(reporterConf); +if(filter != null){ +builder.filter(filter); +} +String prefix = getMetricsPrefixedWith(reporterConf); +if (prefix != null) { +builder.prefixedWith(prefix); +} + +//defaults to 10 +reportingPeriod = getReportPeriod(reporterConf); + +//defaults to seconds +reportingPeriodUnit = getReportPeriodUnit(reporterConf); + +// Not exposed: +// * withClock(Clock) + +String host = getMetricsTargetHost(reporterConf); +Integer port = getMetricsTargetPort(reporterConf); +String transport = getMetricsTargetTransport(reporterConf); +GraphiteSender sender = null; +//TODO: error checking +if (transport.equalsIgnoreCase("udp")) { +sender = new GraphiteUDP(host, port); +} else { +//TODO: pickled support --- End diff -- Let's either address or remove the line. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140972434 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ --- End diff -- If REGISTRY is OK with registering same metrics name multiple time (that looks like assumption of `meter()`), we can just make this same as `meter()`. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140973733 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List > reporterList = (List >)stormConfig.get(Config.STORM_METRICS_REPORTERS); +if(reporterList != null && reporterList.size() > 0) { +for (Map reporterConfig : reporterList) { +// only start those requested +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140982289 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List > reporterList = (List >)stormConfig.get(Config.STORM_METRICS_REPORTERS); +for(Map reporterConfig : reporterList){ +// only start those requested +List daemons = (List)reporterConfig.get("daemons"); +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140985766 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ +private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); +protected ScheduledReporter reporter; +long reportingPeriod; +TimeUnit reportingPeriodUnit; + +@Override +public void start() { +if (reporter != null) { +LOG.debug("Starting..."); +reporter.start(reportingPeriod, reportingPeriodUnit); +} else { +throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); +} +} + +@Override +public void stop() { +if (reporter != null) { +LOG.debug("Stopping..."); +reporter.stop(); +} else { +throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); +} +} + + +static TimeUnit getReportPeriodUnit(MapreporterConf) { --- End diff -- We may want to add `public` to support custom reporter implementations based on ScheduledStormReporter. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140967211 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java --- @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter implements StormReporter{ --- End diff -- Type param T does not seem to be used in the class. implements StormReporter - should have some type param ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140966853 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java --- @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; + +import java.util.Map; + +public interface StormReporter { --- End diff -- Where is type T used in this interface? ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r137805760 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java --- @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + + +public class AnchoredWordCount { +public static class RandomSentenceSpout extends BaseRichSpout { +private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); + +SpoutOutputCollector _collector; +Random _rand; --- End diff -- Can we try and follow the style conventions in place in 2.x? It would make the upmerge a lot simpler. ---
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user abellina commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r132852928 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List > reporterList = (List >)stormConfig.get(Config.STORM_METRICS_REPORTERS); +for(Map reporterConfig : reporterList){ +// only start those requested +List daemons = (List)reporterConfig.get("daemons"); +
[GitHub] storm pull request #2203: STORM-2153: New Metrics Reporting API
Github user abellina commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r132853157 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + +private static final MetricRegistry REGISTRY = new MetricRegistry(); + +private static final List REPORTERS = new ArrayList<>(); + +private static String hostName = null; + +public static SimpleGauge gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ +SimpleGauge gauge = new SimpleGauge<>(initialValue); +String metricName = metricName(name, topologyId, componentId, port); +if(REGISTRY.getGauges().containsKey(metricName)){ +return (SimpleGauge)REGISTRY.getGauges().get(metricName); +} else { +return REGISTRY.register(metricName, gauge); +} +} + +public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ +return new DisruptorMetrics( +StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), +StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), +StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) +); +} + +public static Meter meter(String name, WorkerTopologyContext context, String componentId){ +String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); +return REGISTRY.meter(metricName); +} + +public static void start(MapstormConfig, DaemonType type){ +String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); +if(localHost != null){ +hostName = localHost; +} else { +try { +hostName = InetAddress.getLocalHost().getCanonicalHostName(); +} catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); +} +} + +LOG.info("Starting metrics reporters..."); +List > reporterList = (List >)stormConfig.get(Config.STORM_METRICS_REPORTERS); +for(Map reporterConfig : reporterList){ +// only start those requested +List daemons = (List)reporterConfig.get("daemons"); +