I suspect that the main thread, which picks the data from the incoming
queue, is taking a long time. Has anyone faced this?

Execute and process latencies are in the 3-8 ms range, but the overall time
taken for a message to get processed is close to 300 ms. This is what I don't
understand: where do the missing ~290 ms go?

How is the overall time taken for a message computed? Is it the process
latency at the Spout or a sum of process latencies at all the bolts?
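
To make the question concrete, here is a minimal latency-budget sketch. It is plain Java with no Storm dependency; the class name, the hop names and every number in it are made up for illustration. It only assumes that the end-to-end time of a message includes the time spent waiting in the queues between bolts as well as the time spent inside execute().

```java
import java.util.Arrays;
import java.util.List;

final class LatencyBudget {

    /** One hop on a hypothetical tuple path: wait in the inbound queue, then run execute(). */
    static final class Hop {
        final String name;
        final long queueWaitMs;
        final long executeMs;

        Hop(String name, long queueWaitMs, long executeMs) {
            this.name = name;
            this.queueWaitMs = queueWaitMs;
            this.executeMs = executeMs;
        }
    }

    /** Sum of the time spent inside execute() across all hops. */
    static long sumExecuteMs(List<Hop> path) {
        long total = 0;
        for (Hop h : path) {
            total += h.executeMs;
        }
        return total;
    }

    /** Wall-clock time along the path: queue wait plus execute time at every hop. */
    static long endToEndMs(List<Hop> path) {
        long total = 0;
        for (Hop h : path) {
            total += h.queueWaitMs + h.executeMs;
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up numbers, chosen only to mirror the "few ms vs. ~300 ms" gap.
        List<Hop> path = Arrays.asList(
            new Hop("B", 70, 2),
            new Hop("C", 80, 2),
            new Hop("D", 75, 3),
            new Hop("E", 67, 1));

        long execute = sumExecuteMs(path);
        long endToEnd = endToEndMs(path);
        System.out.println("sum of execute times: " + execute + " ms");
        System.out.println("end-to-end time:      " + endToEnd + " ms");
        System.out.println("spent waiting:        " + (endToEnd - execute) + " ms");
        for (Hop h : path) {
            System.out.println("  " + h.name + ": waited " + h.queueWaitMs
                + " ms, executed " + h.executeMs + " ms");
        }
    }
}
```

If the real numbers look anything like this, the per-bolt latencies can all be small while the overall time is dominated by waiting, so logging a timestamp at emit and another at the start of execute() for each hop would show which queue the time goes to.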

Thanks
Kashyap

On Sun, Jul 19, 2015 at 1:30 PM, Kashyap Mhaisekar <[email protected]>
wrote:

> I changed it to debug to find out the reason behind increased times as I
> suspected buffer overflow. It was info level.
>
> Regards
> Kashyap
>  On Jul 19, 2015 1:18 PM, "Nathan Leung" <[email protected]> wrote:
>
> Are your logs on debug level? Try changing to info.
> On Jul 19, 2015 1:32 PM, "Kashyap Mhaisekar" <[email protected]> wrote:
>
>> Thanks Nathan.
>> The reason for the increased time taken between bolts could be due to -
>> 1. Buffer overflow
>> 2. GC activity.
>> 3. Low parallelism
>> 4. Latency between machines (around 0.3 ms)
>>
>> Debug logs indicate queue capacity and population of queues in limits, so
>> probably that is not the cause.
>>
>> For GC, I see GC/MarkSweepCompact and GC/Copy hovering at 500 ms. Am not
>> sure if this number is good or bad. Still figuring out...
>>
>> Parallelism does not seem to be a problem as capacity is under 0.3-0.5
>> for all bolts.
>>
>> Do you know of any other reasons based on experience?
>>
>> Thanks for the time
>>
>> Thanks
>> Kashyap
>> On Jul 19, 2015 02:29, "Nathan Leung" <[email protected]> wrote:
>>
>>> Generally, binary search combined with observation of the system
>>> (whether it meets throughput/latency targets) is a good approach.
>>> On Jul 17, 2015 6:28 PM, "Kashyap Mhaisekar" <[email protected]>
>>> wrote:
>>>
>>>> Nathan,
>>>> The bolts extend BaseBasicBolt, and in the spout I am
>>>> explicitly emitting a msgId, so tuples should be tagged and anchored.
>>>> What I see is this -
>>>> 1. The logic execution in the bolt takes no more than 1 ms (from the
>>>> start of execute() to the end of execute())
>>>> 2. The time is being spent *between* the bolts
>>>> 3. The thread dumps all show the LMAX Disruptor at
>>>> com.lmax.disruptor.BlockingWaitStrategy.waitFor(), where the maximum
>>>> CPU time is being spent.
>>>>
>>>> Is there a pattern with which the buffer sizes need to be tuned? :(
>>>>
>>>> Thanks
>>>> Kashyap
>>>>
>>>> On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <
>>>> [email protected]> wrote:
>>>>
>>>>> Thanks for the clarification regarding Task ID's Nathan, I was under
>>>>> that false impression as the site docs are a bit misleading. Thanks for
>>>>> pointing that out!
>>>>>
>>>>> Regards.
>>>>>
>>>>> Kindly yours,
>>>>>
>>>>> Andrew Grammenos
>>>>>
>>>>> -- PGP PKey --
>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>
>>>>> On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> If your tuples are reliable (spout emit with message id) and anchored
>>>>>> (emit from bolt anchored to input tuple), then you have to answer that
>>>>>> yourself. If not, then your output queue size is not constrained by the
>>>>>> framework and you may still have high latency.
>>>>>> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Nathan,
>>>>>>> My max spout pending is set to 1. So is my problem with latency or
>>>>>>> with throughput?
>>>>>>>
>>>>>>> Thank you!
>>>>>>> Kashyap
>>>>>>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <[email protected]> wrote:
>>>>>>>
>>>>>>>> If your tuples are anchored max spout pending indirectly affects
>>>>>>>> how many tuples are generated ;).
>>>>>>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Nathan. One question though - Are there any best practices
>>>>>>>>> when tuples are getting generated in the topology and not really
>>>>>>>>> controllable via Max Spout Pending?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Kashyap
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Also I would argue that this is not important unless your
>>>>>>>>>> application is especially latency sensitive or your queue is so
>>>>>>>>>> long that it is causing in flight tuples to timeout.
>>>>>>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sorry for a brief response.. The number of tuples in flight
>>>>>>>>>>> absolutely affects your max latency.  You need to tune your
>>>>>>>>>>> topology max spout pending.  Lower value will reduce your end to
>>>>>>>>>>> end latency, but if it's too low it may affect throughput.  I've
>>>>>>>>>>> posted to the group about this before; if you do a search you may
>>>>>>>>>>> find some posts where I've discussed this in more detail.
>>>>>>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Nathan,
>>>>>>>>>>>> Thanks. Have been running on a bare bones topology as
>>>>>>>>>>>> suggested. I am inclined to believe that the no. of messages in the
>>>>>>>>>>>> topology at that point in time is affecting the "latency".
>>>>>>>>>>>>
>>>>>>>>>>>> I am now trying to figure out how the topology should be
>>>>>>>>>>>> structured when the no. of transient tuples in the topology is
>>>>>>>>>>>> very high.
>>>>>>>>>>>>
>>>>>>>>>>>> Topology is structured as follows -
>>>>>>>>>>>> Consumer (a Java program that sends a message to the Storm cluster) ->
>>>>>>>>>>>> A (spout) [emits a number, say 100] ->
>>>>>>>>>>>> B (bolt) [emits 100 messages] ->
>>>>>>>>>>>> C (bolt) [passes the message along to the next bolt] ->
>>>>>>>>>>>> D (bolt) [passes the message along to the next bolt] ->
>>>>>>>>>>>> E (bolt) [aggregates all the data and confirms that all 100
>>>>>>>>>>>> messages are processed]
>>>>>>>>>>>>
>>>>>>>>>>>> What I observed is as follows -
>>>>>>>>>>>> 1. The time taken for end to end processing of a message
>>>>>>>>>>>> (from sending the message to the Storm cluster until the
>>>>>>>>>>>> aggregation is complete) is directly dependent on the volume of
>>>>>>>>>>>> messages entering Storm and also on the no. of emits done by
>>>>>>>>>>>> the spout A.
>>>>>>>>>>>> *Test 1: 100 sequential messages to Storm with B emitting 100
>>>>>>>>>>>> tuples per message (100 x 100 = 10000 tuples emitted); the
>>>>>>>>>>>> time taken to aggregate the 100 is 15 ms to 100 ms*
>>>>>>>>>>>> *Test 2: 100 sequential messages to Storm with B emitting 1000
>>>>>>>>>>>> tuples per message (100 x 1000 = 100000 tuples emitted); the
>>>>>>>>>>>> time taken to aggregate the 100 is 4 seconds to 10 seconds*
>>>>>>>>>>>> 2. The strange thing is that the more parallelism I add, the
>>>>>>>>>>>> worse the times get. I am trying to figure out if the memory per
>>>>>>>>>>>> worker is a constraint, but this is the first time I am seeing this.
>>>>>>>>>>>>
>>>>>>>>>>>> Question - How should a use case be handled in which the no.
>>>>>>>>>>>> of tuples in the topology could increase exponentially?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you all for the valuable info.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Unfortunately, I have to use it for my (research) prototype
>>>>>>>>>>>>> therefore I have to go along with it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you again,
>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Storm task ids don't change:
>>>>>>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Direct grouping, as shown in the Storm docs, means that you
>>>>>>>>>>>>>>> have to have a specific task id and use "direct streams", which
>>>>>>>>>>>>>>> is error prone, probably increases latency, and might introduce
>>>>>>>>>>>>>>> redundancy problems, as the producer of a tuple needs to know
>>>>>>>>>>>>>>> the id of the task the tuple has to go to. Imagine a scenario
>>>>>>>>>>>>>>> where the receiving task fails for some reason and the producer
>>>>>>>>>>>>>>> can't relay the tuples unless it receives the re-spawned
>>>>>>>>>>>>>>> task's id.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hope this helps.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Kindly yours,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -- PGP PKey --
>>>>>>>>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello again,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Nathan, I am using direct grouping because the application
>>>>>>>>>>>>>>>> I am working on has to be able to send tuples directly to
>>>>>>>>>>>>>>>> specific tasks and, in general, control the data flow. Can you
>>>>>>>>>>>>>>>> please explain to me why you would not recommend direct
>>>>>>>>>>>>>>>> grouping? Is there any particular reason in the architecture
>>>>>>>>>>>>>>>> of Storm?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would not recommend direct grouping unless you have a
>>>>>>>>>>>>>>>>> good reason for it.  Shuffle grouping is essentially random
>>>>>>>>>>>>>>>>> with even distribution which makes it easier to characterize
>>>>>>>>>>>>>>>>> its performance.  Local or shuffle grouping stays in process
>>>>>>>>>>>>>>>>> so generally it will be faster. However you have to be careful
>>>>>>>>>>>>>>>>> in certain cases to avoid task starvation (e.g. you have kafka
>>>>>>>>>>>>>>>>> spout with 1 partition on the topic and 1 spout task, feeding
>>>>>>>>>>>>>>>>> 10 bolt "A" tasks in 10 worker processes). Direct grouping
>>>>>>>>>>>>>>>>> depends on your code (i.e. you can create hotspots), fields
>>>>>>>>>>>>>>>>> grouping depends on your key distribution, etc.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the
>>>>>>>>>>>>>>>>>> same thing and I have a problem getting the exact
>>>>>>>>>>>>>>>>>> milliseconds of latency (mainly because of clock drifting).
>>>>>>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among
>>>>>>>>>>>>>>>>>> different groupings? For instance, is shuffle faster than
>>>>>>>>>>>>>>>>>> direct grouping?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <
>>>>>>>>>>>>>>>>>> [email protected]>:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Two things. Your math may be off depending on
>>>>>>>>>>>>>>>>>>> parallelism. One emit from A becomes 100 emitted from C,
>>>>>>>>>>>>>>>>>>> and you are joining all of them.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Second, try the default number of ackers (one per
>>>>>>>>>>>>>>>>>>> worker). All your ack traffic is going to a single task.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible
>>>>>>>>>>>>>>>>>>> to reduce network transfers.
>>>>>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>> We are attempting real-time distributed computing
>>>>>>>>>>>>>>>>>>>> using Storm and the solution has only one problem:
>>>>>>>>>>>>>>>>>>>> inter-bolt latency on the same machine or across machines
>>>>>>>>>>>>>>>>>>>> ranges between 2 and 250 ms. I am not able to figure out
>>>>>>>>>>>>>>>>>>>> why. Network latency is under 0.5 ms. By latency, I mean
>>>>>>>>>>>>>>>>>>>> the time between an emit from one bolt/spout and the message
>>>>>>>>>>>>>>>>>>>> arriving in execute() of the next bolt.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>>>>>>> A (spout) [emits a number, say 1000] ->
>>>>>>>>>>>>>>>>>>>> B (bolt) [receives this number and divides it into 10 emits
>>>>>>>>>>>>>>>>>>>> of 100 each] ->
>>>>>>>>>>>>>>>>>>>> C (bolt) [receives these emits and divides each into 10
>>>>>>>>>>>>>>>>>>>> emits of 10 numbers] ->
>>>>>>>>>>>>>>>>>>>> D (bolt) [does some computation on the number and emits one
>>>>>>>>>>>>>>>>>>>> message] ->
>>>>>>>>>>>>>>>>>>>> E (bolt) [aggregates all the data and confirms that all the
>>>>>>>>>>>>>>>>>>>> 1000 messages are processed]
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a
>>>>>>>>>>>>>>>>>>>> result, I estimated that the end to end processing for
>>>>>>>>>>>>>>>>>>>> 1000 takes not more than 50 msec including any latencies.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E is 200
>>>>>>>>>>>>>>>>>>>> msec to 3 seconds. My estimate was under 50 msec, given
>>>>>>>>>>>>>>>>>>>> that each bolt and spout takes under 3 msec to execute,
>>>>>>>>>>>>>>>>>>>> including any latencies.
>>>>>>>>>>>>>>>>>>>> 2. I noticed that most of the time is spent between the
>>>>>>>>>>>>>>>>>>>> emit from a spout/bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time
>>>>>>>>>>>>>>>>>>>> between a spout/bolt to next bolt. I understand that the
>>>>>>>>>>>>>>>>>>>> spout/bolt buffers the data into a queue and then the
>>>>>>>>>>>>>>>>>>>> subsequent bolt consumes from there.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024
>>>>>>>>>>>>>>>>>>>> MB and there are 20 workers overall.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout =>
>>>>>>>>>>>>>>>>>>>> 25 messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, 20);
>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Please let me know if you have encountered similar
>>>>>>>>>>>>>>>>>>>> issues and any steps you have taken to mitigate the time
>>>>>>>>>>>>>>>>>>>> taken between spout/bolt and another bolt.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>
>>>>>
>>>>
