Nathan,
My max spout pending is set to 1. So is my problem with latency, or with
throughput?
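(For reference, I set it like this; a minimal sketch of the relevant lines,
with 1 being the value I currently run:)

    Config conf = new Config();
    // Upper bound on un-acked tuples in flight per spout task;
    // 1 effectively serializes the pipeline.
    conf.setMaxSpoutPending(1);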
Thank you!
Kashyap

On Jul 16, 2015 5:46 PM, "Nathan Leung" <[email protected]> wrote:

> If your tuples are anchored, max spout pending indirectly affects how
> many tuples are generated ;).
>
> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <[email protected]> wrote:
>
>> Thanks Nathan. One question though - are there any best practices for
>> cases where tuples are generated inside the topology and are not really
>> controllable via max spout pending?
>>
>> Thanks
>> Kashyap
>>
>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <[email protected]> wrote:
>>
>>> Also, I would argue that this is not important unless your application
>>> is especially latency sensitive, or your queue is so long that it is
>>> causing in-flight tuples to time out.
>>>
>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <[email protected]> wrote:
>>>
>>>> Sorry for the brief response. The number of tuples in flight
>>>> absolutely affects your max latency, so you need to tune your
>>>> topology's max spout pending. A lower value will reduce your
>>>> end-to-end latency, but if it is too low it may hurt throughput. I've
>>>> posted to the group about this before; if you do a search you may
>>>> find some posts where I've discussed this in more detail.
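>>>>
>>>> To make the anchoring point above concrete: when an intermediate bolt
>>>> anchors its emits to the input tuple, everything it generates joins
>>>> the spout tuple's tree, so max spout pending bounds the whole cascade.
>>>> A minimal, untested sketch (class and field names are hypothetical):
>>>>
>>>>     import java.util.Map;
>>>>     import backtype.storm.task.OutputCollector;
>>>>     import backtype.storm.task.TopologyContext;
>>>>     import backtype.storm.topology.OutputFieldsDeclarer;
>>>>     import backtype.storm.topology.base.BaseRichBolt;
>>>>     import backtype.storm.tuple.Fields;
>>>>     import backtype.storm.tuple.Tuple;
>>>>     import backtype.storm.tuple.Values;
>>>>
>>>>     public class FanOutBolt extends BaseRichBolt {
>>>>         private OutputCollector collector;
>>>>
>>>>         public void prepare(Map conf, TopologyContext ctx,
>>>>                             OutputCollector collector) {
>>>>             this.collector = collector;
>>>>         }
>>>>
>>>>         public void execute(Tuple input) {
>>>>             int n = input.getIntegerByField("count");
>>>>             for (int i = 0; i < n; i++) {
>>>>                 // Anchored emit: each child joins the spout tuple's tree.
>>>>                 collector.emit(input, new Values(i));
>>>>             }
>>>>             collector.ack(input);
>>>>         }
>>>>
>>>>         public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>             declarer.declare(new Fields("num"));
>>>>         }
>>>>     }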
>>>>
>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <[email protected]> wrote:
>>>>
>>>>> Nathan,
>>>>> Thanks. I have been running a bare-bones topology as suggested. I am
>>>>> inclined to believe that the number of messages in the topology at a
>>>>> given point in time is affecting the "latency".
>>>>>
>>>>> I am now trying to figure out how the topology should be structured
>>>>> when the number of transient tuples in the topology is very high.
>>>>>
>>>>> The topology is structured as follows -
>>>>> Consumer (a Java program that sends a message to the Storm cluster)
>>>>> -> A (spout) [emits a number, say 100] -> B (bolt) [emits 100
>>>>> messages] -> C (bolt) [passes each message to the next bolt] -> D
>>>>> (bolt) [passes each message to the next bolt] -> E (bolt) [aggregates
>>>>> all the data and confirms that all 100 messages are processed]
>>>>>
>>>>> What I observed is as follows -
>>>>> 1. The time taken for end-to-end processing of a message (from
>>>>> sending the message to the Storm cluster until the aggregation is
>>>>> complete) is directly dependent on the volume of messages entering
>>>>> Storm and also on the number of emits done by spout A.
>>>>> *Test 1: 100 sequential messages to Storm with B emitting 100 tuples
>>>>> per message (100x100 = 10000); 10000 tuples are emitted, and the time
>>>>> taken to aggregate the 100 is 15 ms to 100 ms.*
>>>>> *Test 2: 100 sequential messages to Storm with B emitting 1000 tuples
>>>>> per message (100x1000 = 100000); 100000 tuples are emitted, and the
>>>>> time taken to aggregate the 100 is 4 seconds to 10 seconds.*
>>>>> 2. The strange thing is, the more parallelism I add, the worse the
>>>>> times get. I am trying to figure out if memory per worker is a
>>>>> constraint, but this is the first time I am seeing this.
>>>>>
>>>>> Question - how should a use case be handled where the number of
>>>>> tuples in the topology can increase exponentially?
>>>>>
>>>>> Thanks
>>>>> Kashyap
>>>>>
>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <[email protected]> wrote:
>>>>>
>>>>>> Thank you all for the valuable info.
>>>>>>
>>>>>> Unfortunately, I have to use it for my (research) prototype, so I
>>>>>> have to go along with it.
>>>>>>
>>>>>> Thank you again,
>>>>>> Nick
>>>>>>
>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>
>>>>>>> Storm task ids don't change:
>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <[email protected]> wrote:
>>>>>>>
>>>>>>>> Direct grouping, as shown in the Storm docs, means that you have
>>>>>>>> to have a specific task id and use "direct streams", which is
>>>>>>>> error prone, will probably increase latency, and might introduce
>>>>>>>> redundancy problems, since the producer of a tuple needs to know
>>>>>>>> the id of the task the tuple will go to. Imagine a scenario where
>>>>>>>> the receiving task fails for some reason: the producer can't relay
>>>>>>>> tuples until it receives the re-spawned task's id.
>>>>>>>>
>>>>>>>> Hope this helps.
>>>>>>>>
>>>>>>>> Kindly yours,
>>>>>>>>
>>>>>>>> Andrew Grammenos
>>>>>>>>
>>>>>>>> -- PGP PKey --
>>>>>>>> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello again,
>>>>>>>>>
>>>>>>>>> Nathan, I am using direct grouping because the application I am
>>>>>>>>> working on has to be able to send tuples directly to specific
>>>>>>>>> tasks, in general to control the data flow. Can you please
>>>>>>>>> explain why you would not recommend direct grouping? Is there any
>>>>>>>>> particular reason in the architecture of Storm?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>
>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>> reason for it. Shuffle grouping is essentially random with even
>>>>>>>>>> distribution, which makes it easier to characterize its
>>>>>>>>>> performance. Local-or-shuffle grouping stays in process, so
>>>>>>>>>> generally it will be faster; however, you have to be careful in
>>>>>>>>>> certain cases to avoid task starvation (e.g. a Kafka spout with
>>>>>>>>>> 1 partition on the topic and 1 spout task, feeding 10 bolt "A"
>>>>>>>>>> tasks in 10 worker processes). Direct grouping depends on your
>>>>>>>>>> code (i.e. you can create hotspots), fields grouping depends on
>>>>>>>>>> your key distribution, etc.
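>>>>>>>>>>
>>>>>>>>>> As a rough illustration of the contrast (untested sketch;
>>>>>>>>>> MySpout, BoltA and BoltB are hypothetical stand-ins):
>>>>>>>>>>
>>>>>>>>>>     TopologyBuilder builder = new TopologyBuilder();
>>>>>>>>>>     builder.setSpout("spout", new MySpout(), 2);
>>>>>>>>>>
>>>>>>>>>>     // Shuffle: the runtime picks the consumer task uniformly at
>>>>>>>>>>     // random, so load is even by construction.
>>>>>>>>>>     builder.setBolt("a", new BoltA(), 10).shuffleGrouping("spout");
>>>>>>>>>>
>>>>>>>>>>     // Direct: the consumer declares the grouping, but BoltA must
>>>>>>>>>>     // declare its stream as direct and pick a task id itself via
>>>>>>>>>>     // emitDirect(taskId, ...), so balance (and hotspots) are
>>>>>>>>>>     // entirely up to your code.
>>>>>>>>>>     builder.setBolt("b", new BoltB(), 10).directGrouping("a");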
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all,
>>>>>>>>>>>
>>>>>>>>>>> I have two questions:
>>>>>>>>>>>
>>>>>>>>>>> 1) How exactly do you measure latency? I am doing the same
>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of
>>>>>>>>>>> latency (mainly because of clock drift).
>>>>>>>>>>> 2) (to Nathan) Is there a difference in speed among the
>>>>>>>>>>> different groupings? For instance, is shuffle faster than
>>>>>>>>>>> direct grouping?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Nick
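>>>>>>>>>>>
>>>>>>>>>>> One rough way to approximate the per-hop number (a sketch, not
>>>>>>>>>>> necessarily what anyone here runs; the "sentAt" field name is
>>>>>>>>>>> made up, and the producer would declare it in its output
>>>>>>>>>>> fields): the producer stamps each tuple with its clock, and the
>>>>>>>>>>> consumer diffs on receipt.
>>>>>>>>>>>
>>>>>>>>>>>     // Producer side: append a timestamp to the emitted values.
>>>>>>>>>>>     collector.emit(input, new Values(payload, System.currentTimeMillis()));
>>>>>>>>>>>
>>>>>>>>>>>     // Consumer side, inside execute():
>>>>>>>>>>>     long hopMillis = System.currentTimeMillis()
>>>>>>>>>>>             - input.getLongByField("sentAt");
>>>>>>>>>>>
>>>>>>>>>>> The delta is only meaningful when both tasks are on the same
>>>>>>>>>>> host or the clocks are NTP-synced; otherwise drift swamps
>>>>>>>>>>> millisecond-level readings.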
>>>>>>>>>>>
>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>>>
>>>>>>>>>>>> Two things. First, your math may be off depending on
>>>>>>>>>>>> parallelism: one emit from A becomes 100 emits from C, and you
>>>>>>>>>>>> are joining all of them.
>>>>>>>>>>>>
>>>>>>>>>>>> Second, try the default number of ackers (one per worker);
>>>>>>>>>>>> right now all your ack traffic is going to a single task.
>>>>>>>>>>>>
>>>>>>>>>>>> Also, you can try local-or-shuffle grouping if possible to
>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>
>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> We are attempting real-time distributed computing using
>>>>>>>>>>>>> Storm, and the solution has only one problem - inter-bolt
>>>>>>>>>>>>> latency on the same machine or across machines ranges between
>>>>>>>>>>>>> 2 and 250 ms, and I am not able to figure out why. Network
>>>>>>>>>>>>> latency is under 0.5 ms. By latency, I mean the time between
>>>>>>>>>>>>> an emit from one bolt/spout and getting the message in
>>>>>>>>>>>>> execute() of the next bolt.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>> A (spout) [emits a number, say 1000] -> B (bolt) [receives
>>>>>>>>>>>>> this number and divides it into 10 emits of 100 each] -> C
>>>>>>>>>>>>> (bolt) [receives these emits and divides each into 10 emits
>>>>>>>>>>>>> of 10 numbers] -> D (bolt) [does some computation on the
>>>>>>>>>>>>> number and emits one message] -> E (bolt) [aggregates all the
>>>>>>>>>>>>> data and confirms that all 1000 messages are processed]
>>>>>>>>>>>>>
>>>>>>>>>>>>> Every bolt takes under 3 msec to complete, and as a result I
>>>>>>>>>>>>> estimated that end-to-end processing of the 1000 would take
>>>>>>>>>>>>> no more than 50 msec, including any latencies.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>> 1. The end-to-end time from spout A to bolt E is 200 msec to
>>>>>>>>>>>>> 3 seconds. My estimate was under 50 msec, given that each
>>>>>>>>>>>>> bolt and spout takes under 3 msec to execute, including any
>>>>>>>>>>>>> latencies.
>>>>>>>>>>>>> 2. I noticed that most of the time is spent between the emit
>>>>>>>>>>>>> from a spout/bolt and execute() of the consuming bolt.
>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am not able to figure out why it takes so much time to get
>>>>>>>>>>>>> from a spout/bolt to the next bolt. I understand that the
>>>>>>>>>>>>> spout/bolt buffers the data into a queue and the subsequent
>>>>>>>>>>>>> bolt consumes from there.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>> 1. 5 VMs, each with 4 CPUs and 8 GB RAM. Each worker has 1024
>>>>>>>>>>>>> MB, and there are 20 workers overall.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>> messages sent to the spout in a span of 5 seconds.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, 20);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please let me know if you have encountered similar issues,
>>>>>>>>>>>>> and any steps you have taken to mitigate the time taken
>>>>>>>>>>>>> between a spout/bolt and the next bolt.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Kashyap
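>>>>>>>>>>>>
>>>>>>>>>>>> (A sketch of the acker change, untested: the
>>>>>>>>>>>> TOPOLOGY_ACKER_EXECUTORS line in the config above is the one
>>>>>>>>>>>> to change, with 20 matching the worker count.)
>>>>>>>>>>>>
>>>>>>>>>>>>     // The default is one acker per worker; pinning it to 1
>>>>>>>>>>>>     // funnels every ack through a single task.
>>>>>>>>>>>>     config.setNumAckers(20); // or omit the setting entirely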
>>>>>>
>>>>>> --
>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>> University of Pittsburgh, PhD candidate