Hello Kishore,

Please see my answers below.

Thanks,
Nick

2015-09-03 18:59 GMT-04:00 Kishore Senji <[email protected]>:

> When you say "4 executors per bolt", are you referring to the
> parallelism_hint parameter that you set in the setBolt() method call? If
> so, what that does is create 4 instances of the same Bolt and run them
> in parallel via 4 executors (threads), so it is still 1 bolt instance per
> thread, but a total of 4 instances running in parallel.
>

I am referring to 4 executors per bolt, with 1 task per executor executing
the bolt's code. So it is not only the parallelism hint in setBolt(), but
also .setNumTasks(). For instance, if I want a certain bolt to be executed
by 4 threads (executors), then I do:

setBolt(new CustomBolt(...), X).setNumTasks(X), where X is 4 in this case.
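To make that concrete, the wiring looks roughly like the following (a sketch
only; the component ids, the InputSpout class, and the builder variable are
placeholder names, not my actual code):

import backtype.storm.topology.TopologyBuilder; // org.apache.storm.topology on Storm 1.x+

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("input-spout", new InputSpout(), 1); // placeholder spout
// parallelism hint = 4 executors (threads); setNumTasks(4) = 4 tasks, i.e. 1 task per executor
builder.setBolt("custom-bolt", new CustomBolt(/* ... */), 4)
       .setNumTasks(4)
       .shuffleGrouping("input-spout"); // grouping shown only for illustration; my topology uses direct grouping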


> For this reason, the latency of a given bolt does not change whether we
> are running 1 instance or 4 instances in parallel, as the same logic gets
> executed. However, the throughput increases because now there are 4
> instances running in parallel.
>
>
You are right: I am seeing throughput increase, but latency is increasing as
well.


>
> I think the way you are measuring the latency is not accurate, because the
> tick tuples get into the same queue (as the rest of the messages) for the
> Bolt to process. If there is a burst of messages in a second, the tick tuple
> will sit behind that burst and get delayed, so there is no guarantee that
> tick tuples execute in the Bolt at the exact intervals specified in the
> config. For the same reason, your Bolt could be processing more or fewer
> messages depending on the rate, so the measured latency could be more or
> less (because the latency is not measured over a fixed set of messages in
> your code).
>

Actually, that is exactly why I use the tick-tuple method. I want to examine
the trade-off of adding more (remote) executors to a topology. I want to
measure the network latency in my code, and using tick tuples can reveal the
amount of time a message spends in a bolt's input queue.
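For example (numbers made up for illustration): with TICK_TUPLE_FREQ_SEC = 2,
if two consecutive tick tuples arrive 2.4 seconds apart, the extra 0.4
seconds approximates the time the tick spent queued behind regular tuples in
the bolt's input queue.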


> But assuming a fixed rate of messages, why does the latency per thread
> increase as you increase the number of threads? My guess is that with 1 Bolt
> instance, the Spout and the Bolt could have been in the same worker process,
> while with 4 instances they could have been distributed across workers
> (assuming you have more workers), so network time gets added as well. In
> your code you are really measuring the extra time between two ticks, which
> includes the network time of the messages and the processing time of the
> Bolt.
>

This is exactly what I believe is happening. However, if you take a look at
the graphs I attached to my original message, the numbers do not make much
sense: I expect to see more latency in the 4 executors/bolt case, but I do
not expect to see 2x the latency of the 1 executor/bolt case. In addition, I
do not think that a 3-second latency is real when my throughput is close to
60% of the input rate (the input rate is 64K tuples per second).

>
> Why don't you just refer to the Storm UI metrics? The processing latency
> and the capacity of the bolt can be obtained in Storm UI with no extra
> code.
>
I also wanted to capture the network cost of distributing a computation.
Hence, the latencies on the UI are not suitable for me. On the other hand,
capacity is a useful metric. Do you know how I can get the capacity from
the Bolt's code?
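From what I understand, the UI computes capacity roughly as (number of
executed tuples x average execute latency) / measurement window, i.e. the
fraction of time the executor spends in execute(). If that is right,
something along these lines could approximate it inside the bolt (a sketch
only; the field names are placeholders, not Storm API):

// fields in the bolt (placeholder names)
private transient long busyTimeMs = 0L;    // time spent processing tuples in the current window
private transient long windowStartMs = 0L;

// in execute(), around the processing of a regular (non-tick) tuple:
long start = System.currentTimeMillis();
// ... actual tuple processing ...
busyTimeMs += System.currentTimeMillis() - start;

// on each tick tuple: report and reset
long window = System.currentTimeMillis() - windowStartMs;
double capacity = (window > 0) ? ((double) busyTimeMs / window) : 0.0; // ~1.0 means the executor is saturated
busyTimeMs = 0L;
windowStartMs = System.currentTimeMillis();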


>
>
> On Thu, Sep 3, 2015 at 1:51 PM, Nick R. Katsipoulakis <
> [email protected]> wrote:
>
>> Hello all,
>>
>> I have been working on a project for some time now, and I want to share a
>> phenomenon I came across in my experiments, which I cannot explain.
>>
>> My project deals with the scalability of a specific stream processing
>> algorithm, and I want to see how scalable my approach is when I add more
>> executors per bolt to my topology (essentially, more threads). I measure
>> scalability through the latency of each operator (bolt) in my topology. I
>> estimate latency through the tick-tuple functionality that Storm provides.
>> My bolts are configured to receive tick tuples every TICK_TUPLE_FREQ_SEC
>> seconds, by using the following method:
>>
>> @Override
>> public Map<String, Object> getComponentConfiguration() {
>>    Config conf = new Config();
>>    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, SynefoJoinBolt.TICK_TUPLE_FREQ_SEC);
>>    return conf;
>> }
>>
>>
>> So, every TICK_TUPLE_FREQ_SEC seconds, each bolt receives a tick tuple. I
>> recognize tick tuples in my execute() method using the following helper:
>>
>> private boolean isTickTuple(Tuple tuple) {
>>    String sourceComponent = tuple.getSourceComponent();
>>    String sourceStreamId = tuple.getSourceStreamId();
>>    return sourceComponent.equals(Constants.SYSTEM_COMPONENT_ID) &&
>>          sourceStreamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
>> }
>>
>> After I receive a tick tuple, I calculate latency by subtracting both the
>> timestamp of the previous tick tuple and TICK_TUPLE_FREQ_SEC seconds from
>> the current timestamp. In other words, I do the following in my execute()
>> method:
>>
>> public void execute(Tuple tuple) {
>>    Long currentTimestamp = System.currentTimeMillis();
>>    /**
>>     * Check if it is a tick tuple
>>     */
>>    if (isTickTuple(tuple) && warmFlag) {
>>       /**
>>        * Check latency
>>        */
>>       if (latestTimestamp == -1) {
>>          latestTimestamp = currentTimestamp;
>>       } else {
>>          Long timeDifference = currentTimestamp - latestTimestamp;
>>          String latencySpecifics = latestTimestamp + "-" + currentTimestamp + "-" + timeDifference;
>>          latestTimestamp = currentTimestamp;
>>          if (timeDifference >= (TICK_TUPLE_FREQ_SEC * 1000)) {
>>             statistics.updateWindowLatency(timeDifference - (TICK_TUPLE_FREQ_SEC * 1000));
>>             latencyMetric.setValue(timeDifference - (TICK_TUPLE_FREQ_SEC * 1000));
>>             latencyDetailsMetric.setValue(latencySpecifics);
>>          }
>>       }
>>       collector.ack(tuple);
>>       return;
>>    }
>>
>>    // Rest of the code....
>> }
>>
>> The variable latestTimestamp is initialized to -1, so the first tick tuple
>> only records the current timestamp. To keep track of latency, I make use of
>> Storm's metrics framework.
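>> For completeness, latencyMetric and latencyDetailsMetric are registered in
>> prepare() along the following lines (a sketch, assuming AssignableMetric
>> from Storm's metrics API; the metric names and the 60-second bucket size
>> are placeholders, not my exact values):
>>
>> @Override
>> public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
>>    this.collector = collector;
>>    // AssignableMetric simply reports whatever value was last set on it
>>    latencyMetric = context.registerMetric("window-latency", new AssignableMetric(0L), 60);
>>    latencyDetailsMetric = context.registerMetric("window-latency-details", new AssignableMetric(""), 60);
>>    latestTimestamp = -1L;
>> }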
>>
>> The puzzling part is that the more executors I add to my topology, the
>> more latency increases. In fact, latency reaches levels that do not make
>> sense. My cluster consists of 3 AWS m4.xlarge nodes for ZooKeeper and
>> Nimbus, and 4 AWS m4.xlarge Supervisor nodes (each one with 4 worker
>> slots). My input load is approximately 64K tuples per second (averaging
>> close to 32 bytes each), and I am using direct grouping between spouts and
>> bolts (I have to use it for my algorithm - I know shuffle or local grouping
>> might perform better, but not for my use case).
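>>
>> For reference, the direct-grouping wiring looks roughly like this (a
>> sketch; the component ids, field names, and the chooseTask() routing helper
>> are placeholders for my actual logic):
>>
>> // in the spout's declareOutputFields(): declare a direct stream
>> declarer.declare(true, new Fields("payload"));
>>
>> // in the spout's nextTuple(): pick the consuming task explicitly
>> List<Integer> boltTasks = context.getComponentTasks("custom-bolt"); // cached from open()
>> collector.emitDirect(chooseTask(boltTasks), new Values(payload));
>>
>> // in the topology wiring
>> builder.setBolt("custom-bolt", new CustomBolt(/* ... */), 4)
>>        .setNumTasks(4)
>>        .directGrouping("input-spout");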
>>
>> I have attached two PDFs illustrating the latency values I measure over
>> time. One depicts the setup with 1 executor per bolt, and the other the
>> setup with 4 executors per bolt. Each marker line shows the latency
>> reported by a task as time progresses. The 1-executor PDF has only one
>> line, because only one task is executed by a single executor. In the
>> 4-executor PDF you can see the latency reported by each task, each task
>> being executed by a different executor. As you can see, the measured
>> latency is enormous and frankly does not make sense. It is also confusing
>> that using one executor per bolt yields less latency than using 4
>> executors per bolt.
>>
>> Any idea what is going on? Am I doing something wrong in measuring
>> latency? Am I missing something? By the way, the throughput of each bolt
>> is close to 5K tuples per second, so those latency numbers do not really
>> make sense.
>>
>> Thank you,
>> Nick
>>
>> P.S.: The phenomenon where fewer executors show lower latency than more
>> executors also occurs at lower input rates.
>>
>
>


-- 
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate
