Thanks Nathan. One question though: are there any best practices for when
tuples are generated inside the topology and so are not really controllable
via max spout pending?

Thanks
Kashyap
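
A rough way to reason about the trade-off quoted below is Little's law
(average latency = tuples in flight / throughput). The sketch assumes each
pending spout tuple fans out into a fixed number of downstream tuples. The
class and method names are hypothetical, not part of Storm, and the
throughput figure is an assumed input rather than a measurement from this
thread.

```java
// Hypothetical helper, not part of Storm. It estimates how fan-out inside the
// topology multiplies the tuples in flight, and what that implies for average
// latency under Little's law (latency = in-flight / throughput).
final class InFlightEstimate {

    // Downstream tuples in flight when every pending spout tuple fans out
    // into fanOutPerSpoutTuple tuples inside the topology.
    static long tuplesInFlight(long maxSpoutPending, long fanOutPerSpoutTuple) {
        return maxSpoutPending * fanOutPerSpoutTuple;
    }

    // Little's law: average time a tuple spends in the system, in milliseconds.
    static double avgLatencyMs(long tuplesInFlight, double tuplesPerSecond) {
        if (tuplesPerSecond <= 0) {
            throw new IllegalArgumentException("throughput must be positive");
        }
        return 1000.0 * tuplesInFlight / tuplesPerSecond;
    }

    public static void main(String[] args) {
        double assumedThroughput = 50_000.0; // tuples/s, an assumed figure
        long maxSpoutPending = 100;          // assumed setting
        for (long fanOut : new long[] {100, 1000}) {
            long inFlight = tuplesInFlight(maxSpoutPending, fanOut);
            System.out.printf("fan-out %d: %d tuples in flight, ~%.0f ms%n",
                    fanOut, inFlight, avgLatencyMs(inFlight, assumedThroughput));
        }
    }
}
```

Under these assumptions, max spout pending bounds the downstream tuples only
indirectly, as pending x fan-out, so a tenfold larger fan-out would call for
roughly a tenfold smaller max spout pending to keep the same queue depth.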

On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <[email protected]> wrote:

> Also I would argue that this is not important unless your application is
> especially latency sensitive or your queue is so long that it is causing
> in-flight tuples to time out.
> On Jul 16, 2015 6:05 PM, "Nathan Leung" <[email protected]> wrote:
>
>> Sorry for the brief response. The number of tuples in flight absolutely
>> affects your max latency.  You need to tune your topology max spout
>> pending.  A lower value will reduce your end-to-end latency, but if it's too
>> low it may affect throughput.  I've posted to the group about this before;
>> if you do a search you may find some posts where I've discussed this in
>> more detail.
>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <[email protected]> wrote:
>>
>>> Nathan,
>>> Thanks. Have been running on a bare bones topology as suggested. I am
>>> inclined to believe that the no. of messages in the topology at that point
>>> in time is affecting the "latency".
>>>
>>> I am now trying to figure out how the topology should be structured when
>>> the number of transient tuples in the topology is very high.
>>>
>>> The topology is structured as follows:
>>> Consumer (a Java program that sends a message to the Storm cluster) ->
>>> A (spout) [emits a number, say 100] -> B (bolt) [emits 100 messages] ->
>>> C (bolt) [passes the message along to the next bolt] -> D (bolt) [passes
>>> the message along to the next bolt] -> E (bolt) [aggregates all the data
>>> and confirms that all 100 messages were processed]
>>>
>>> What I observed is as follows -
>>> 1. The time taken for end-to-end processing of a message (from sending
>>> the message to the Storm cluster until the aggregation is complete)
>>> depends directly on the volume of messages entering Storm and on the
>>> number of emits done by spout A.
>>> *Test 1: 100 sequential messages to Storm with B emitting 100 tuples per
>>> message (100 x 100 = 10,000 tuples emitted); the time taken to aggregate
>>> the 100 is 15 ms to 100 ms.*
>>> *Test 2: 100 sequential messages to Storm with B emitting 1000 tuples
>>> per message (100 x 1000 = 100,000 tuples emitted); the time taken to
>>> aggregate the 100 is 4 seconds to 10 seconds.*
>>> 2. The strange thing is that the more parallelism I add, the worse the
>>> times get. I am trying to figure out whether the memory per worker is a
>>> constraint, but this is the first time I am seeing this.
>>>
>>> Question: how should a use case be handled in which the number of tuples
>>> in the topology could increase exponentially?
>>>
>>> Thanks
>>> Kashyap
>>>
>>>
>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>> [email protected]> wrote:
>>>
>>>> Thank you all for the valuable info.
>>>>
>>>> Unfortunately, I have to use it for my (research) prototype therefore I
>>>> have to go along with it.
>>>>
>>>> Thank you again,
>>>> Nick
>>>>
>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <[email protected]>:
>>>>
>>>>> Storm task ids don't change:
>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>
>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Direct grouping, as shown in the Storm docs, means that you have to
>>>>>> target a specific task id and use "direct streams". This is error prone,
>>>>>> probably increases latency, and might introduce redundancy problems,
>>>>>> because the producer of a tuple needs to know the id of the task the
>>>>>> tuple has to go to. Imagine a scenario where the receiving task fails
>>>>>> for some reason: the producer can't relay the tuples until it has
>>>>>> received the re-spawned task's id.
>>>>>>
>>>>>> Hope this helps.
>>>>>>
>>>>>> Kindly yours,
>>>>>>
>>>>>> Andrew Grammenos
>>>>>>
>>>>>> -- PGP PKey --
>>>>>> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hello again,
>>>>>>>
>>>>>>> Nathan, I am using direct grouping because the application I am
>>>>>>> working on has to be able to send tuples directly to specific tasks,
>>>>>>> and in general to control the data flow. Can you please explain to me
>>>>>>> why you would not recommend direct grouping? Is there any particular
>>>>>>> reason in the architecture of Storm?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Nick
>>>>>>>
>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>
>>>>>>>> I would not recommend direct grouping unless you have a good reason
>>>>>>>> for it.  Shuffle grouping is essentially random with even distribution,
>>>>>>>> which makes it easier to characterize its performance.  Local or
>>>>>>>> shuffle grouping stays in process, so generally it will be faster.
>>>>>>>> However, you have to be careful in certain cases to avoid task
>>>>>>>> starvation (e.g. you have a Kafka spout with 1 partition on the topic
>>>>>>>> and 1 spout task, feeding 10 bolt "A" tasks in 10 worker processes).
>>>>>>>> Direct grouping depends on your code (i.e. you can create hotspots),
>>>>>>>> fields grouping depends on your key distribution, etc.
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello all,
>>>>>>>>>
>>>>>>>>> I have two questions:
>>>>>>>>>
>>>>>>>>> 1) How do you exactly measure latency? I am doing the same thing
>>>>>>>>> and I have a problem getting the exact milliseconds of latency (mainly
>>>>>>>>> because of clock drifting).
>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>
>>>>>>>>>> Two things. Your math may be off depending on parallelism. One emit
>>>>>>>>>> from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>>
>>>>>>>>>> Second, try the default number of ackers (one per worker). All
>>>>>>>>>> your ack traffic is going to a single task.
>>>>>>>>>>
>>>>>>>>>> Also you can try local or shuffle grouping if possible to reduce
>>>>>>>>>> network transfers.
>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> We are attempting real-time distributed computing using Storm, and
>>>>>>>>>>> the solution has only one problem: inter-bolt latency on the same
>>>>>>>>>>> machine or across machines ranges between 2 and 250 ms. I am not able
>>>>>>>>>>> to figure out why. Network latency is under 0.5 ms. By latency, I
>>>>>>>>>>> mean the time between an emit from one bolt/spout and the arrival of
>>>>>>>>>>> the message in execute() of the next bolt.
>>>>>>>>>>>
>>>>>>>>>>> I have a topology like the one below:
>>>>>>>>>>> A (spout) [emits a number, say 1000] -> B (bolt) [receives this
>>>>>>>>>>> number and divides it into 10 emits of 100 each] -> C (bolt)
>>>>>>>>>>> [receives these emits and divides each into 10 emits of 10 numbers]
>>>>>>>>>>> -> D (bolt) [does some computation on the number and emits one
>>>>>>>>>>> message] -> E (bolt) [aggregates all the data and confirms that all
>>>>>>>>>>> the 1000 messages are processed]
>>>>>>>>>>>
>>>>>>>>>>> Every bolt takes under 3 msec to complete and, as a result, I
>>>>>>>>>>> estimated that the end-to-end processing for 1000 takes no more
>>>>>>>>>>> than 50 msec, including any latencies.
>>>>>>>>>>>
>>>>>>>>>>> *Observations*
>>>>>>>>>>> 1. The end-to-end time from Spout A to Bolt E is 200 msec to
>>>>>>>>>>> 3 seconds. My estimate was under 50 msec, given that each bolt and
>>>>>>>>>>> spout takes under 3 msec to execute, including any latencies.
>>>>>>>>>>> 2. I noticed that most of the time is spent between the emit
>>>>>>>>>>> from a spout/bolt and execute() of the consuming bolt.
>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>
>>>>>>>>>>> I am not able to figure out why it takes so much time to get from a
>>>>>>>>>>> spout/bolt to the next bolt. I understand that the spout/bolt buffers
>>>>>>>>>>> the data into a queue and that the subsequent bolt consumes from there.
>>>>>>>>>>>
>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>> 1. 5 VMs with 4 CPUs and 8 GB RAM. Workers have 1024 MB and
>>>>>>>>>>> there are 20 workers overall.
>>>>>>>>>>>
>>>>>>>>>>> *Test*
>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>
>>>>>>>>>>> *Config values*
>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, 20);
>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>
>>>>>>>>>>> Please let me know if you have encountered similar issues and
>>>>>>>>>>> any steps you have taken to reduce the time taken between a
>>>>>>>>>>> spout/bolt and the next bolt.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Kashyap
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Nikolaos Romanos Katsipoulakis,
>>>> University of Pittsburgh, PhD candidate
>>>>
>>>
>>>
