The logs were at info level. I changed them to debug to find the reason
behind the increased times, as I suspected buffer overflow.

Regards
Kashyap
 On Jul 19, 2015 1:18 PM, "Nathan Leung" <[email protected]> wrote:

Are your logs on debug level? Try changing to info.
On Jul 19, 2015 1:32 PM, "Kashyap Mhaisekar" <[email protected]> wrote:

> Thanks Nathan.
> The reason for the increased time taken between bolts could be due to -
> 1. Buffer overflow
> 2. GC activity.
> 3. Low parallelism
> 4. Latency between machines (around 0.3 ms)
>
> Debug logs show queue capacity and queue population within limits, so
> that is probably not the cause.
>
> For GC, I see GC/MarkSweepCompact and GC/Copy hovering at 500 ms. Am not
> sure if this number is good or bad. Still figuring out...
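
The names GC/MarkSweepCompact and GC/Copy match the names the JVM gives its serial old-generation and young-generation collectors. Whether 500 ms is good or bad depends on whether it is a cumulative total or a per-interval figure. A minimal sketch, assuming only the standard java.lang.management API (the class name GcSnapshot is made up for illustration), that reads each collector's cumulative count and time from inside a JVM:

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

class GcSnapshot {
    /** Sums the cumulative collection time (ms) over all collectors; undefined (-1) values are skipped. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime();
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // One line per collector, e.g. "Copy" and "MarkSweepCompact" under the serial collector.
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("%s: count=%d, time=%d ms%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }
        System.out.println("total GC time so far: " + totalGcMillis() + " ms");
    }
}
```

Taking two snapshots a few seconds apart and subtracting gives GC time per interval, which is easier to compare against the observed inter-bolt delays than a single number.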
>
> Parallelism does not seem to be a problem as capacity is under 0.3-0.5 for
> all bolts.
>
> Do you know of any other reasons based on experience?
>
> Thanks for the time
>
> Thanks
> Kashyap
> On Jul 19, 2015 02:29, "Nathan Leung" <[email protected]> wrote:
>
>> Generally, binary search combined with observation of the system (whether
>> it meets throughput/latency targets) is a good approach.
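
The binary-search approach can be sketched as below. This is an illustration only: it assumes the setting being tuned (a buffer size or max spout pending) passes the target for small values and fails for large ones, and the predicate stands in for "deploy with this value and observe the system". The names TuningSearch and largestPassing, and the threshold of 300, are made up.

```java
import java.util.function.IntPredicate;

class TuningSearch {
    /**
     * Returns the largest value in [lo, hi] for which ok.test(value) is true,
     * assuming ok is true for small values and false for large ones.
     * Returns lo - 1 if no value in the range passes.
     */
    static int largestPassing(int lo, int hi, IntPredicate ok) {
        int best = lo - 1;
        while (lo <= hi) {
            int mid = lo + (hi - lo) / 2;
            if (ok.test(mid)) {
                best = mid;      // mid meets the target, try larger values
                lo = mid + 1;
            } else {
                hi = mid - 1;    // mid misses the target, try smaller values
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Made-up stand-in for a real measurement: the target is met up to 300.
        IntPredicate meetsLatencyTarget = pending -> pending <= 300;
        System.out.println(largestPassing(1, 4096, meetsLatencyTarget)); // prints 300
    }
}
```

Each probe here is one test run of the topology, so a range of 4096 candidate values needs about 12 runs rather than thousands.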
>> On Jul 17, 2015 6:28 PM, "Kashyap Mhaisekar" <[email protected]> wrote:
>>
>>> Nathan,
>>> The bolts extend BaseBasicBolt, and in the spout I am explicitly
>>> emitting a msgId, so the tuples should be tagged and anchored.
>>> What I see is this -
>>> 1. The logic execution in the bolt takes no more than 1 ms (start of
>>> execute() to end of execute())
>>> 2. The time is being spent *between* the bolts
>>> 3. The thread dumps all show the LMAX disruptor at
>>> com.lmax.disruptor.BlockingWaitStrategy.waitFor(), where the maximum
>>> CPU time is being spent.
>>>
>>> Is there a pattern with which the buffer sizes need to be tuned? :(
>>>
>>> Thanks
>>> Kashyap
>>>
>>> On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <[email protected]
>>> > wrote:
>>>
>>>> Thanks for the clarification regarding Task ID's Nathan, I was under
>>>> that false impression as the site docs are a bit misleading. Thanks for
>>>> pointing that out!
>>>>
>>>> Regards.
>>>>
>>>> Kindly yours,
>>>>
>>>> Andrew Grammenos
>>>>
>>>> -- PGP PKey --
>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>
>>>> On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <[email protected]>
>>>> wrote:
>>>>
>>>>> If your tuples are reliable (spout emit with message id) and anchored
>>>>> (emit from bolt anchored to input tuple), then you have to answer that
>>>>> yourself. If not, then your output queue size is not constrained by the
>>>>> framework and you may still have high latency.
>>>>> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Nathan,
>>>>>> My max spout pending is set to 1. Is my problem now with latency or
>>>>>> with throughput?
>>>>>>
>>>>>> Thank you!
>>>>>> Kashyap
>>>>>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <[email protected]> wrote:
>>>>>>
>>>>>>> If your tuples are anchored, max spout pending indirectly affects how
>>>>>>> many tuples are generated ;).
>>>>>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Nathan. One question though - Are there any best practices
>>>>>>>> when tuples are getting generated in the topology and not really
>>>>>>>> controllable via Max Spout Pending?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Kashyap
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Also I would argue that this is not important unless your
>>>>>>>>> application is especially latency sensitive or your queue is so long
>>>>>>>>> that it is causing in-flight tuples to time out.
>>>>>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Sorry for the brief response. The number of tuples in flight
>>>>>>>>>> absolutely affects your max latency. You need to tune your topology
>>>>>>>>>> max spout pending. A lower value will reduce your end-to-end latency,
>>>>>>>>>> but if it's too low it may affect throughput. I've posted to the group
>>>>>>>>>> about this before; if you do a search you may find some posts where
>>>>>>>>>> I've discussed this in more detail.
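
One way to see why the number of tuples in flight bounds latency is Little's law: in steady state, average time in the system equals the number of items in the system divided by the completion rate. The sketch below is only an approximation under that steady-state assumption. The class name and the figure of 5000 completed spout tuples per second are made up, and both quantities must be counted in the same unit (spout tuples, i.e. whole tuple trees).

```java
class InFlightLatency {
    /** Little's law: average latency = items in flight / completion rate, returned in milliseconds. */
    static double avgLatencyMillis(int tuplesInFlight, double tuplesPerSecond) {
        if (tuplesPerSecond <= 0) {
            throw new IllegalArgumentException("throughput must be positive");
        }
        return 1000.0 * tuplesInFlight / tuplesPerSecond;
    }

    public static void main(String[] args) {
        double throughput = 5000.0; // hypothetical: spout tuples fully processed per second
        for (int pending : new int[] {10, 100, 1000}) {
            System.out.printf("in flight=%d -> about %.0f ms average latency%n",
                    pending, avgLatencyMillis(pending, throughput));
        }
    }
}
```

With the made-up throughput held fixed, ten times as many tuples in flight means ten times the average latency, which is the trade-off max spout pending controls.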
>>>>>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Nathan,
>>>>>>>>>>> Thanks. I have been running a bare-bones topology as suggested.
>>>>>>>>>>> I am inclined to believe that the number of messages in the topology
>>>>>>>>>>> at that point in time is affecting the "latency".
>>>>>>>>>>>
>>>>>>>>>>> I am now trying to figure out how the topology should be
>>>>>>>>>>> structured when the number of transient tuples in the topology is
>>>>>>>>>>> very high.
>>>>>>>>>>>
>>>>>>>>>>> The topology is structured as follows -
>>>>>>>>>>> Consumer [a Java program sends a message to the Storm cluster] ->
>>>>>>>>>>> A (spout) [emits a number, say 100] ->
>>>>>>>>>>> B (bolt) [emits 100 messages] ->
>>>>>>>>>>> C (bolt) [passes the message along to the next bolt] ->
>>>>>>>>>>> D (bolt) [passes the message along to the next bolt] ->
>>>>>>>>>>> E (bolt) [aggregates all the data and confirms that all 100 messages are processed]
>>>>>>>>>>>
>>>>>>>>>>> What I observed is as follows -
>>>>>>>>>>> 1. The time taken for end-to-end processing of a message (from
>>>>>>>>>>> sending the message to the Storm cluster until the aggregation is
>>>>>>>>>>> complete) depends directly on the volume of messages entering
>>>>>>>>>>> Storm and on the number of emits done by the spout A.
>>>>>>>>>>> *Test 1: 100 sequential messages to Storm with B emitting 100
>>>>>>>>>>> tuples per message (100 x 100 = 10,000 tuples emitted); the time
>>>>>>>>>>> taken to aggregate the 100 is 15 ms to 100 ms*
>>>>>>>>>>> *Test 2: 100 sequential messages to Storm with B emitting 1000
>>>>>>>>>>> tuples per message (100 x 1000 = 100,000 tuples emitted); the time
>>>>>>>>>>> taken to aggregate the 100 is 4 seconds to 10 seconds*
>>>>>>>>>>> 2. The strange thing is that the more parallelism I add, the worse
>>>>>>>>>>> the times get. I am trying to figure out whether the memory per
>>>>>>>>>>> worker is a constraint, but this is the first time I am seeing this.
>>>>>>>>>>>
>>>>>>>>>>> Question - How should a use case be handled where the number
>>>>>>>>>>> of tuples in the topology could increase exponentially?
>>>>>>>>>>> Thanks
>>>>>>>>>>> Kashyap
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thank you all for the valuable info.
>>>>>>>>>>>>
>>>>>>>>>>>> Unfortunately, I have to use it for my (research) prototype
>>>>>>>>>>>> therefore I have to go along with it.
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you again,
>>>>>>>>>>>> Nick
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Storm task ids don't change:
>>>>>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Direct grouping, as shown in the Storm docs, means that you
>>>>>>>>>>>>>> have to have a specific task id and use "direct streams", which
>>>>>>>>>>>>>> is error prone, probably increases latency, and might introduce
>>>>>>>>>>>>>> redundancy problems, as the producer of a tuple needs to know the
>>>>>>>>>>>>>> id of the task the tuple has to go to. Imagine a scenario where
>>>>>>>>>>>>>> the receiving task fails for some reason and the producer can't
>>>>>>>>>>>>>> relay the tuples unless it receives the re-spawned task's id.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hope this helps.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kindly yours,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- PGP PKey --
>>>>>>>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello again,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Nathan, I am using direct grouping because the application I
>>>>>>>>>>>>>>> am working on has to be able to send tuples directly to
>>>>>>>>>>>>>>> specific tasks and, in general, control the data flow. Can you
>>>>>>>>>>>>>>> please explain why you would not recommend direct grouping? Is
>>>>>>>>>>>>>>> there any particular reason in the architecture of Storm?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I would not recommend direct grouping unless you have a
>>>>>>>>>>>>>>>> good reason for it. Shuffle grouping is essentially random
>>>>>>>>>>>>>>>> with even distribution, which makes it easier to characterize
>>>>>>>>>>>>>>>> its performance. Local or shuffle grouping stays in process, so
>>>>>>>>>>>>>>>> generally it will be faster. However, you have to be careful in
>>>>>>>>>>>>>>>> certain cases to avoid task starvation (e.g. you have a Kafka
>>>>>>>>>>>>>>>> spout with 1 partition on the topic and 1 spout task, feeding
>>>>>>>>>>>>>>>> 10 bolt "A" tasks in 10 worker processes). Direct grouping
>>>>>>>>>>>>>>>> depends on your code (i.e. you can create hotspots), fields
>>>>>>>>>>>>>>>> grouping depends on your key distribution, etc.
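
The point about key distribution can be illustrated without Storm at all. The sketch below is a toy model, not Storm's implementation: round-robin assignment stands in for the even spread of shuffle grouping, and hash-of-key modulo task count stands in for fields grouping. The class name and the 90% "hot" key workload are made up.

```java
import java.util.Arrays;

class GroupingSkew {
    /** Even, round-robin assignment of tuples to tasks (a stand-in for shuffle grouping). */
    static int[] shuffleCounts(int tuples, int tasks) {
        int[] counts = new int[tasks];
        for (int i = 0; i < tuples; i++) {
            counts[i % tasks]++;
        }
        return counts;
    }

    /** Assignment by hash of the key (a stand-in for fields grouping). */
    static int[] fieldsCounts(String[] keys, int tasks) {
        int[] counts = new int[tasks];
        for (String key : keys) {
            counts[Math.floorMod(key.hashCode(), tasks)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical skewed workload: 900 tuples share one key, 100 have distinct keys.
        String[] keys = new String[1000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = (i < 900) ? "hot" : "key-" + i;
        }
        System.out.println("shuffle: " + Arrays.toString(shuffleCounts(keys.length, 4)));
        System.out.println("fields:  " + Arrays.toString(fieldsCounts(keys, 4)));
    }
}
```

In the hash-based case every tuple with the hot key lands on the same task, so one task receives at least 900 of the 1000 tuples regardless of how many tasks exist.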
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1) How exactly do you measure latency? I am doing the same
>>>>>>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of
>>>>>>>>>>>>>>>>> latency (mainly because of clock drift).
>>>>>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speed among
>>>>>>>>>>>>>>>>> different groupings? For instance, is shuffle faster than
>>>>>>>>>>>>>>>>> direct grouping?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <[email protected]
>>>>>>>>>>>>>>>>> >:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Two things. Your math may be off depending on
>>>>>>>>>>>>>>>>>> parallelism. One emit from A becomes 100 emitted from C, and
>>>>>>>>>>>>>>>>>> you are joining all of them.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Second, try the default number of ackers (one per
>>>>>>>>>>>>>>>>>> worker). All your ack traffic is going to a single task.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>>>>>>> reduce network transfers.
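
The fan-out arithmetic in the first point can be worked through as follows. This is a sketch using the per-bolt fan-outs described in the original post (B emits 10 per input, C emits 10 per input, D emits 1 per input); the class and method names are made up for illustration.

```java
import java.util.Arrays;

class TupleTreeSize {
    /**
     * fanOut[i] is the number of tuples a stage emits per input tuple.
     * Returns the emit count per stage for one spout tuple; index 0 is the spout itself.
     */
    static long[] emitsPerStage(int[] fanOut) {
        long[] emits = new long[fanOut.length + 1];
        emits[0] = 1; // the single tuple emitted by spout A
        for (int i = 0; i < fanOut.length; i++) {
            emits[i + 1] = emits[i] * fanOut[i];
        }
        return emits;
    }

    /** Total number of tuples in the tree rooted at one spout tuple. */
    static long total(long[] emits) {
        long sum = 0;
        for (long e : emits) {
            sum += e;
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] emits = emitsPerStage(new int[] {10, 10, 1}); // B, C, D
        System.out.println(Arrays.toString(emits)); // prints [1, 10, 100, 100]
        System.out.println(total(emits));           // prints 211
    }
}
```

So one message entering at A produces 100 tuples arriving at E, and 211 tuples in total whose acks all flow to the acker tasks.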
>>>>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>> We are attempting real-time distributed computing
>>>>>>>>>>>>>>>>>>> using Storm, and the solution has only one problem:
>>>>>>>>>>>>>>>>>>> inter-bolt latency, on the same machine or across machines,
>>>>>>>>>>>>>>>>>>> ranges between 2 and 250 ms. I am not able to figure out why.
>>>>>>>>>>>>>>>>>>> Network latency is under 0.5 ms. By latency, I mean the time
>>>>>>>>>>>>>>>>>>> between an emit from one bolt/spout and the message arriving in
>>>>>>>>>>>>>>>>>>> execute() of the next bolt.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I have a topology like the one below -
>>>>>>>>>>>>>>>>>>> A (spout) [emits a number, say 1000] ->
>>>>>>>>>>>>>>>>>>> B (bolt) [receives this number and divides it into 10 emits of 100 each] ->
>>>>>>>>>>>>>>>>>>> C (bolt) [receives these emits and divides each into 10 emits of 10 numbers] ->
>>>>>>>>>>>>>>>>>>> D (bolt) [does some computation on the number and emits one message] ->
>>>>>>>>>>>>>>>>>>> E (bolt) [aggregates all the data and confirms that all 1000 messages are processed]
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and, as a
>>>>>>>>>>>>>>>>>>> result, I estimated that end-to-end processing for 1000 takes
>>>>>>>>>>>>>>>>>>> no more than 50 msec including any latencies.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>>>>>> 1. The end-to-end time from spout A to bolt E is 200
>>>>>>>>>>>>>>>>>>> msec to 3 seconds. My estimate was under 50 msec, given that
>>>>>>>>>>>>>>>>>>> each bolt and spout takes under 3 msec to execute, including
>>>>>>>>>>>>>>>>>>> any latencies.
>>>>>>>>>>>>>>>>>>> 2. I noticed that most of the time is spent between the
>>>>>>>>>>>>>>>>>>> emit from a spout/bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time
>>>>>>>>>>>>>>>>>>> between a spout/bolt and the next bolt. I understand that the
>>>>>>>>>>>>>>>>>>> spout/bolt buffers the data into a queue and the subsequent
>>>>>>>>>>>>>>>>>>> bolt then consumes from there.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPUs and 8 GB RAM. Workers run with 1024
>>>>>>>>>>>>>>>>>>> MB and there are 20 workers overall.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>>>>>> 1. The test was done with 25 messages sent to the spout
>>>>>>>>>>>>>>>>>>> in a span of 5 seconds.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, 20);
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Please let me know if you have encountered similar
>>>>>>>>>>>>>>>>>>> issues and any steps you have taken to reduce the time
>>>>>>>>>>>>>>>>>>> taken between a spout/bolt and another bolt.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>
>>>
