Nathan,
My max spout pending is set to 1. Now is my problem with latency or with
throughput.

Thank you!
Kashyap
On Jul 16, 2015 5:46 PM, "Nathan Leung" <[email protected]> wrote:

> If your tuples are anchored max spout pending indirectly affects how many
> tuples are generated ;).
> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <[email protected]> wrote:
>
>> Thanks Nathan. One question though - Are there any best practices when
>> tuples are getting generated in the topology and not really controllable
>> via Max Spout Pending?
>>
>> Thanks
>> Kashyap
>>
>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <[email protected]> wrote:
>>
>>> Also I would argue that this is not important unless your application is
>>> especially latency sensitive or your queue is so long that it is causing in
>>> flight tuples to timeout.
>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <[email protected]> wrote:
>>>
>>>> Sorry for a brief response.. The number of tuples in flight absolutely
>>>> affects your max latency.  You need to tune your topology max spout
>>>> pending.  Lower value will reduce your end to end latency, but if it's too
>>>> low it may affect throughput.  I've posted to the group about this before;
>>>> if you do a search you may find some posts where I've discussed this in
>>>> more detail.
>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <[email protected]>
>>>> wrote:
>>>>
>>>>> Nathan,
>>>>> Thanks. Have been running on a bare bones topology as suggested. I am
>>>>> inclined to believe that the no. of messages in the topology at that point
>>>>> in time is affecting the "latency".
>>>>>
>>>>> Am trying to now figure out how should the topology be structured when
>>>>> the no. of transient tupples in the topology is very high.
>>>>>
>>>>> Topology is structured as follows -
>>>>> Consumer (A java program sends a message to storm cluster) ->  A
>>>>> (Spout) ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) -> C
>>>>> (bolt) [Passes along the message to next bolt) -> D (bolt) [Passes along
>>>>> the message to next bolt] -> E (bolt) [Aggregates all the data and 
>>>>> confirms
>>>>> if all the 100 messages are processed)
>>>>>
>>>>> What I observed is as follows -
>>>>> 1. The time taken for an end to end processing of the message (Sending
>>>>> the message to Storm cluster and till the time the aggregation is 
>>>>> complete)
>>>>> is directly dependent on the volume of messages that is entering into 
>>>>> storm
>>>>> and also the no. of emits done by the spout A.
>>>>> *Test 1: 100 sequential messages to storm with B emitting 100 tuples
>>>>> per message (100X100=10000) there are 10000 tuples emitted and the time
>>>>> taken to aggregate the 100 is 15 ms to 100 ms*
>>>>> *Test 2: 100 sequential messages to storm with B emitting 1000 tuples
>>>>> per message (100X1000=100000) **there are 100000 tuples emitted and
>>>>> the time taken to aggregate the 100 is 4 seconds to 10 seconds*
>>>>> 2.Strange thing is - the more parallelism i add, the times are so much
>>>>> more bad. Am trying to figure out if the memory per worker is a 
>>>>> constraint,
>>>>> but this is the firs time am seeing this.
>>>>>
>>>>> Question - How should the use case be handled where in the no. of
>>>>> tuples in the topology could increase exponentially..,
>>>>>
>>>>> Thanks
>>>>> Kashyap
>>>>>
>>>>>
>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thank you all for the valuable info.
>>>>>>
>>>>>> Unfortunately, I have to use it for my (research) prototype therefore
>>>>>> I have to go along with it.
>>>>>>
>>>>>> Thank you again,
>>>>>> Nick
>>>>>>
>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>
>>>>>>> Storm task ids don't change:
>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Direct grouping as it is shown in storm docs, means that you have
>>>>>>>> to have a specific task id and use "direct streams" which is error 
>>>>>>>> prone,
>>>>>>>> probably increase latency and might introduce redundancy problems as 
>>>>>>>> the
>>>>>>>> producer of tuple needs to know the id of the task the tuple will have 
>>>>>>>> to
>>>>>>>> go; so imagine a scenario where the receiving task fails for some 
>>>>>>>> reason
>>>>>>>> and the producer can't relay the tuples unless it received the 
>>>>>>>> re-spawned
>>>>>>>> task's id.
>>>>>>>>
>>>>>>>> Hope this helps.
>>>>>>>>
>>>>>>>> Kindly yours,
>>>>>>>>
>>>>>>>> Andrew Grammenos
>>>>>>>>
>>>>>>>> -- PGP PKey --
>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello again,
>>>>>>>>>
>>>>>>>>> Nathan, I am using direct-grouping because the application I am
>>>>>>>>> working on has to be able to send tuples directly to specific tasks. 
>>>>>>>>> In
>>>>>>>>> general control the data flow. Can you please explain to me why you 
>>>>>>>>> would
>>>>>>>>> not recommend direct grouping? Is there any particular reason in the
>>>>>>>>> architecture of Storm?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>
>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>> reason for it.  Shuffle grouping is essentially random with even
>>>>>>>>>> distribution which makes it easier to characterize its performance.  
>>>>>>>>>> Local
>>>>>>>>>> or shuffle grouping stays in process so generally it will be faster.
>>>>>>>>>> However you have to be careful in certain cases to avoid task 
>>>>>>>>>> starvation
>>>>>>>>>> (e.g. you have kafka spout with 1 partition on the topic and 1 spout 
>>>>>>>>>> task,
>>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping 
>>>>>>>>>> depends
>>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping depends 
>>>>>>>>>> on
>>>>>>>>>> your key distribution, etc.
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all,
>>>>>>>>>>>
>>>>>>>>>>> I have two questions:
>>>>>>>>>>>
>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same thing
>>>>>>>>>>> and I have a problem getting the exact milliseconds of latency 
>>>>>>>>>>> (mainly
>>>>>>>>>>> because of clock drifting).
>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Nick
>>>>>>>>>>>
>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>>>
>>>>>>>>>>>> Two things. Your math may be off depending on parallelism. One
>>>>>>>>>>>> emit from A becomes 100 emitted from C, and you are joining all of 
>>>>>>>>>>>> them.
>>>>>>>>>>>>
>>>>>>>>>>>> Second, try the default number of ackers (one per worker). All
>>>>>>>>>>>> your ack traffic is going to a single task.
>>>>>>>>>>>>
>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> We are attempting a real-time distributed computing using
>>>>>>>>>>>>> storm and the solution has only one problem - inter bolt
>>>>>>>>>>>>> latency on same machine or across machines ranges between 2 -
>>>>>>>>>>>>> 250 ms. I am not able to figure out why. Network latency is
>>>>>>>>>>>>> under 0.5 ms. By latency, I mean the time between an emit of
>>>>>>>>>>>>> one bolt/spout to getting the message in execute() of next bolt.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives
>>>>>>>>>>>>> this number and divides this into 10 emits of 100 each) -> C 
>>>>>>>>>>>>> (bolt)
>>>>>>>>>>>>> [Recieves these emits and divides this to 10 emits of 10 numbers) 
>>>>>>>>>>>>> -> D
>>>>>>>>>>>>> (bolt) [Does some computation on the number and emits one 
>>>>>>>>>>>>> message] -> E
>>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms if all the 1000 
>>>>>>>>>>>>> messages are
>>>>>>>>>>>>> processed)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>>>>>>>>> estimated that the end to end processing for 1000 takes not more 
>>>>>>>>>>>>> than 50
>>>>>>>>>>>>> msec including any latencies.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec
>>>>>>>>>>>>> to 3 seconds. My estimate was under 50 msec given that each bolt 
>>>>>>>>>>>>> and spout
>>>>>>>>>>>>> take under 3 msec to execute including any latencies.
>>>>>>>>>>>>> 2. I noticed that the most of the time is spent between Emit
>>>>>>>>>>>>> from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am not able to figure out why it takes so much time between
>>>>>>>>>>>>> a spout/bolt to next bolt. I understand that the spout/bolt 
>>>>>>>>>>>>> buffers the
>>>>>>>>>>>>> data into a queue and then the subsequent bolt consumes from 
>>>>>>>>>>>>> there.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and
>>>>>>>>>>>>> there are 20 workers overall.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,
>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please let me know if you have encountered similar issues and
>>>>>>>>>>>>> any steps you have taken to mitigate the time taken between 
>>>>>>>>>>>>> spout/bolt and
>>>>>>>>>>>>> another bolt.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>> University of Pittsburgh, PhD candidate
>>>>>>
>>>>>
>>>>>
>>

Reply via email to