Thanks, Nathan. One question though: are there any best practices for when tuples are generated inside the topology (by bolt fan-out) and so are not really controllable via max spout pending?
Thanks
Kashyap

On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <[email protected]> wrote:

> Also I would argue that this is not important unless your application is
> especially latency sensitive or your queue is so long that it is causing
> in-flight tuples to time out.
>
> On Jul 16, 2015 6:05 PM, "Nathan Leung" <[email protected]> wrote:
>
>> Sorry for the brief response. The number of tuples in flight absolutely
>> affects your max latency. You need to tune your topology's max spout
>> pending. A lower value will reduce your end-to-end latency, but if it's
>> too low it may affect throughput. I've posted to the group about this
>> before; if you do a search you may find some posts where I've discussed
>> this in more detail.
>>
>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <[email protected]> wrote:
>>
>>> Nathan,
>>> Thanks. I have been running a bare-bones topology as suggested. I am
>>> inclined to believe that the number of messages in the topology at that
>>> point in time is affecting the "latency".
>>>
>>> I am now trying to figure out how the topology should be structured when
>>> the number of transient tuples in the topology is very high.
>>>
>>> The topology is structured as follows:
>>> Consumer (a Java program that sends a message to the Storm cluster) ->
>>> A (spout) [emits a number, say 100] -> B (bolt) [emits 100 messages] ->
>>> C (bolt) [passes the message along to the next bolt] -> D (bolt) [passes
>>> the message along to the next bolt] -> E (bolt) [aggregates all the data
>>> and confirms that all 100 messages are processed]
>>>
>>> What I observed is as follows:
>>> 1. The time taken for end-to-end processing of a message (from sending
>>> the message to the Storm cluster until the aggregation is complete) is
>>> directly dependent on the volume of messages entering Storm and on the
>>> number of emits done by spout A.
>>> *Test 1: 100 sequential messages to Storm with B emitting 100 tuples per
>>> message (100 x 100 = 10,000): 10,000 tuples are emitted and the time
>>> taken to aggregate the 100 is 15 ms to 100 ms.*
>>> *Test 2: 100 sequential messages to Storm with B emitting 1,000 tuples
>>> per message (100 x 1,000 = 100,000): 100,000 tuples are emitted and the
>>> time taken to aggregate the 100 is 4 to 10 seconds.*
>>> 2. The strange thing is that the more parallelism I add, the worse the
>>> times get. I am trying to figure out whether the memory per worker is a
>>> constraint, but this is the first time I am seeing this.
>>>
>>> Question: how should a use case be handled where the number of tuples in
>>> the topology can increase exponentially?
>>>
>>> Thanks
>>> Kashyap
>>>
>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>> [email protected]> wrote:
>>>
>>>> Thank you all for the valuable info.
>>>>
>>>> Unfortunately, I have to use it for my (research) prototype, therefore
>>>> I have to go along with it.
>>>>
>>>> Thank you again,
>>>> Nick
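A minimal sketch of what tuning max spout pending looks like in code, assuming the 2015-era backtype.storm API used elsewhere in this thread; the class name and the value 500 are illustrative assumptions, not taken from the thread. Note that the cap applies per spout task, only to tuples emitted with a message id, and that downstream fan-out multiplies what one pending spout tuple puts in flight:

import backtype.storm.Config;

public class MaxSpoutPendingSketch {
    public static Config baseConfig() {
        Config config = new Config();
        // Caps un-acked tuples per spout task. Only takes effect when the
        // spout emits with message ids (i.e. reliable, anchored tuples).
        // A bolt that fans 1 tuple out into 100 multiplies the in-flight
        // count, so budget for the topology's fan-out when picking a value.
        config.setMaxSpoutPending(500); // hypothetical; lower it for latency,
                                        // raise it for throughput
        return config;
    }
}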
>>>>
>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <[email protected]>:
>>>>
>>>>> Storm task ids don't change:
>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>
>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Direct grouping, as shown in the Storm docs, means that you have to
>>>>>> target a specific task id and use "direct streams", which is error
>>>>>> prone, probably increases latency, and might introduce redundancy
>>>>>> problems, since the producer of a tuple needs to know the id of the
>>>>>> task the tuple has to go to; imagine a scenario where the receiving
>>>>>> task fails for some reason and the producer cannot relay tuples until
>>>>>> it receives the re-spawned task's id.
>>>>>>
>>>>>> Hope this helps.
>>>>>>
>>>>>> Kindly yours,
>>>>>>
>>>>>> Andrew Grammenos
>>>>>>
>>>>>> -- PGP PKey --
>>>>>> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hello again,
>>>>>>>
>>>>>>> Nathan, I am using direct grouping because the application I am
>>>>>>> working on has to be able to send tuples directly to specific tasks
>>>>>>> and, in general, to control the data flow. Can you please explain
>>>>>>> why you would not recommend direct grouping? Is there any particular
>>>>>>> reason in the architecture of Storm?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Nick
>>>>>>>
>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>
>>>>>>>> I would not recommend direct grouping unless you have a good reason
>>>>>>>> for it. Shuffle grouping is essentially random with even
>>>>>>>> distribution, which makes it easier to characterize its
>>>>>>>> performance. Local-or-shuffle grouping stays in process, so
>>>>>>>> generally it will be faster; however, you have to be careful in
>>>>>>>> certain cases to avoid task starvation (e.g. a Kafka spout with 1
>>>>>>>> partition on the topic and 1 spout task, feeding 10 bolt "A" tasks
>>>>>>>> in 10 worker processes). Direct grouping depends on your code (i.e.
>>>>>>>> you can create hotspots), fields grouping depends on your key
>>>>>>>> distribution, etc.
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello all,
>>>>>>>>>
>>>>>>>>> I have two questions:
>>>>>>>>>
>>>>>>>>> 1) How exactly do you measure latency? I am doing the same thing
>>>>>>>>> and I have a problem getting the exact milliseconds of latency
>>>>>>>>> (mainly because of clock drift).
>>>>>>>>> 2) (To Nathan) Is there a difference in speed among the different
>>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <[email protected]>:
>>>>>>>>>
>>>>>>>>>> Two things. Your math may be off depending on parallelism: one
>>>>>>>>>> emit from A becomes 100 emitted from C, and you are joining all
>>>>>>>>>> of them.
>>>>>>>>>>
>>>>>>>>>> Second, try the default number of ackers (one per worker). All
>>>>>>>>>> your ack traffic is going to a single task.
>>>>>>>>>>
>>>>>>>>>> Also, you can try local-or-shuffle grouping if possible to
>>>>>>>>>> reduce network transfers.
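To make the grouping trade-offs discussed above concrete, here is a minimal sketch of how the groupings are declared, again assuming the backtype.storm API; MySpout, MyBolt, the component names, and the parallelism hints are hypothetical placeholders, not from the thread:

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class GroupingSketch {
    public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new MySpout(), 1); // MySpout/MyBolt are
                                                     // hypothetical stand-ins

        // Shuffle: random with even distribution; easiest to characterize.
        builder.setBolt("b", new MyBolt(), 10).shuffleGrouping("spout");

        // Local-or-shuffle: prefers tasks in the same worker process, so it
        // avoids network hops, but can starve remote tasks when upstream
        // parallelism is low (the 1-partition Kafka example above).
        builder.setBolt("c", new MyBolt(), 10).localOrShuffleGrouping("b");

        // Fields: routes by key; skewed key distributions create hotspots.
        builder.setBolt("d", new MyBolt(), 10)
               .fieldsGrouping("c", new Fields("key"));

        // Direct: the consumer declares it, but the *producer* must choose
        // the task id via collector.emitDirect(taskId, ...), which is why
        // it is error prone and couples the producer to live task ids.
        builder.setBolt("e", new MyBolt(), 10).directGrouping("d");
        return builder;
    }
}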
>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> We are attempting real-time distributed computing using Storm,
>>>>>>>>>>> and the solution has only one problem: inter-bolt latency, on
>>>>>>>>>>> the same machine or across machines, ranges between 2 and 250
>>>>>>>>>>> ms. I am not able to figure out why. Network latency is under
>>>>>>>>>>> 0.5 ms. By latency, I mean the time between an emit from one
>>>>>>>>>>> bolt/spout and getting the message in execute() of the next
>>>>>>>>>>> bolt.
>>>>>>>>>>>
>>>>>>>>>>> I have a topology like the below:
>>>>>>>>>>> A (spout) [emits a number, say 1000] -> B (bolt) [receives this
>>>>>>>>>>> number and divides it into 10 emits of 100 each] -> C (bolt)
>>>>>>>>>>> [receives these emits and divides each into 10 emits of 10
>>>>>>>>>>> numbers] -> D (bolt) [does some computation on the number and
>>>>>>>>>>> emits one message] -> E (bolt) [aggregates all the data and
>>>>>>>>>>> confirms that all 1000 messages are processed]
>>>>>>>>>>>
>>>>>>>>>>> Every bolt takes under 3 ms to complete, and as a result I
>>>>>>>>>>> estimated that the end-to-end processing for 1000 takes no more
>>>>>>>>>>> than 50 ms, including any latencies.
>>>>>>>>>>>
>>>>>>>>>>> *Observations*
>>>>>>>>>>> 1. The end-to-end time from spout A to bolt E is 200 ms to 3
>>>>>>>>>>> seconds. My estimate was under 50 ms, given that each bolt and
>>>>>>>>>>> spout takes under 3 ms to execute, including any latencies.
>>>>>>>>>>> 2. I noticed that most of the time is spent between the emit
>>>>>>>>>>> from a spout/bolt and execute() of the consuming bolt.
>>>>>>>>>>> 3. Network latency is under 0.5 ms.
>>>>>>>>>>>
>>>>>>>>>>> I am not able to figure out why it takes so much time between a
>>>>>>>>>>> spout/bolt and the next bolt. I understand that the spout/bolt
>>>>>>>>>>> buffers the data into a queue and the subsequent bolt then
>>>>>>>>>>> consumes from there.
>>>>>>>>>>>
>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>> 1. 5 VMs with 4 CPUs and 8 GB RAM. Workers run with 1024 MB and
>>>>>>>>>>> there are 20 workers overall.
>>>>>>>>>>>
>>>>>>>>>>> *Test*
>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>> messages are sent to the spout in a span of 5 seconds.
>>>>>>>>>>>
>>>>>>>>>>> *Config values*
>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, 20);
>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>
>>>>>>>>>>> Please let me know if you have encountered similar issues and
>>>>>>>>>>> about any steps you have taken to mitigate the time taken
>>>>>>>>>>> between a spout/bolt and the next bolt.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Kashyap
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>
>>>>>>> --
>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>> University of Pittsburgh, PhD candidate
>>>>
>>>> --
>>>> Nikolaos Romanos Katsipoulakis,
>>>> University of Pittsburgh, PhD candidate
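Pulling the suggestions in this thread together, a hedged sketch of how the config above might be adjusted; the worker count mirrors the poster's setup, while the acker and max-spout-pending values are illustrative, not prescribed by anyone in the thread (note that the original `Integer.parseInt(20)` would not compile, since parseInt takes a String):

import backtype.storm.Config;

public class TunedConfigSketch {
    public static Config config() {
        Config config = new Config();
        config.setNumWorkers(20);
        // One acker per worker (Storm's default when unset) instead of
        // TOPOLOGY_ACKER_EXECUTORS = 1, so ack traffic is not funneled
        // through a single task.
        config.setNumAckers(20);
        // Bound the number of in-flight tuples per spout task.
        config.setMaxSpoutPending(100); // hypothetical starting point
        return config;
    }
}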
