When you say "4 executors per bolt", are you referring to the parallelism_hint parameter that you set in the setBolt() method call? If so, what that does is it creates 4 instances of the same Bolt and runs them in parallel via 4 executors (threads), so it still is 1 bolt instance / thread but a total of 4 instances running in parallel. For this reason, the latency of a given bolt does not change whether we are running 1 instance of 4 instances in parallel, as the same logic gets executed. However the throughput gets increased because now there are 4 instances running in parallel.
I think the way you are measuring latency is not accurate, and it is not a good way to measure in general. Tick tuples go into the same queue as the rest of the messages waiting for the bolt to process, so if there is a burst of messages within a second, the tick tuple will sit behind that burst and be delayed. There is no guarantee that tick tuples are executed by the bolt at exactly the intervals specified in the config. For the same reason, your bolt could be processing more or fewer messages depending on the input rate, so the measured latency can swing either way, because in your code the latency is not measured over a fixed set of messages.

But assuming a fixed rate of messages, why does the latency per thread increase as you increase the number of threads? My guess is that with 1 bolt instance the spout and the bolt could have been placed in the same worker process, while with 4 instances they could have been distributed across workers (assuming you have more workers), so network time gets added as well. In your code you are really measuring the extra time between two ticks, which includes the network time of the messages plus the processing time of the bolt.

Why don't you just refer to the Storm UI metrics? The processing latency and the capacity of each bolt are available in the Storm UI with no extra code.
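If you do want to measure latency in code, a more direct approach than timing tick tuples is to stamp each tuple with its emit time at the spout and take the difference in the bolt. A rough sketch is below; the "emitTs" field name and the statistics sink are placeholders (the sink only stands in for whatever metric you already update), and the hosts' clocks need to be reasonably synchronized for the numbers to mean anything:

    // Spout side: declare and emit an extra timestamp field, e.g.
    //   declarer.declare(new Fields("payload", "emitTs"));
    collector.emit(new Values(payload, System.currentTimeMillis()));

    // Bolt side: per-tuple latency = now - emit time
    // (this includes queueing and network time upstream of the bolt)
    @Override
    public void execute(Tuple tuple) {
        long emitTs = tuple.getLongByField("emitTs");
        long latencyMs = System.currentTimeMillis() - emitTs;
        statistics.updateWindowLatency(latencyMs);   // or whatever metric you already maintain
        // ... rest of the processing ...
        collector.ack(tuple);
    }

That gives you a latency number per tuple (or per window, if you aggregate), instead of a number that mixes tick-tuple queueing delay with everything else.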
On Thu, Sep 3, 2015 at 1:51 PM, Nick R. Katsipoulakis <[email protected]> wrote:
> Hello all,
>
> I have been working on a project for some time now, and I want to share a
> phenomenon I came across in my experiments, which I cannot explain.
>
> My project deals with scalability of a specific stream processing
> algorithm, and I want to see how scalable my approach is when I add more
> executors per bolt to my topology (more threads, essentially). I measure
> scalability through the latency of each operator (bolt) in my topology. To
> measure latency, I try to estimate it through the tick-tuple functionality
> that Storm provides. My bolts are configured to receive tick tuples every
> TICK_TUPLE_FREQ_SEC seconds, by using the following function:
>
>     @Override
>     public Map<String, Object> getComponentConfiguration() {
>         Config conf = new Config();
>         conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
>                 SynefoJoinBolt.TICK_TUPLE_FREQ_SEC);
>         return conf;
>     }
>
> So, every TICK_TUPLE_FREQ_SEC seconds, each bolt receives a tick tuple.
> I recognize tick tuples in my execute() function using the following
> method:
>
>     private boolean isTickTuple(Tuple tuple) {
>         String sourceComponent = tuple.getSourceComponent();
>         String sourceStreamId = tuple.getSourceStreamId();
>         return sourceComponent.equals(Constants.SYSTEM_COMPONENT_ID) &&
>                 sourceStreamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
>     }
>
> After I receive a tick tuple, I calculate latency by subtracting from the
> current timestamp both the timestamp of the previous tick tuple and
> TICK_TUPLE_FREQ_SEC seconds. In other words, I do the following in my
> execute() function:
>
>     public void execute(Tuple tuple) {
>         Long currentTimestamp = System.currentTimeMillis();
>         /**
>          * Check if it is a tick tuple
>          */
>         if (isTickTuple(tuple) && warmFlag) {
>             /**
>              * Check latency
>              */
>             if (latestTimestamp == -1) {
>                 latestTimestamp = currentTimestamp;
>             } else {
>                 Long timeDifference = currentTimestamp - latestTimestamp;
>                 String latencySpecifics = latestTimestamp + "-" +
>                         currentTimestamp + "-" + timeDifference;
>                 latestTimestamp = currentTimestamp;
>                 if (timeDifference >= (TICK_TUPLE_FREQ_SEC * 1000)) {
>                     statistics.updateWindowLatency(timeDifference -
>                             (TICK_TUPLE_FREQ_SEC * 1000));
>                     latencyMetric.setValue(timeDifference -
>                             (TICK_TUPLE_FREQ_SEC * 1000));
>                     latencyDetailsMetric.setValue(latencySpecifics);
>                 }
>             }
>             collector.ack(tuple);
>             return;
>         }
>
>         // Rest of the code...
>     }
>
> The variable latestTimestamp is initialized to -1 before the first tick
> tuple arrives. To keep track of latency, I make use of Storm's metrics
> framework.
>
> The puzzling part is that the more executors I add to my topology, the
> more I see latency increase. In fact, latency reaches levels that do not
> make sense. My cluster consists of 3 AWS m4.xlarge nodes for ZooKeeper and
> Nimbus, and 4 AWS m4.xlarge Supervisor nodes (each one with 4 worker
> slots). My input load is approximately 64K tuples per second (averaging
> close to 32 bytes each), and I am using direct grouping between bolts and
> spouts (I have to use it for my algorithm - I know shuffle grouping or
> local grouping might be better, but not for my use case).
>
> I have attached two PDFs illustrating the latency values I measure over
> time. One depicts the setup where I use 1 executor per bolt, and the other
> 4 executors per bolt. Each marker line shows the latency reported by each
> task as time progresses. On the PDF with one executor, there is only one
> line because only one task is executed by a single executor. On the PDF
> with the 4 executors you can see the latency reported by each task, each
> executed by a different executor. As you can see, the latency measured is
> enormous and frankly it does not make sense. It is also confusing that
> using one executor per bolt shows less latency than using 4 executors per
> bolt.
>
> Any ideas what is going on? Am I doing something wrong in measuring
> latency? Am I missing something? By the way, the throughput for each bolt
> is close to 5K tuples per second, so those latency numbers do not really
> make sense.
>
> Thank you,
> Nick
>
> P.S.: The phenomenon where fewer executors show less latency than more
> executors also happens at smaller input rates.
>
