[GitHub] storm pull request #2399: Track network data metrics STORM-2793

2017-11-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2399#discussion_r148330545
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java 
---
@@ -249,6 +250,15 @@ public Object getState() {
 }
 }
 ret.put("enqueued", enqueued);
+
+// Report messageSizes metric
+if (_cb instanceof IMetric) {
+Object metrics = ((IMetric) _cb).getValueAndReset();
+if(metrics instanceof Map && !((Map) metrics).isEmpty()) {
--- End diff --

This also fits with what we had previously.  It would be nice to allow an 
empty map but skip reporting entirely when the value is null.  That would match 
what we do in other places that work with IMetrics.


---


[GitHub] storm pull request #2399: Track network data metrics STORM-2793

2017-11-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2399#discussion_r148327252
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -451,6 +451,12 @@
 public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = 
"topology.metrics.consumer.register";
 
 /**
+ * Enable tracking of network message byte counts per 
source-destination task
--- End diff --

It would be good to add this to defaults.yaml so that it is clear what the 
default is.  It would also be nice to explain why it is disabled by default; 
at least from the code, that appears to be the intention.


---


[GitHub] storm pull request #2399: Track network data metrics STORM-2793

2017-11-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2399#discussion_r148328987
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
 ---
@@ -17,44 +17,102 @@
  */
 package org.apache.storm.messaging;
 
+import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.serialization.KryoTupleDeserializer;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
  * A class that is called when a TaskMessage arrives.
  */
-public class DeserializingConnectionCallback implements 
IConnectionCallback {
+public class DeserializingConnectionCallback implements 
IConnectionCallback, IMetric {
 private final WorkerState.ILocalTransferCallback _cb;
 private final Map _conf;
 private final GeneralTopologyContext _context;
+
 private final ThreadLocal _des =
- new ThreadLocal() {
- @Override
- protected KryoTupleDeserializer initialValue() {
- return new KryoTupleDeserializer(_conf, _context);
- }
- };
-
-public DeserializingConnectionCallback(final Map conf, 
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback 
callback) {
+new ThreadLocal() {
+@Override
+protected KryoTupleDeserializer initialValue() {
+return new KryoTupleDeserializer(_conf, _context);
+}
+};
+
+// Track serialized size of messages
+private final boolean _sizeMetricsEnabled;
+private final ConcurrentHashMap _byteCounts = new 
ConcurrentHashMap<>();
+
+
+public DeserializingConnectionCallback(final Map conf, final 
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
 _conf = conf;
 _context = context;
 _cb = callback;
+_sizeMetricsEnabled = 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
+  (Boolean) 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
+
 }
 
 @Override
 public void recv(List batch) {
 KryoTupleDeserializer des = _des.get();
 ArrayList ret = new ArrayList<>(batch.size());
 for (TaskMessage message: batch) {
-ret.add(new AddressedTuple(message.task(), 
des.deserialize(message.message(;
+Tuple tuple = des.deserialize(message.message());
+AddressedTuple addrTuple = new AddressedTuple(message.task(), 
tuple);
+updateMetrics(tuple.getSourceTask(), message);
+ret.add(addrTuple);
 }
 _cb.transfer(ret);
 }
 
+@Override
+public Object getValueAndReset() {
+HashMap outMap = new HashMap<>();
+
+if (_sizeMetricsEnabled) { // Possible race conditions
+for (Map.Entry ent : 
_byteCounts.entrySet()) {
+AtomicLong count = ent.getValue();
+if (count.get() > 0) {
+outMap.put(ent.getKey(), count.getAndSet(0L));
+}
+}
+}
+return outMap;
+}
+
+/**
+ * Update serialized byte counts for each message
--- End diff --

nit: please put a '.' at the end of the first sentence in the javadoc.


---


[GitHub] storm pull request #2399: Track network data metrics STORM-2793

2017-11-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2399#discussion_r148329501
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
 ---
@@ -17,44 +17,102 @@
  */
 package org.apache.storm.messaging;
 
+import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.serialization.KryoTupleDeserializer;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
  * A class that is called when a TaskMessage arrives.
  */
-public class DeserializingConnectionCallback implements 
IConnectionCallback {
+public class DeserializingConnectionCallback implements 
IConnectionCallback, IMetric {
 private final WorkerState.ILocalTransferCallback _cb;
 private final Map _conf;
 private final GeneralTopologyContext _context;
+
 private final ThreadLocal _des =
- new ThreadLocal() {
- @Override
- protected KryoTupleDeserializer initialValue() {
- return new KryoTupleDeserializer(_conf, _context);
- }
- };
-
-public DeserializingConnectionCallback(final Map conf, 
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback 
callback) {
+new ThreadLocal() {
+@Override
+protected KryoTupleDeserializer initialValue() {
+return new KryoTupleDeserializer(_conf, _context);
+}
+};
+
+// Track serialized size of messages
+private final boolean _sizeMetricsEnabled;
+private final ConcurrentHashMap _byteCounts = new 
ConcurrentHashMap<>();
+
+
+public DeserializingConnectionCallback(final Map conf, final 
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
 _conf = conf;
 _context = context;
 _cb = callback;
+_sizeMetricsEnabled = 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
+  (Boolean) 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
+
 }
 
 @Override
 public void recv(List batch) {
 KryoTupleDeserializer des = _des.get();
 ArrayList ret = new ArrayList<>(batch.size());
 for (TaskMessage message: batch) {
-ret.add(new AddressedTuple(message.task(), 
des.deserialize(message.message(;
+Tuple tuple = des.deserialize(message.message());
+AddressedTuple addrTuple = new AddressedTuple(message.task(), 
tuple);
+updateMetrics(tuple.getSourceTask(), message);
+ret.add(addrTuple);
 }
 _cb.transfer(ret);
 }
 
+@Override
+public Object getValueAndReset() {
+HashMap outMap = new HashMap<>();
--- End diff --

Can we return null when metrics are not enabled, instead of an empty map?  
A null value means the metric will not be sent to the metrics collector, whereas 
an empty map is ambiguous: it could mean either that the metrics are disabled or 
that there was no activity at all.


---


[GitHub] storm pull request #2399: Track network data metrics STORM-2793

2017-11-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2399#discussion_r148330111
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
 ---
@@ -17,44 +17,102 @@
  */
 package org.apache.storm.messaging;
 
+import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.serialization.KryoTupleDeserializer;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
  * A class that is called when a TaskMessage arrives.
  */
-public class DeserializingConnectionCallback implements 
IConnectionCallback {
+public class DeserializingConnectionCallback implements 
IConnectionCallback, IMetric {
 private final WorkerState.ILocalTransferCallback _cb;
 private final Map _conf;
 private final GeneralTopologyContext _context;
+
 private final ThreadLocal _des =
- new ThreadLocal() {
- @Override
- protected KryoTupleDeserializer initialValue() {
- return new KryoTupleDeserializer(_conf, _context);
- }
- };
-
-public DeserializingConnectionCallback(final Map conf, 
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback 
callback) {
+new ThreadLocal() {
+@Override
+protected KryoTupleDeserializer initialValue() {
+return new KryoTupleDeserializer(_conf, _context);
+}
+};
+
+// Track serialized size of messages
+private final boolean _sizeMetricsEnabled;
+private final ConcurrentHashMap _byteCounts = new 
ConcurrentHashMap<>();
+
+
+public DeserializingConnectionCallback(final Map conf, final 
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
 _conf = conf;
 _context = context;
 _cb = callback;
+_sizeMetricsEnabled = 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
+  (Boolean) 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
+
 }
 
 @Override
 public void recv(List batch) {
 KryoTupleDeserializer des = _des.get();
 ArrayList ret = new ArrayList<>(batch.size());
 for (TaskMessage message: batch) {
-ret.add(new AddressedTuple(message.task(), 
des.deserialize(message.message(;
+Tuple tuple = des.deserialize(message.message());
+AddressedTuple addrTuple = new AddressedTuple(message.task(), 
tuple);
+updateMetrics(tuple.getSourceTask(), message);
+ret.add(addrTuple);
 }
 _cb.transfer(ret);
 }
 
+@Override
+public Object getValueAndReset() {
+HashMap outMap = new HashMap<>();
+
+if (_sizeMetricsEnabled) { // Possible race conditions
+for (Map.Entry ent : 
_byteCounts.entrySet()) {
+AtomicLong count = ent.getValue();
+if (count.get() > 0) {
+outMap.put(ent.getKey(), count.getAndSet(0L));
+}
+}
+}
+return outMap;
+}
+
+/**
+ * Update serialized byte counts for each message
+ * @param sourceTaskId source task
+ * @param message serialized message
+ */
+protected void updateMetrics(int sourceTaskId, TaskMessage message) {
+if (_sizeMetricsEnabled) { // Possible race conditions
+int dest = message.task();
+int len = message.message().length;
+String key = Integer.toString(sourceTaskId) + "-" + 
Integer.toString(dest);
+
+AtomicLong count = _byteCounts.get(key);
--- End diff --

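Java 8 added `ConcurrentHashMap.computeIfAbsent`, which would make this a lot 
simpler.  Note that the lambda parameter needs a name other than `key`, because a 
lambda parameter cannot redeclare a local variable that is already in scope.

```
byteCounts.computeIfAbsent(key, k -> new AtomicLong(0)).addAndGet(len);
```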


---


[GitHub] storm pull request #2399: Track network data metrics STORM-2793

2017-11-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2399#discussion_r148328743
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
 ---
@@ -17,44 +17,102 @@
  */
 package org.apache.storm.messaging;
 
+import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.serialization.KryoTupleDeserializer;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
  * A class that is called when a TaskMessage arrives.
  */
-public class DeserializingConnectionCallback implements 
IConnectionCallback {
+public class DeserializingConnectionCallback implements 
IConnectionCallback, IMetric {
 private final WorkerState.ILocalTransferCallback _cb;
 private final Map _conf;
 private final GeneralTopologyContext _context;
+
 private final ThreadLocal _des =
- new ThreadLocal() {
- @Override
- protected KryoTupleDeserializer initialValue() {
- return new KryoTupleDeserializer(_conf, _context);
- }
- };
-
-public DeserializingConnectionCallback(final Map conf, 
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback 
callback) {
+new ThreadLocal() {
+@Override
+protected KryoTupleDeserializer initialValue() {
+return new KryoTupleDeserializer(_conf, _context);
+}
+};
+
+// Track serialized size of messages
+private final boolean _sizeMetricsEnabled;
+private final ConcurrentHashMap _byteCounts = new 
ConcurrentHashMap<>();
+
+
+public DeserializingConnectionCallback(final Map conf, final 
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
 _conf = conf;
 _context = context;
 _cb = callback;
+_sizeMetricsEnabled = 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
+  (Boolean) 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
+
 }
 
 @Override
 public void recv(List batch) {
 KryoTupleDeserializer des = _des.get();
 ArrayList ret = new ArrayList<>(batch.size());
 for (TaskMessage message: batch) {
-ret.add(new AddressedTuple(message.task(), 
des.deserialize(message.message(;
+Tuple tuple = des.deserialize(message.message());
+AddressedTuple addrTuple = new AddressedTuple(message.task(), 
tuple);
+updateMetrics(tuple.getSourceTask(), message);
+ret.add(addrTuple);
 }
 _cb.transfer(ret);
 }
 
+@Override
+public Object getValueAndReset() {
+HashMap outMap = new HashMap<>();
+
+if (_sizeMetricsEnabled) { // Possible race conditions
--- End diff --

Where is the race condition, and what are we doing to mitigate it?  The 
"Possible race conditions" comment is a bit confusing.


---


[GitHub] storm pull request #2399: Track network data metrics STORM-2793

2017-11-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2399#discussion_r148328916
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
 ---
@@ -17,44 +17,102 @@
  */
 package org.apache.storm.messaging;
 
+import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.serialization.KryoTupleDeserializer;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
  * A class that is called when a TaskMessage arrives.
  */
-public class DeserializingConnectionCallback implements 
IConnectionCallback {
+public class DeserializingConnectionCallback implements 
IConnectionCallback, IMetric {
 private final WorkerState.ILocalTransferCallback _cb;
 private final Map _conf;
 private final GeneralTopologyContext _context;
+
 private final ThreadLocal _des =
- new ThreadLocal() {
- @Override
- protected KryoTupleDeserializer initialValue() {
- return new KryoTupleDeserializer(_conf, _context);
- }
- };
-
-public DeserializingConnectionCallback(final Map conf, 
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback 
callback) {
+new ThreadLocal() {
+@Override
+protected KryoTupleDeserializer initialValue() {
+return new KryoTupleDeserializer(_conf, _context);
+}
+};
+
+// Track serialized size of messages
+private final boolean _sizeMetricsEnabled;
+private final ConcurrentHashMap _byteCounts = new 
ConcurrentHashMap<>();
+
+
+public DeserializingConnectionCallback(final Map conf, final 
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
 _conf = conf;
 _context = context;
 _cb = callback;
+_sizeMetricsEnabled = 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
+  (Boolean) 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
+
 }
 
 @Override
 public void recv(List batch) {
 KryoTupleDeserializer des = _des.get();
 ArrayList ret = new ArrayList<>(batch.size());
 for (TaskMessage message: batch) {
-ret.add(new AddressedTuple(message.task(), 
des.deserialize(message.message(;
+Tuple tuple = des.deserialize(message.message());
+AddressedTuple addrTuple = new AddressedTuple(message.task(), 
tuple);
+updateMetrics(tuple.getSourceTask(), message);
+ret.add(addrTuple);
 }
 _cb.transfer(ret);
 }
 
+@Override
+public Object getValueAndReset() {
+HashMap outMap = new HashMap<>();
+
+if (_sizeMetricsEnabled) { // Possible race conditions
+for (Map.Entry ent : 
_byteCounts.entrySet()) {
+AtomicLong count = ent.getValue();
+if (count.get() > 0) {
+outMap.put(ent.getKey(), count.getAndSet(0L));
+}
+}
+}
+return outMap;
+}
+
+/**
+ * Update serialized byte counts for each message
+ * @param sourceTaskId source task
+ * @param message serialized message
+ */
+protected void updateMetrics(int sourceTaskId, TaskMessage message) {
+if (_sizeMetricsEnabled) { // Possible race conditions
--- End diff --

Here too, the "Possible race conditions" comment is a bit confusing.


---


[GitHub] storm pull request #2399: Track network data metrics STORM-2793

2017-11-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2399#discussion_r148328186
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
 ---
@@ -17,44 +17,102 @@
  */
 package org.apache.storm.messaging;
 
+import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.serialization.KryoTupleDeserializer;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
  * A class that is called when a TaskMessage arrives.
  */
-public class DeserializingConnectionCallback implements 
IConnectionCallback {
+public class DeserializingConnectionCallback implements 
IConnectionCallback, IMetric {
 private final WorkerState.ILocalTransferCallback _cb;
 private final Map _conf;
 private final GeneralTopologyContext _context;
+
 private final ThreadLocal _des =
- new ThreadLocal() {
- @Override
- protected KryoTupleDeserializer initialValue() {
- return new KryoTupleDeserializer(_conf, _context);
- }
- };
-
-public DeserializingConnectionCallback(final Map conf, 
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback 
callback) {
+new ThreadLocal() {
+@Override
+protected KryoTupleDeserializer initialValue() {
+return new KryoTupleDeserializer(_conf, _context);
+}
+};
+
+// Track serialized size of messages
+private final boolean _sizeMetricsEnabled;
+private final ConcurrentHashMap _byteCounts = new 
ConcurrentHashMap<>();
+
+
+public DeserializingConnectionCallback(final Map conf, final 
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
 _conf = conf;
 _context = context;
 _cb = callback;
+_sizeMetricsEnabled = 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
--- End diff --

It might be simpler to use `ObjectReader.getBoolean`:

```
sizeMetricsEnabled = 
ObjectReader.getBoolean(_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS),
 false);
```


---


[GitHub] storm pull request #2399: Track network data metrics STORM-2793

2017-11-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2399#discussion_r148330213
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java 
---
@@ -249,6 +250,15 @@ public Object getState() {
 }
 }
 ret.put("enqueued", enqueued);
+
+// Report messageSizes metric
+if (_cb instanceof IMetric) {
+Object metrics = ((IMetric) _cb).getValueAndReset();
+if(metrics instanceof Map && !((Map) metrics).isEmpty()) {
--- End diff --

nit: please add a space after the `if`.


---


[GitHub] storm pull request #2399: Track network data metrics STORM-2793

2017-11-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2399#discussion_r148327705
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
 ---
@@ -17,44 +17,102 @@
  */
 package org.apache.storm.messaging;
 
+import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.serialization.KryoTupleDeserializer;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
  * A class that is called when a TaskMessage arrives.
  */
-public class DeserializingConnectionCallback implements 
IConnectionCallback {
+public class DeserializingConnectionCallback implements 
IConnectionCallback, IMetric {
 private final WorkerState.ILocalTransferCallback _cb;
 private final Map _conf;
 private final GeneralTopologyContext _context;
+
 private final ThreadLocal _des =
- new ThreadLocal() {
- @Override
- protected KryoTupleDeserializer initialValue() {
- return new KryoTupleDeserializer(_conf, _context);
- }
- };
-
-public DeserializingConnectionCallback(final Map conf, 
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback 
callback) {
+new ThreadLocal() {
+@Override
+protected KryoTupleDeserializer initialValue() {
+return new KryoTupleDeserializer(_conf, _context);
+}
+};
+
+// Track serialized size of messages
+private final boolean _sizeMetricsEnabled;
+private final ConcurrentHashMap _byteCounts = new 
ConcurrentHashMap<>();
--- End diff --

I know this fits the convention we have been using for member variables in 
this file, but it would be good to follow the new style guide and drop the 
leading '_'.  If you want to update the other member variables in the file too, 
that would be great.


---


[GitHub] storm pull request #2399: Track network data metrics STORM-2793

2017-11-01 Thread jmartell7
Github user jmartell7 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2399#discussion_r148328791
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
 ---
@@ -17,44 +17,102 @@
  */
 package org.apache.storm.messaging;
 
+import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.serialization.KryoTupleDeserializer;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
  * A class that is called when a TaskMessage arrives.
  */
-public class DeserializingConnectionCallback implements 
IConnectionCallback {
+public class DeserializingConnectionCallback implements 
IConnectionCallback, IMetric {
 private final WorkerState.ILocalTransferCallback _cb;
 private final Map _conf;
 private final GeneralTopologyContext _context;
+
 private final ThreadLocal _des =
- new ThreadLocal() {
- @Override
- protected KryoTupleDeserializer initialValue() {
- return new KryoTupleDeserializer(_conf, _context);
- }
- };
-
-public DeserializingConnectionCallback(final Map conf, 
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback 
callback) {
+new ThreadLocal() {
+@Override
+protected KryoTupleDeserializer initialValue() {
+return new KryoTupleDeserializer(_conf, _context);
+}
+};
+
+// Track serialized size of messages
+private final boolean _sizeMetricsEnabled;
+private final ConcurrentHashMap _byteCounts = new 
ConcurrentHashMap<>();
+
+
+public DeserializingConnectionCallback(final Map conf, final 
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
 _conf = conf;
 _context = context;
 _cb = callback;
+_sizeMetricsEnabled = 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
+  (Boolean) 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
+
 }
 
 @Override
 public void recv(List batch) {
 KryoTupleDeserializer des = _des.get();
 ArrayList ret = new ArrayList<>(batch.size());
 for (TaskMessage message: batch) {
-ret.add(new AddressedTuple(message.task(), 
des.deserialize(message.message(;
+Tuple tuple = des.deserialize(message.message());
+AddressedTuple addrTuple = new AddressedTuple(message.task(), 
tuple);
+updateMetrics(tuple.getSourceTask(), message);
+ret.add(addrTuple);
 }
 _cb.transfer(ret);
 }
 
+@Override
+public Object getValueAndReset() {
+HashMap outMap = new HashMap<>();
+
+if (_sizeMetricsEnabled) { // Possible race conditions
+for (Map.Entry ent : 
_byteCounts.entrySet()) {
+AtomicLong count = ent.getValue();
+if (count.get() > 0) {
+outMap.put(ent.getKey(), count.getAndSet(0L));
+}
+}
+}
+return outMap;
+}
+
+/**
+ * Update serialized byte counts for each message
+ * @param sourceTaskId source task
+ * @param message serialized message
+ */
+protected void updateMetrics(int sourceTaskId, TaskMessage message) {
+if (_sizeMetricsEnabled) { // Possible race conditions
+int dest = message.task();
+int len = message.message().length;
+String key = Integer.toString(sourceTaskId) + "-" + 
Integer.toString(dest);
--- End diff --

Should this key use the component ID instead of the source and destination task IDs?


---