I kind of believe that the main thread which picks the data off the incoming queue is taking a longish time. Did anyone face this?
Execute and process latencies are under 3-8 ms, but the overall time taken for the message to get processed is close to 300 ms. This is where I don't understand what is happening: the case of the missing 290 ms. How is the overall time taken for a message computed? Is it the process latency at the spout, or a sum of the process latencies at all the bolts?

Thanks
Kashyap

On Sun, Jul 19, 2015 at 1:30 PM, Kashyap Mhaisekar <[email protected]> wrote:

I changed it to debug to find out the reason behind the increased times, as I suspected buffer overflow. It was at info level.

Regards
Kashyap

On Jul 19, 2015 1:18 PM, "Nathan Leung" <[email protected]> wrote:

Are your logs on debug level? Try changing to info.

On Jul 19, 2015 1:32 PM, "Kashyap Mhaisekar" <[email protected]> wrote:

Thanks Nathan. The reason for the increased time taken between bolts could be due to -
1. Buffer overflow
2. GC activity
3. Low parallelism
4. Latency between machines (around 0.3 ms)

Debug logs indicate queue capacity and population of the queues are within limits, so that is probably not the cause.

For GC, I see GC/MarkSweepCompact and GC/Copy hovering at 500 ms. Am not sure if this number is good or bad. Still figuring out...

Parallelism does not seem to be a problem, as capacity is under 0.3-0.5 for all bolts.

Do you know of any other reasons based on experience?

Thanks for the time.

Thanks
Kashyap

On Jul 19, 2015 02:29, "Nathan Leung" <[email protected]> wrote:

Generally, binary search combined with observation of the system (whether it meets throughput/latency targets) is a good approach.

On Jul 17, 2015 6:28 PM, "Kashyap Mhaisekar" <[email protected]> wrote:

Nathan,
The bolts are extending BaseBasicBolt and also, in the spout, I am explicitly emitting a msgId, hence tuples should be tagged and anchored. What I see is this -
1. The logic execution in the bolt takes not more than 1 ms (start of execute() to end of execute()).
2. The time is being spent *between* the bolts.
3. The thread dumps all show the LMAX disruptor at com.lmax.disruptor.BlockingWaitStrategy.waitFor(), where the maximum CPU time is being spent.

Is there a pattern with which the buffer sizes need to be tuned? :(

Thanks
Kashyap

On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <[email protected]> wrote:

Thanks for the clarification regarding task IDs, Nathan. I was under that false impression as the site docs are a bit misleading. Thanks for pointing that out!

Regards.

Kindly yours,
Andrew Grammenos

-- PGP PKey --
<https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt

On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <[email protected]> wrote:

If your tuples are reliable (spout emit with message id) and anchored (emit from bolt anchored to input tuple), then you have to answer that yourself. If not, then your output queue size is not constrained by the framework and you may still have high latency.
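For readers following along: "reliable" here means the spout attaches a message id to each emit, so Storm tracks the whole tuple tree and max spout pending has something to throttle. A minimal sketch against the Storm 0.9.x/0.10.x (backtype.storm) API is below; the class, field and value names are illustrative, not code from this thread.

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import java.util.Map;
    import java.util.UUID;

    class ReliableNumberSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            // A real spout would pull from a source and throttle instead of
            // emitting unconditionally on every call.
            String requestId = UUID.randomUUID().toString();
            // The second argument is the message id. Emitting with it makes the
            // tuple "reliable": the tuple tree is tracked and ack()/fail() is
            // called back on this spout when the tree completes or times out.
            // Max spout pending only limits tuples emitted this way.
            collector.emit(new Values(requestId, 100), requestId);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("requestId", "count"));
        }
    }

"Anchored" is the bolt-side half: a bolt extending BaseBasicBolt (as the bolts in this thread reportedly do) gets anchoring and acking for free, because BasicOutputCollector ties every emit to the input tuple, whereas a BaseRichBolt would have to pass the input tuple to emit() and call ack() itself.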
On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <[email protected]> wrote:

Nathan,
My max spout pending is set to 1. Now, is my problem with latency or with throughput?

Thank you!
Kashyap

On Jul 16, 2015 5:46 PM, "Nathan Leung" <[email protected]> wrote:

If your tuples are anchored, max spout pending indirectly affects how many tuples are generated ;).

On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <[email protected]> wrote:

Thanks Nathan. One question though - are there any best practices for when tuples are getting generated in the topology and are not really controllable via max spout pending?

Thanks
Kashyap

On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <[email protected]> wrote:

Also, I would argue that this is not important unless your application is especially latency sensitive or your queue is so long that it is causing in-flight tuples to time out.

On Jul 16, 2015 6:05 PM, "Nathan Leung" <[email protected]> wrote:

Sorry for a brief response.. The number of tuples in flight absolutely affects your max latency. You need to tune your topology max spout pending. A lower value will reduce your end-to-end latency, but if it's too low it may affect throughput. I've posted to the group about this before; if you do a search you may find some posts where I've discussed this in more detail.

On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <[email protected]> wrote:

Nathan,
Thanks. Have been running on a bare-bones topology as suggested. I am inclined to believe that the no. of messages in the topology at that point in time is affecting the "latency".

Am trying to now figure out how the topology should be structured when the no. of transient tuples in the topology is very high.

The topology is structured as follows -
Consumer (a Java program sends a message to the Storm cluster) -> A (spout) [emits a number, say 100] -> B (bolt) [emits 100 messages] -> C (bolt) [passes along the message to the next bolt] -> D (bolt) [passes along the message to the next bolt] -> E (bolt) [aggregates all the data and confirms that all 100 messages are processed]

What I observed is as follows -
1. The time taken for end-to-end processing of the message (from sending the message to the Storm cluster until the aggregation is complete) is directly dependent on the volume of messages entering Storm and also the no. of emits done by spout A.
*Test 1: 100 sequential messages to Storm with B emitting 100 tuples per message (100x100 = 10,000): there are 10,000 tuples emitted and the time taken to aggregate the 100 is 15 ms to 100 ms.*
*Test 2: 100 sequential messages to Storm with B emitting 1000 tuples per message (100x1000 = 100,000): there are 100,000 tuples emitted and the time taken to aggregate the 100 is 4 seconds to 10 seconds.*
2. The strange thing is - the more parallelism I add, the worse the times get. Am trying to figure out if the memory per worker is a constraint, but this is the first time I am seeing this.

Question - how should the use case be handled where the no. of tuples in the topology could increase exponentially?

Thanks
Kashyap
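As an illustration of the fan-out/aggregate shape described above (a "B"-style bolt that multiplies each request into many tuples, and an "E"-style bolt that confirms when they have all arrived), here is a rough sketch; the field names, fan-out factor and in-memory bookkeeping are assumptions, not the poster's code.

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import java.util.HashMap;
    import java.util.Map;

    class FanOutBolt extends BaseBasicBolt {
        private final int fanOut;

        FanOutBolt(int fanOut) {
            this.fanOut = fanOut;
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String requestId = input.getStringByField("requestId");
            // Each input tuple becomes `fanOut` output tuples, all anchored to
            // the input automatically by BasicOutputCollector, so in-flight
            // tuples grow multiplicatively at every stage like this one.
            for (int i = 0; i < fanOut; i++) {
                collector.emit(new Values(requestId, i, fanOut));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("requestId", "part", "expected"));
        }
    }

    class AggregatorBolt extends BaseBasicBolt {
        // Per-request counters; assumes all tuples for one request reach the
        // same task, e.g. via a fields grouping on "requestId".
        private final Map<String, Integer> seen = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String requestId = input.getStringByField("requestId");
            int expected = input.getIntegerByField("expected");
            Integer soFar = seen.get(requestId);
            int count = (soFar == null) ? 1 : soFar + 1;
            if (count == expected) {
                seen.remove(requestId);
                // All parts of this request have been processed.
                collector.emit(new Values(requestId));
            } else {
                seen.put(requestId, count);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("requestId"));
        }
    }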
On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <[email protected]> wrote:

Thank you all for the valuable info.

Unfortunately, I have to use it for my (research) prototype, therefore I have to go along with it.

Thank you again,
Nick

2015-07-16 16:33 GMT-04:00 Nathan Leung <[email protected]>:

Storm task ids don't change:
https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c

On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <[email protected]> wrote:

Direct grouping, as it is shown in the Storm docs, means that you have to have a specific task id and use "direct streams", which is error prone, probably increases latency, and might introduce redundancy problems, as the producer of a tuple needs to know the id of the task the tuple will have to go to. So imagine a scenario where the receiving task fails for some reason and the producer can't relay the tuples unless it receives the re-spawned task's id.

Hope this helps.

Kindly yours,
Andrew Grammenos

-- PGP PKey --
<https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
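To make the above concrete, a direct grouping needs two pieces on the producer side: the stream must be declared as a direct stream, and every emit must name a task id. A rough sketch follows; the component and stream names are made up for illustration.

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import java.util.List;
    import java.util.Map;

    class DirectEmittingBolt extends BaseRichBolt {
        private OutputCollector collector;
        private List<Integer> consumerTaskIds;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            // The producer has to know the consumer's task ids up front; if that
            // task is reassigned, tuples keep going to whatever id was chosen here.
            this.consumerTaskIds = context.getComponentTasks("consumer-bolt");
        }

        @Override
        public void execute(Tuple input) {
            int target = consumerTaskIds.get(0); // hotspot risk: everything goes to one task
            collector.emitDirect(target, "direct-stream", input, new Values(input.getValue(0)));
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // The stream must be declared as a direct stream (second argument = true).
            declarer.declareStream("direct-stream", true, new Fields("value"));
        }
    }

On the consumer side the matching wiring would be something like builder.setBolt("consumer-bolt", new ConsumerBolt()).directGrouping("producer-bolt", "direct-stream"), with ConsumerBolt standing in for whatever bolt actually receives the tuples.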
On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <[email protected]> wrote:

Hello again,

Nathan, I am using direct grouping because the application I am working on has to be able to send tuples directly to specific tasks - in general, to control the data flow. Can you please explain to me why you would not recommend direct grouping? Is there any particular reason in the architecture of Storm?

Thanks,
Nick

2015-07-16 16:20 GMT-04:00 Nathan Leung <[email protected]>:

I would not recommend direct grouping unless you have a good reason for it. Shuffle grouping is essentially random with even distribution, which makes it easier to characterize its performance. Local or shuffle grouping stays in process, so generally it will be faster. However, you have to be careful in certain cases to avoid task starvation (e.g. you have a Kafka spout with 1 partition on the topic and 1 spout task, feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends on your code (i.e. you can create hotspots), fields grouping depends on your key distribution, etc.

On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <[email protected]> wrote:

Hello all,

I have two questions:

1) How do you exactly measure latency? I am doing the same thing and I have a problem getting the exact milliseconds of latency (mainly because of clock drift).
2) (to Nathan) Is there a difference in speed among different groupings? For instance, is shuffle faster than direct grouping?

Thanks,
Nick

2015-07-15 17:37 GMT-04:00 Nathan Leung <[email protected]>:

Two things. Your math may be off depending on parallelism: one emit from A becomes 100 emitted from C, and you are joining all of them.

Second, try the default number of ackers (one per worker). All your ack traffic is going to a single task.

Also, you can try local or shuffle grouping if possible to reduce network transfers.
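Pulling these suggestions together (local-or-shuffle grouping where possible, a fields grouping only where the aggregation needs it, the default number of ackers, and a tuned max spout pending), the wiring might look roughly like the sketch below. It reuses the illustrative spout and bolts from the earlier sketches, and the parallelism numbers and topology name are arbitrary.

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    class PipelineTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("A", new ReliableNumberSpout(), 1);
            // localOrShuffleGrouping prefers consumers in the same worker process,
            // avoiding serialization and a network hop when one is available.
            builder.setBolt("B", new FanOutBolt(10), 4).localOrShuffleGrouping("A");
            builder.setBolt("C", new FanOutBolt(10), 8).localOrShuffleGrouping("B");
            // The aggregator needs every part of a request on the same task.
            builder.setBolt("E", new AggregatorBolt(), 2)
                   .fieldsGrouping("C", new Fields("requestId"));

            Config conf = new Config();
            conf.setNumWorkers(20);
            // No TOPOLOGY_ACKER_EXECUTORS override: the default is one acker per
            // worker, so ack traffic is not funnelled through a single task.
            conf.setMaxSpoutPending(100); // lower => lower latency, higher => more throughput
            StormSubmitter.submitTopology("pipeline", conf, builder.createTopology());
        }
    }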
On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <[email protected]> wrote:

Hi,
We are attempting real-time distributed computing using Storm, and the solution has only one problem - the inter-bolt latency on the same machine or across machines ranges between 2 - 250 ms, and I am not able to figure out why. Network latency is under 0.5 ms. By latency, I mean the time between an emit from one bolt/spout and getting the message in execute() of the next bolt.

I have a topology like the below -
A (spout) [emits a number, say 1000] -> B (bolt) [receives this number and divides it into 10 emits of 100 each] -> C (bolt) [receives these emits and divides them into 10 emits of 10 numbers] -> D (bolt) [does some computation on the number and emits one message] -> E (bolt) [aggregates all the data and confirms that all 1000 messages are processed]

Every bolt takes under 3 msec to complete and, as a result, I estimated that the end-to-end processing for 1000 takes not more than 50 msec, including any latencies.

*Observations*
1. The end-to-end time from spout A to bolt E takes 200 msec to 3 seconds. My estimate was under 50 msec, given that each bolt and spout takes under 3 msec to execute, including any latencies.
2. I noticed that most of the time is spent between the emit from a spout/bolt and execute() of the consuming bolt.
3. Network latency is under 0.5 msec.

I am not able to figure out why it takes so much time between a spout/bolt and the next bolt. I understand that the spout/bolt buffers the data into a queue and then the subsequent bolt consumes from there.

*Infrastructure*
1. 5 VMs with 4 CPUs and 8 GB RAM. Workers are with 1024 MB and there are 20 workers overall.

*Test*
1. The test was done with 25 messages to the spout => 25 messages are sent to the spout in a span of 5 seconds.

*Config values*
Config config = new Config();
config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt("20"));
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);

Please let me know if you have encountered similar issues and any steps you have taken to mitigate the time taken between a spout/bolt and another bolt.

Thanks
Kashyap
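On the measurement question raised earlier in the thread (getting exact milliseconds of emit-to-execute latency), one low-tech approach is to stamp each tuple with the sender's clock and log the difference on the receiving side. A sketch is below, with the caveat already noted above: timestamps from different machines drift, so the deltas are only trustworthy when both tasks run on the same host or the clocks are tightly synchronized. Class and field names are illustrative.

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    class TimestampingBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            // Forward the payload along with the wall-clock time at emit.
            collector.emit(new Values(input.getValue(0), System.currentTimeMillis()));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("payload", "emittedAt"));
        }
    }

    class ReceiveLatencyBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            long emittedAt = input.getLongByField("emittedAt");
            long queueAndTransferMs = System.currentTimeMillis() - emittedAt;
            // Subject to clock drift when the upstream task is on another machine.
            System.out.println("emit -> execute took " + queueAndTransferMs + " ms");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Terminal bolt for this measurement; no output stream declared.
        }
    }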
