If your tuples are reliable (spout emit with message id) and anchored (emit
from bolt anchored to input tuple), then you have to answer that yourself.
If not, then your output queue size is not constrained by the framework and
you may still have high latency.
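
As a rough illustration of the point above, here is a minimal sketch in plain Java (it does not use the Storm API) of how max spout pending bounds the number of tuples in a reliable, anchored topology. The class and method names are hypothetical, and the model is an assumption: every pending spout tuple is treated as the root of a tuple tree, and each bolt level multiplies the tuple count by its fan-out. The result is an upper bound, since not every tuple in a tree is necessarily alive at the same moment.

```java
// Back-of-envelope model, not Storm code: names and the model itself are
// illustrative assumptions.
class InFlightBound {

    // Total tuples in one tuple tree: the spout tuple plus the tuples
    // emitted at each bolt level, where fanOuts[i] is the number of tuples
    // a bolt at level i emits per input tuple.
    static long tuplesPerTree(int... fanOuts) {
        long total = 1; // the spout tuple itself
        long level = 1; // tuples at the current level
        for (int f : fanOuts) {
            level *= f;
            total += level;
        }
        return total;
    }

    // Upper bound on tuples in the topology when every tuple is anchored,
    // so that at most maxSpoutPending trees are incomplete at once.
    static long maxTuplesInFlight(int maxSpoutPending, int... fanOuts) {
        return (long) maxSpoutPending * tuplesPerTree(fanOuts);
    }

    public static void main(String[] args) {
        // Max spout pending of 1; B emits 1000 per input, C and D pass along.
        System.out.println(maxTuplesInFlight(1, 1000, 1, 1)); // prints 3001
    }
}
```

Under this model, even a max spout pending of 1 allows a few thousand tuples to be queued when one bolt fans out by 1000, which is why a low setting by itself does not guarantee short queues.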
On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <[email protected]> wrote:

> Nathan,
> My max spout pending is set to 1. Now, is my problem with latency or with
> throughput?
>
> Thank you!
> Kashyap
> On Jul 16, 2015 5:46 PM, "Nathan Leung" <[email protected]> wrote:
>
>> If your tuples are anchored, max spout pending indirectly affects how many
>> tuples are generated ;).
>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <[email protected]> wrote:
>>
>>> Thanks Nathan. One question though - Are there any best practices when
>>> tuples are getting generated in the topology and not really controllable
>>> via Max Spout Pending?
>>>
>>> Thanks
>>> Kashyap
>>>
>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <[email protected]> wrote:
>>>
>>>> Also I would argue that this is not important unless your application
>>>> is especially latency sensitive or your queue is so long that it is causing
>>>> in flight tuples to timeout.
>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <[email protected]> wrote:
>>>>
>>>>> Sorry for the brief response. The number of tuples in flight absolutely
>>>>> affects your max latency.  You need to tune your topology max spout
>>>>> pending.  A lower value will reduce your end-to-end latency, but if it's
>>>>> too low it may affect throughput.  I've posted to the group about this before;
>>>>> if you do a search you may find some posts where I've discussed this in
>>>>> more detail.
>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Nathan,
>>>>>> Thanks. I have been running a bare-bones topology as suggested. I am
>>>>>> inclined to believe that the number of messages in the topology at that
>>>>>> point in time is affecting the "latency".
>>>>>>
>>>>>> I am now trying to figure out how the topology should be structured
>>>>>> when the number of transient tuples in the topology is very high.
>>>>>>
>>>>>> The topology is structured as follows:
>>>>>> Consumer (a Java program sends a message to the Storm cluster) -> A
>>>>>> (spout) [emits a number, say 100] -> B (bolt) [emits 100 messages] -> C
>>>>>> (bolt) [passes the message along to the next bolt] -> D (bolt) [passes
>>>>>> the message along to the next bolt] -> E (bolt) [aggregates all the data
>>>>>> and confirms that all 100 messages are processed]
>>>>>>
>>>>>> What I observed is as follows:
>>>>>> 1. The time taken for end-to-end processing of a message (from sending
>>>>>> the message to the Storm cluster until the aggregation is complete)
>>>>>> depends directly on the volume of messages entering Storm and on the
>>>>>> number of emits done by the spout A.
>>>>>> *Test 1: 100 sequential messages to Storm with B emitting 100 tuples
>>>>>> per message (100 x 100 = 10,000 tuples emitted); the time taken to
>>>>>> aggregate the 100 is 15 ms to 100 ms.*
>>>>>> *Test 2: 100 sequential messages to Storm with B emitting 1000 tuples
>>>>>> per message (100 x 1000 = 100,000 tuples emitted); the time taken to
>>>>>> aggregate the 100 is 4 seconds to 10 seconds.*
>>>>>> 2. The strange thing is that the more parallelism I add, the worse the
>>>>>> times get. I am trying to figure out whether the memory per worker is a
>>>>>> constraint, but this is the first time I am seeing this.
>>>>>>
>>>>>> Question: How should a use case be handled in which the number of
>>>>>> tuples in the topology could increase exponentially?
>>>>>>
>>>>>> Thanks
>>>>>> Kashyap
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Thank you all for the valuable info.
>>>>>>>
>>>>>>> Unfortunately, I have to use it for my (research) prototype
>>>>>>> therefore I have to go along with it.
>>>>>>>
>>>>>>> Thank you again,
>>>>>>> Nick
>>>>>>>
>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>
>>>>>>>> Storm task ids don't change:
>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Direct grouping, as shown in the Storm docs, means that you have to
>>>>>>>>> have a specific task id and use "direct streams", which is error-prone,
>>>>>>>>> probably increases latency, and might introduce redundancy problems,
>>>>>>>>> because the producer of a tuple needs to know the id of the task the
>>>>>>>>> tuple has to go to. Imagine a scenario where the receiving task fails
>>>>>>>>> for some reason and the producer can't relay the tuples until it
>>>>>>>>> receives the re-spawned task's id.
>>>>>>>>>
>>>>>>>>> Hope this helps.
>>>>>>>>>
>>>>>>>>> Kindly yours,
>>>>>>>>>
>>>>>>>>> Andrew Grammenos
>>>>>>>>>
>>>>>>>>> -- PGP PKey --
>>>>>>>>> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hello again,
>>>>>>>>>>
>>>>>>>>>> Nathan, I am using direct grouping because the application I am
>>>>>>>>>> working on has to be able to send tuples directly to specific tasks
>>>>>>>>>> and, in general, control the data flow. Can you please explain why you
>>>>>>>>>> would not recommend direct grouping? Is there any particular reason in
>>>>>>>>>> the architecture of Storm?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Nick
>>>>>>>>>>
>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>>
>>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>>> reason for it.  Shuffle grouping is essentially random with even
>>>>>>>>>>> distribution, which makes it easier to characterize its performance.
>>>>>>>>>>> Local or shuffle grouping stays in process, so generally it will be
>>>>>>>>>>> faster. However, you have to be careful in certain cases to avoid task
>>>>>>>>>>> starvation (e.g. you have a Kafka spout with 1 partition on the topic
>>>>>>>>>>> and 1 spout task, feeding 10 bolt "A" tasks in 10 worker processes).
>>>>>>>>>>> Direct grouping depends on your code (i.e. you can create hotspots),
>>>>>>>>>>> fields grouping depends on your key distribution, etc.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>
>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) How exactly do you measure latency? I am doing the same
>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of latency
>>>>>>>>>>>> (mainly because of clock drift).
>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Nick
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Two things. Your math may be off depending on parallelism. One emit
>>>>>>>>>>>>> from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Second, try the default number of ackers (one per worker). All
>>>>>>>>>>>>> your ack traffic is going to a single task.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> We are attempting real-time distributed computing using
>>>>>>>>>>>>>> Storm, and the solution has only one problem: inter-bolt
>>>>>>>>>>>>>> latency on the same machine or across machines ranges from 2
>>>>>>>>>>>>>> to 250 ms. I am not able to figure out why. Network latency is
>>>>>>>>>>>>>> under 0.5 ms. By latency, I mean the time between an emit from
>>>>>>>>>>>>>> one bolt/spout and the message arriving in execute() of the next bolt.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a topology like the one below:
>>>>>>>>>>>>>> A (spout) [emits a number, say 1000] -> B (bolt) [receives this
>>>>>>>>>>>>>> number and divides it into 10 emits of 100 each] -> C (bolt)
>>>>>>>>>>>>>> [receives these emits and divides each into 10 emits of 10 numbers]
>>>>>>>>>>>>>> -> D (bolt) [does some computation on the number and emits one
>>>>>>>>>>>>>> message] -> E (bolt) [aggregates all the data and confirms that all
>>>>>>>>>>>>>> the 1000 messages are processed]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete, so I estimated that the
>>>>>>>>>>>>>> end-to-end processing for 1000 would take no more than 50 msec,
>>>>>>>>>>>>>> including any latencies.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>> 1. The end-to-end time from spout A to bolt E is 200 msec to
>>>>>>>>>>>>>> 3 seconds. My estimate was under 50 msec, given that each bolt and
>>>>>>>>>>>>>> spout takes under 3 msec to execute, including any latencies.
>>>>>>>>>>>>>> 2. I noticed that most of the time is spent between the emit
>>>>>>>>>>>>>> from a spout/bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am not able to figure out why it takes so much time to get from
>>>>>>>>>>>>>> a spout/bolt to the next bolt. I understand that the spout/bolt
>>>>>>>>>>>>>> buffers the data into a queue and that the subsequent bolt then
>>>>>>>>>>>>>> consumes from there.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB
>>>>>>>>>>>>>> and there are 20 workers overall.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, 20);
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please let me know if you have encountered similar issues and
>>>>>>>>>>>>>> any steps you have taken to mitigate the time taken between a
>>>>>>>>>>>>>> spout/bolt and the next bolt.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>
>>>>>>
>>>>>>
>>>
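
The relationship discussed in the thread between tuples in flight and latency can be approximated with Little's law (average time in system = items in system / throughput). The following is a minimal sketch in plain Java, not Storm code; the class name, method name, and the throughput figure in the example are hypothetical and were not measured in the thread.

```java
// Little's law estimate: W = L / lambda. Illustrative only; it assumes a
// steady state and ignores batching, acking overhead, and network transfer.
class QueueLatency {

    // Average time (ms) a tuple spends in the topology, given the average
    // number of tuples in flight and the sustained throughput in tuples/sec.
    static double avgLatencyMs(double tuplesInFlight, double throughputPerSec) {
        if (throughputPerSec <= 0) {
            throw new IllegalArgumentException("throughput must be positive");
        }
        return tuplesInFlight / throughputPerSec * 1000.0;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 100,000 tuples in flight and a topology
        // that sustains 25,000 tuples per second.
        System.out.println(avgLatencyMs(100_000, 25_000)); // prints 4000.0
    }
}
```

In this model, reducing the tuples in flight (for example through a lower max spout pending on anchored tuples) lowers latency at a given throughput, while adding queued work without adding throughput raises it.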
