Hello Kishore,

Please see my answers below.
Thanks,
Nick

2015-09-03 18:59 GMT-04:00 Kishore Senji <[email protected]>:

> When you say "4 executors per bolt", are you referring to the parallelism_hint parameter that you set in the setBolt() method call? If so, what that does is create 4 instances of the same Bolt and run them in parallel via 4 executors (threads), so it still is 1 bolt instance per thread, but a total of 4 instances running in parallel.

I am referring to 4 executors per bolt, with 1 task for each executor to execute the code of the bolt. So it is not only the parallelism hint in setBolt(), but also .setNumTasks(). For instance, if I want a certain bolt to be executed by 4 threads (executors), I do: setBolt(new CustomBolt(...), X).setNumTasks(X), where X is 4 in this case.

> For this reason, the latency of a given bolt does not change whether we are running 1 instance or 4 instances in parallel, as the same logic gets executed. However, the throughput increases because there are now 4 instances running in parallel.

You are right: I am seeing throughput increase, but latency increase as well.

> I think the way you are measuring the latency is not accurate and is not a good way to measure, because the tick tuples get into the same queue (as the rest of the messages) for the Bolt to process. If there is a burst of messages in a second, the tick tuple will be queued behind it and will get delayed, so there is no guarantee that tick tuples reach the Bolt at the exact intervals specified in the config. For the same reason, your Bolt could be processing more or fewer messages depending on the rate, so the measured latency could be more or less (because the latency is not measured over a fixed set of messages in your code).

Actually, that is why I use the tick-tuple method. I want to examine the trade-off of adding more (remote) executors to a topology.
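To make the executors-vs-tasks bookkeeping above concrete: with setBolt(bolt, X).setNumTasks(Y), X executor threads run Y task instances of the bolt. A standalone sketch, with no Storm dependency (the even, round-robin spread of tasks over executors is my understanding of Storm's default assignment, so treat it as an illustration rather than the scheduler's actual code):

```java
import java.util.Arrays;

public class TaskDistribution {
    // Spread `numTasks` bolt instances over `numExecutors` threads as evenly
    // as possible -- my understanding of how setBolt(bolt, numExecutors)
    // .setNumTasks(numTasks) is laid out by Storm's default assignment.
    static int[] tasksPerExecutor(int numExecutors, int numTasks) {
        int[] counts = new int[numExecutors];
        for (int task = 0; task < numTasks; task++) {
            counts[task % numExecutors]++; // round-robin assignment
        }
        return counts;
    }

    public static void main(String[] args) {
        // parallelism hint 4, numTasks 4: one task per executor thread
        System.out.println(Arrays.toString(tasksPerExecutor(4, 4))); // [1, 1, 1, 1]
        // parallelism hint 4, numTasks 6: some executors run two tasks
        System.out.println(Arrays.toString(tasksPerExecutor(4, 6))); // [2, 2, 1, 1]
    }
}
```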
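Also, your point about tick tuples queuing behind bursts can be put into numbers with a rough model (plain Java; the figures below are hypothetical, not measurements): a tick that lands behind a backlog is only processed after the backlog drains, so the measured inter-tick gap inflates by roughly the drain time.

```java
public class TickDelayModel {
    // Model a single-threaded bolt whose tick tuple lands behind `backlog`
    // already-queued tuples, each costing `perTupleMicros` to process.
    // The observed gap between ticks then exceeds the configured frequency
    // by roughly the time needed to drain the backlog.
    static long observedGapMs(long tickFreqMs, long backlog, long perTupleMicros) {
        long drainMs = (backlog * perTupleMicros) / 1000;
        return tickFreqMs + drainMs;
    }

    public static void main(String[] args) {
        // Hypothetical: 1 s ticks, 5000 tuples queued, 200 us per tuple
        // => the tick is seen ~1 s late, doubling the measured gap.
        System.out.println(observedGapMs(1000, 5000, 200)); // 2000
        // With an empty queue the gap matches the configured frequency.
        System.out.println(observedGapMs(1000, 0, 200)); // 1000
    }
}
```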
I want to measure the network latency in my code, and using tick tuples can reveal the amount of time a message spends in an input message queue.

> But assuming a fixed rate of messages, why does the latency per thread increase as you increase the number of threads? My guess is that with 1 Bolt instance, the Spout and the Bolt could have been in the same worker process, while with 4 instances they could have been distributed (assuming you have more workers), so network time could be added here as well (since your code really measures the extra time between two ticks, which will include the network time of the messages and the processing time of the Bolt).

This is exactly what I believe is happening. However, if you look at the graphs I attached to my original message, the numbers do not make much sense: I expect more latency in the 4-executors-per-bolt case, but I do not expect 2x the latency of the 1-executor-per-bolt case. In addition, I do not think a 3-second latency is real when my throughput is close to 60% of the input rate (the input rate is 64K tuples per second).

> Why don't you just refer to the Storm UI metrics? The processing latency and the capacity of the bolt can be obtained in the Storm UI with no extra code.

I also wanted to capture the network cost of distributing a computation, so the latencies on the UI are not suitable for me. On the other hand, capacity is a useful metric. Do you know how I can get the capacity from the Bolt's code?

> On Thu, Sep 3, 2015 at 1:51 PM, Nick R. Katsipoulakis <[email protected]> wrote:
>
>> Hello all,
>>
>> I have been working on a project for some time now, and I want to share a phenomenon I came across in my experiments that I cannot explain.
>>
>> My project deals with the scalability of a specific stream-processing algorithm, and I want to see how scalable my approach is when I add more executors per bolt to my topology (essentially, more threads).
>> I measure scalability through the latency of each operator (bolt) in my topology. To measure latency, I estimate it through the tick-tuple functionality that Storm provides. My bolts are configured to receive tick tuples every TICK_TUPLE_FREQ_SEC seconds, using the following method:
>>
>> @Override
>> public Map<String, Object> getComponentConfiguration() {
>>     Config conf = new Config();
>>     conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
>>             SynefoJoinBolt.TICK_TUPLE_FREQ_SEC);
>>     return conf;
>> }
>>
>> So, every TICK_TUPLE_FREQ_SEC seconds, each bolt receives a tick tuple. I recognize tick tuples in my execute() function using the following method:
>>
>> private boolean isTickTuple(Tuple tuple) {
>>     String sourceComponent = tuple.getSourceComponent();
>>     String sourceStreamId = tuple.getSourceStreamId();
>>     return sourceComponent.equals(Constants.SYSTEM_COMPONENT_ID) &&
>>             sourceStreamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
>> }
>>
>> After I receive a tick tuple, I calculate latency by subtracting both the previous tick tuple's timestamp and TICK_TUPLE_FREQ_SEC (in milliseconds) from the current timestamp.
>> In other words, I do the following in my execute() function:
>>
>> public void execute(Tuple tuple) {
>>     Long currentTimestamp = System.currentTimeMillis();
>>     /**
>>      * Check if it is a tick tuple
>>      */
>>     if (isTickTuple(tuple) && warmFlag) {
>>         /**
>>          * Check latency
>>          */
>>         if (latestTimestamp == -1) {
>>             latestTimestamp = currentTimestamp;
>>         } else {
>>             Long timeDifference = currentTimestamp - latestTimestamp;
>>             String latencySpecifics = latestTimestamp + "-" +
>>                     currentTimestamp + "-" + timeDifference;
>>             latestTimestamp = currentTimestamp;
>>             if (timeDifference >= (TICK_TUPLE_FREQ_SEC * 1000)) {
>>                 statistics.updateWindowLatency(timeDifference -
>>                         (TICK_TUPLE_FREQ_SEC * 1000));
>>                 latencyMetric.setValue(timeDifference -
>>                         (TICK_TUPLE_FREQ_SEC * 1000));
>>                 latencyDetailsMetric.setValue(latencySpecifics);
>>             }
>>         }
>>         collector.ack(tuple);
>>         return;
>>     }
>>
>>     // Rest of the code...
>> }
>>
>> The variable latestTimestamp is initialized to -1, so the first tick tuple only records the current timestamp. To keep track of latency, I make use of Storm's metrics framework.
>>
>> The puzzling part is that the more executors I add to my topology, the more latency I see. In fact, latency reaches levels that do not make sense. My cluster consists of 3 AWS m4.xlarge nodes for ZooKeeper and Nimbus, and 4 AWS m4.xlarge Supervisor nodes (each with 4 worker slots). My input load is approximately 64K tuples per second (averaging close to 32 bytes each), and I am using direct grouping between bolts and spouts (I have to use it for my algorithm; I know shuffle or local-or-shuffle grouping might be better, but not for my use case).
>>
>> I have attached two PDFs illustrating the latency values I measure over time. One depicts the setup with 1 executor per bolt, the other with 4 executors per bolt. Each marker line shows the latency reported by each task as time progresses.
>> On the PDF with one executor there is only one line, because only one task is executed by a single executor. On the PDF with 4 executors, you can see the latency reported by each task, each executed by a different executor. As you can see, the measured latency is enormous and frankly does not make sense. It is also confusing that using one executor per bolt shows less latency than using 4 executors per bolt.
>>
>> Any ideas what is going on? Am I doing something wrong in measuring latency? Am I missing something? By the way, the throughput of each bolt is close to 5K tuples per second, so those latency numbers do not really make sense.
>>
>> Thank you,
>> Nick
>>
>> P.S.: The phenomenon where fewer executors show less latency than more executors also happens at smaller input rates.

--
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate
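P.S. Regarding my question above about reading capacity from the Bolt's code: my current understanding is that the Storm UI derives capacity as (executed count x average execute latency) / window length, i.e. the fraction of the window the executor spent inside execute(), with values near 1.0 meaning the bolt is saturated. If that is right, one could approximate it by counting and timing execute() calls. A standalone sketch of just the arithmetic (plain Java; the names and numbers are mine, and please correct me if the formula is off):

```java
public class CapacityEstimate {
    // Approximation of the Storm UI "capacity" metric, as I understand it:
    // the fraction of a measurement window an executor spent in execute().
    // capacity = executedCount * avgExecuteLatencyMs / windowMs
    static double capacity(long executedCount, double avgExecuteLatencyMs, long windowMs) {
        return (executedCount * avgExecuteLatencyMs) / windowMs;
    }

    public static void main(String[] args) {
        // Hypothetical: 5K tuples/s over a 10-minute window, 0.125 ms each
        // => the executor is busy 62.5% of the time.
        System.out.println(capacity(5_000L * 600, 0.125, 600_000L)); // 0.625
    }
}
```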
