[GitHub] storm pull request #2399: Track network data metrics STORM-2793
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148330545 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java --- @@ -249,6 +250,15 @@ public Object getState() { } } ret.put("enqueued", enqueued); + +// Report messageSizes metric +if (_cb instanceof IMetric) { +Object metrics = ((IMetric) _cb).getValueAndReset(); +if(metrics instanceof Map && !((Map) metrics).isEmpty()) { --- End diff -- Also fitting with what we had previously. It would be nice to allow an empty map, but not do anything on null. This would match what we do in other places working with IMetrics. ---
[GitHub] storm pull request #2399: Track network data metrics STORM-2793
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148327252 --- Diff: storm-client/src/jvm/org/apache/storm/Config.java --- @@ -451,6 +451,12 @@ public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register"; /** + * Enable tracking of network message byte counts per source-destination task --- End diff -- It would be good to add this to defaults.yaml, just so it is clear what the default is. It would also be nice to explain why it is disabled by default; at least from the code, that looks like the intention. ---
[GitHub] storm pull request #2399: Track network data metrics STORM-2793
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148328987 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java --- @@ -17,44 +17,102 @@ */ package org.apache.storm.messaging; +import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.serialization.KryoTupleDeserializer; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.serialization.KryoTupleDeserializer; +import org.apache.storm.tuple.Tuple; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + /** * A class that is called when a TaskMessage arrives. */ -public class DeserializingConnectionCallback implements IConnectionCallback { +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric { private final WorkerState.ILocalTransferCallback _cb; private final Map _conf; private final GeneralTopologyContext _context; + private final ThreadLocal _des = - new ThreadLocal() { - @Override - protected KryoTupleDeserializer initialValue() { - return new KryoTupleDeserializer(_conf, _context); - } - }; - -public DeserializingConnectionCallback(final Mapconf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { +new ThreadLocal() { +@Override +protected KryoTupleDeserializer initialValue() { +return new KryoTupleDeserializer(_conf, _context); +} +}; + +// Track serialized size of messages +private final boolean _sizeMetricsEnabled; +private final ConcurrentHashMap _byteCounts = new ConcurrentHashMap<>(); + + +public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { _conf 
= conf; _context = context; _cb = callback; +_sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean && + (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS); + } @Override public void recv(List batch) { KryoTupleDeserializer des = _des.get(); ArrayList ret = new ArrayList<>(batch.size()); for (TaskMessage message: batch) { -ret.add(new AddressedTuple(message.task(), des.deserialize(message.message(; +Tuple tuple = des.deserialize(message.message()); +AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple); +updateMetrics(tuple.getSourceTask(), message); +ret.add(addrTuple); } _cb.transfer(ret); } +@Override +public Object getValueAndReset() { +HashMap outMap = new HashMap<>(); + +if (_sizeMetricsEnabled) { // Possible race conditions +for (Map.Entry ent : _byteCounts.entrySet()) { +AtomicLong count = ent.getValue(); +if (count.get() > 0) { +outMap.put(ent.getKey(), count.getAndSet(0L)); +} +} +} +return outMap; +} + +/** + * Update serialized byte counts for each message --- End diff -- nit: please put a '.' at the end of the first sentence in the javadoc. ---
[GitHub] storm pull request #2399: Track network data metrics STORM-2793
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148329501 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java --- @@ -17,44 +17,102 @@ */ package org.apache.storm.messaging; +import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.serialization.KryoTupleDeserializer; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.serialization.KryoTupleDeserializer; +import org.apache.storm.tuple.Tuple; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + /** * A class that is called when a TaskMessage arrives. */ -public class DeserializingConnectionCallback implements IConnectionCallback { +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric { private final WorkerState.ILocalTransferCallback _cb; private final Map _conf; private final GeneralTopologyContext _context; + private final ThreadLocal _des = - new ThreadLocal() { - @Override - protected KryoTupleDeserializer initialValue() { - return new KryoTupleDeserializer(_conf, _context); - } - }; - -public DeserializingConnectionCallback(final Mapconf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { +new ThreadLocal() { +@Override +protected KryoTupleDeserializer initialValue() { +return new KryoTupleDeserializer(_conf, _context); +} +}; + +// Track serialized size of messages +private final boolean _sizeMetricsEnabled; +private final ConcurrentHashMap _byteCounts = new ConcurrentHashMap<>(); + + +public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { _conf 
= conf; _context = context; _cb = callback; +_sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean && + (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS); + } @Override public void recv(List batch) { KryoTupleDeserializer des = _des.get(); ArrayList ret = new ArrayList<>(batch.size()); for (TaskMessage message: batch) { -ret.add(new AddressedTuple(message.task(), des.deserialize(message.message(; +Tuple tuple = des.deserialize(message.message()); +AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple); +updateMetrics(tuple.getSourceTask(), message); +ret.add(addrTuple); } _cb.transfer(ret); } +@Override +public Object getValueAndReset() { +HashMap outMap = new HashMap<>(); --- End diff -- Can we return a null if metrics are not enabled instead of an empty map? A null means the metric will not be sent to the metrics collector, but an empty map might mean that the metrics are disabled or that there was no activity at all. ---
[GitHub] storm pull request #2399: Track network data metrics STORM-2793
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148330111 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java --- @@ -17,44 +17,102 @@ */ package org.apache.storm.messaging; +import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.serialization.KryoTupleDeserializer; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.serialization.KryoTupleDeserializer; +import org.apache.storm.tuple.Tuple; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + /** * A class that is called when a TaskMessage arrives. */ -public class DeserializingConnectionCallback implements IConnectionCallback { +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric { private final WorkerState.ILocalTransferCallback _cb; private final Map _conf; private final GeneralTopologyContext _context; + private final ThreadLocal _des = - new ThreadLocal() { - @Override - protected KryoTupleDeserializer initialValue() { - return new KryoTupleDeserializer(_conf, _context); - } - }; - -public DeserializingConnectionCallback(final Mapconf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { +new ThreadLocal() { +@Override +protected KryoTupleDeserializer initialValue() { +return new KryoTupleDeserializer(_conf, _context); +} +}; + +// Track serialized size of messages +private final boolean _sizeMetricsEnabled; +private final ConcurrentHashMap _byteCounts = new ConcurrentHashMap<>(); + + +public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { _conf 
= conf; _context = context; _cb = callback; +_sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean && + (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS); + } @Override public void recv(List batch) { KryoTupleDeserializer des = _des.get(); ArrayList ret = new ArrayList<>(batch.size()); for (TaskMessage message: batch) { -ret.add(new AddressedTuple(message.task(), des.deserialize(message.message(; +Tuple tuple = des.deserialize(message.message()); +AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple); +updateMetrics(tuple.getSourceTask(), message); +ret.add(addrTuple); } _cb.transfer(ret); } +@Override +public Object getValueAndReset() { +HashMap outMap = new HashMap<>(); + +if (_sizeMetricsEnabled) { // Possible race conditions +for (Map.Entry ent : _byteCounts.entrySet()) { +AtomicLong count = ent.getValue(); +if (count.get() > 0) { +outMap.put(ent.getKey(), count.getAndSet(0L)); +} +} +} +return outMap; +} + +/** + * Update serialized byte counts for each message + * @param sourceTaskId source task + * @param message serialized message + */ +protected void updateMetrics(int sourceTaskId, TaskMessage message) { +if (_sizeMetricsEnabled) { // Possible race conditions +int dest = message.task(); +int len = message.message().length; +String key = Integer.toString(sourceTaskId) + "-" + Integer.toString(dest); + +AtomicLong count = _byteCounts.get(key); --- End diff -- In Java 8 there is now a computeIfAbsent that would make this a lot simpler. ``` byteCounts.computeIfAbsent(key, k -> new AtomicLong(0)).addAndGet(len); ``` ---
[GitHub] storm pull request #2399: Track network data metrics STORM-2793
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148328743 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java --- @@ -17,44 +17,102 @@ */ package org.apache.storm.messaging; +import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.serialization.KryoTupleDeserializer; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.serialization.KryoTupleDeserializer; +import org.apache.storm.tuple.Tuple; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + /** * A class that is called when a TaskMessage arrives. */ -public class DeserializingConnectionCallback implements IConnectionCallback { +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric { private final WorkerState.ILocalTransferCallback _cb; private final Map _conf; private final GeneralTopologyContext _context; + private final ThreadLocal _des = - new ThreadLocal() { - @Override - protected KryoTupleDeserializer initialValue() { - return new KryoTupleDeserializer(_conf, _context); - } - }; - -public DeserializingConnectionCallback(final Mapconf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { +new ThreadLocal() { +@Override +protected KryoTupleDeserializer initialValue() { +return new KryoTupleDeserializer(_conf, _context); +} +}; + +// Track serialized size of messages +private final boolean _sizeMetricsEnabled; +private final ConcurrentHashMap _byteCounts = new ConcurrentHashMap<>(); + + +public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { _conf 
= conf; _context = context; _cb = callback; +_sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean && + (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS); + } @Override public void recv(List batch) { KryoTupleDeserializer des = _des.get(); ArrayList ret = new ArrayList<>(batch.size()); for (TaskMessage message: batch) { -ret.add(new AddressedTuple(message.task(), des.deserialize(message.message(; +Tuple tuple = des.deserialize(message.message()); +AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple); +updateMetrics(tuple.getSourceTask(), message); +ret.add(addrTuple); } _cb.transfer(ret); } +@Override +public Object getValueAndReset() { +HashMap outMap = new HashMap<>(); + +if (_sizeMetricsEnabled) { // Possible race conditions --- End diff -- Where is the race and what are we doing to mitigate it? The comment is a bit confusing. ---
[GitHub] storm pull request #2399: Track network data metrics STORM-2793
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148328916 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java --- @@ -17,44 +17,102 @@ */ package org.apache.storm.messaging; +import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.serialization.KryoTupleDeserializer; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.serialization.KryoTupleDeserializer; +import org.apache.storm.tuple.Tuple; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + /** * A class that is called when a TaskMessage arrives. */ -public class DeserializingConnectionCallback implements IConnectionCallback { +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric { private final WorkerState.ILocalTransferCallback _cb; private final Map _conf; private final GeneralTopologyContext _context; + private final ThreadLocal _des = - new ThreadLocal() { - @Override - protected KryoTupleDeserializer initialValue() { - return new KryoTupleDeserializer(_conf, _context); - } - }; - -public DeserializingConnectionCallback(final Mapconf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { +new ThreadLocal() { +@Override +protected KryoTupleDeserializer initialValue() { +return new KryoTupleDeserializer(_conf, _context); +} +}; + +// Track serialized size of messages +private final boolean _sizeMetricsEnabled; +private final ConcurrentHashMap _byteCounts = new ConcurrentHashMap<>(); + + +public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { _conf 
= conf; _context = context; _cb = callback; +_sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean && + (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS); + } @Override public void recv(List batch) { KryoTupleDeserializer des = _des.get(); ArrayList ret = new ArrayList<>(batch.size()); for (TaskMessage message: batch) { -ret.add(new AddressedTuple(message.task(), des.deserialize(message.message(; +Tuple tuple = des.deserialize(message.message()); +AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple); +updateMetrics(tuple.getSourceTask(), message); +ret.add(addrTuple); } _cb.transfer(ret); } +@Override +public Object getValueAndReset() { +HashMap outMap = new HashMap<>(); + +if (_sizeMetricsEnabled) { // Possible race conditions +for (Map.Entry ent : _byteCounts.entrySet()) { +AtomicLong count = ent.getValue(); +if (count.get() > 0) { +outMap.put(ent.getKey(), count.getAndSet(0L)); +} +} +} +return outMap; +} + +/** + * Update serialized byte counts for each message + * @param sourceTaskId source task + * @param message serialized message + */ +protected void updateMetrics(int sourceTaskId, TaskMessage message) { +if (_sizeMetricsEnabled) { // Possible race conditions --- End diff -- Here too, the comment is a bit confusing. ---
[GitHub] storm pull request #2399: Track network data metrics STORM-2793
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148328186 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java --- @@ -17,44 +17,102 @@ */ package org.apache.storm.messaging; +import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.serialization.KryoTupleDeserializer; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.serialization.KryoTupleDeserializer; +import org.apache.storm.tuple.Tuple; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + /** * A class that is called when a TaskMessage arrives. */ -public class DeserializingConnectionCallback implements IConnectionCallback { +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric { private final WorkerState.ILocalTransferCallback _cb; private final Map _conf; private final GeneralTopologyContext _context; + private final ThreadLocal _des = - new ThreadLocal() { - @Override - protected KryoTupleDeserializer initialValue() { - return new KryoTupleDeserializer(_conf, _context); - } - }; - -public DeserializingConnectionCallback(final Mapconf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { +new ThreadLocal() { +@Override +protected KryoTupleDeserializer initialValue() { +return new KryoTupleDeserializer(_conf, _context); +} +}; + +// Track serialized size of messages +private final boolean _sizeMetricsEnabled; +private final ConcurrentHashMap _byteCounts = new ConcurrentHashMap<>(); + + +public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { _conf 
= conf; _context = context; _cb = callback; +_sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean && --- End diff -- It might be simpler to use: ``` sizeMetricsEnabled = ObjectReader.getBoolean(_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS), false); ``` ---
[GitHub] storm pull request #2399: Track network data metrics STORM-2793
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148330213 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java --- @@ -249,6 +250,15 @@ public Object getState() { } } ret.put("enqueued", enqueued); + +// Report messageSizes metric +if (_cb instanceof IMetric) { +Object metrics = ((IMetric) _cb).getValueAndReset(); +if(metrics instanceof Map && !((Map) metrics).isEmpty()) { --- End diff -- nit: space after the if ---
[GitHub] storm pull request #2399: Track network data metrics STORM-2793
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148327705 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java --- @@ -17,44 +17,102 @@ */ package org.apache.storm.messaging; +import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.serialization.KryoTupleDeserializer; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.serialization.KryoTupleDeserializer; +import org.apache.storm.tuple.Tuple; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + /** * A class that is called when a TaskMessage arrives. */ -public class DeserializingConnectionCallback implements IConnectionCallback { +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric { private final WorkerState.ILocalTransferCallback _cb; private final Map _conf; private final GeneralTopologyContext _context; + private final ThreadLocal _des = - new ThreadLocal() { - @Override - protected KryoTupleDeserializer initialValue() { - return new KryoTupleDeserializer(_conf, _context); - } - }; - -public DeserializingConnectionCallback(final Mapconf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { +new ThreadLocal() { +@Override +protected KryoTupleDeserializer initialValue() { +return new KryoTupleDeserializer(_conf, _context); +} +}; + +// Track serialized size of messages +private final boolean _sizeMetricsEnabled; +private final ConcurrentHashMap _byteCounts = new ConcurrentHashMap<>(); --- End diff -- I know this fits the convention we have been using for member variables in this file, but it would be good to follow the new style 
guides and remove the '_'. If you want to update the other member variables in the file too that would be great. ---
[GitHub] storm pull request #2399: Track network data metrics STORM-2793
Github user jmartell7 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148328791 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java --- @@ -17,44 +17,102 @@ */ package org.apache.storm.messaging; +import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.serialization.KryoTupleDeserializer; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.serialization.KryoTupleDeserializer; +import org.apache.storm.tuple.Tuple; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + /** * A class that is called when a TaskMessage arrives. */ -public class DeserializingConnectionCallback implements IConnectionCallback { +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric { private final WorkerState.ILocalTransferCallback _cb; private final Map _conf; private final GeneralTopologyContext _context; + private final ThreadLocal _des = - new ThreadLocal() { - @Override - protected KryoTupleDeserializer initialValue() { - return new KryoTupleDeserializer(_conf, _context); - } - }; - -public DeserializingConnectionCallback(final Mapconf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { +new ThreadLocal() { +@Override +protected KryoTupleDeserializer initialValue() { +return new KryoTupleDeserializer(_conf, _context); +} +}; + +// Track serialized size of messages +private final boolean _sizeMetricsEnabled; +private final ConcurrentHashMap _byteCounts = new ConcurrentHashMap<>(); + + +public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { 
_conf = conf; _context = context; _cb = callback; +_sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean && + (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS); + } @Override public void recv(List batch) { KryoTupleDeserializer des = _des.get(); ArrayList ret = new ArrayList<>(batch.size()); for (TaskMessage message: batch) { -ret.add(new AddressedTuple(message.task(), des.deserialize(message.message(; +Tuple tuple = des.deserialize(message.message()); +AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple); +updateMetrics(tuple.getSourceTask(), message); +ret.add(addrTuple); } _cb.transfer(ret); } +@Override +public Object getValueAndReset() { +HashMap outMap = new HashMap<>(); + +if (_sizeMetricsEnabled) { // Possible race conditions +for (Map.Entry ent : _byteCounts.entrySet()) { +AtomicLong count = ent.getValue(); +if (count.get() > 0) { +outMap.put(ent.getKey(), count.getAndSet(0L)); +} +} +} +return outMap; +} + +/** + * Update serialized byte counts for each message + * @param sourceTaskId source task + * @param message serialized message + */ +protected void updateMetrics(int sourceTaskId, TaskMessage message) { +if (_sizeMetricsEnabled) { // Possible race conditions +int dest = message.task(); +int len = message.message().length; +String key = Integer.toString(sourceTaskId) + "-" + Integer.toString(dest); --- End diff -- Should this use the ComponentId instead? ---