It was at info level. I changed it to debug to find out the reason behind the increased times, as I suspected buffer overflow.
Regards Kashyap On Jul 19, 2015 1:18 PM, "Nathan Leung" <[email protected]> wrote: Are your logs on debug level? Try changing to info. On Jul 19, 2015 1:32 PM, "Kashyap Mhaisekar" <[email protected]> wrote: > Thanks Nathan. > The reason for the increased time taken between bolts could be due to - > 1. Buffer overflow > 2. GC activity. > 3. Low parallelism > 4. Latency between machines (around 0.3 ms) > > Debug logs indicate queue capacity and population of queues within limits, so > probably that is not the cause. > > For GC, I see GC/MarkSweepCompact and GC/Copy hovering at 500 ms. Am not > sure if this number is good or bad. Still figuring out... > > Parallelism does not seem to be a problem as capacity is under 0.3-0.5 for > all bolts. > > Do you know of any other reasons based on experience? > > Thanks for the time > > Thanks > Kashyap > On Jul 19, 2015 02:29, "Nathan Leung" <[email protected]> wrote: > >> Generally, binary search combined with observation of the system (whether >> it meets throughput/latency targets) is a good approach. >> On Jul 17, 2015 6:28 PM, "Kashyap Mhaisekar" <[email protected]> wrote: >> >>> Nathan, >>> The bolts are extending BaseBasicBolt and also, in the spout I am >>> explicitly emitting a msgId, hence tuples should be tagged and anchored. >>> What I see is this - >>> 1. The logic execution in the bolt takes not more than 1 ms (start of >>> execute() and end of execute()) >>> 2. The time is being spent *between* the bolts >>> 3. The thread dumps all show the LMAX disruptor at >>> com.lmax.disruptor.BlockingWaitStrategy.*waitFor()*, where the maximum >>> CPU time is being spent. >>> >>> Is there a pattern with which the buffer sizes need to be tuned? :( >>> >>> Thanks >>> Kashyap >>> >>> On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <[email protected] >>> > wrote: >>> >>>> Thanks for the clarification regarding Task IDs Nathan, I was under >>>> that false impression as the site docs are a bit misleading. Thanks for >>>> pointing that out! 
>>>> >>>> Regards. >>>> >>>> Kindly yours, >>>> >>>> Andrew Grammenos >>>> >>>> -- PGP PKey -- >>>> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt> >>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt >>>> >>>> On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <[email protected]> >>>> wrote: >>>> >>>>> If your tuples are reliable (spout emit with message id) and anchored >>>>> (emit from bolt anchored to input tuple), then you have to answer that >>>>> yourself. If not, then your output queue size is not constrained by the >>>>> framework and you may still have high latency. >>>>> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <[email protected]> >>>>> wrote: >>>>> >>>>>> Nathan, >>>>>> My max spout pending is set to 1. Now, is my problem with latency or >>>>>> with throughput? >>>>>> >>>>>> Thank you! >>>>>> Kashyap >>>>>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <[email protected]> wrote: >>>>>> >>>>>>> If your tuples are anchored, max spout pending indirectly affects how >>>>>>> many tuples are generated ;). >>>>>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>>> Thanks Nathan. One question though - Are there any best practices >>>>>>>> when tuples are getting generated in the topology and not really >>>>>>>> controllable via Max Spout Pending? >>>>>>>> >>>>>>>> Thanks >>>>>>>> Kashyap >>>>>>>> >>>>>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Also I would argue that this is not important unless your >>>>>>>>> application is especially latency sensitive or your queue is so long >>>>>>>>> that >>>>>>>>> it is causing in-flight tuples to time out. >>>>>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <[email protected]> wrote: >>>>>>>>> >>>>>>>>>> Sorry for the brief response... The number of tuples in flight >>>>>>>>>> absolutely affects your max latency. You need to tune your topology >>>>>>>>>> max >>>>>>>>>> spout pending. 
A lower value will reduce your end to end latency, but >>>>>>>>>> if >>>>>>>>>> it's too low it may affect throughput. I've posted to the group >>>>>>>>>> about this >>>>>>>>>> before; if you do a search you may find some posts where I've >>>>>>>>>> discussed >>>>>>>>>> this in more detail. >>>>>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Nathan, >>>>>>>>>>> Thanks. Have been running on a bare bones topology as suggested. >>>>>>>>>>> I am inclined to believe that the no. of messages in the topology >>>>>>>>>>> at that >>>>>>>>>>> point in time is affecting the "latency". >>>>>>>>>>> >>>>>>>>>>> Am now trying to figure out how the topology should be >>>>>>>>>>> structured when the no. of transient tuples in the topology is >>>>>>>>>>> very high. >>>>>>>>>>> >>>>>>>>>>> Topology is structured as follows - >>>>>>>>>>> Consumer (A Java program sends a message to the Storm cluster) -> A >>>>>>>>>>> (Spout) [Emits a number, say 100] -> B (bolt) [Emits 100 >>>>>>>>>>> messages] -> C >>>>>>>>>>> (bolt) [Passes along the message to the next bolt] -> D (bolt) [Passes >>>>>>>>>>> along >>>>>>>>>>> the message to the next bolt] -> E (bolt) [Aggregates all the data and >>>>>>>>>>> confirms >>>>>>>>>>> if all the 100 messages are processed] >>>>>>>>>>> >>>>>>>>>>> What I observed is as follows - >>>>>>>>>>> 1. The time taken for end to end processing of the message >>>>>>>>>>> (sending the message to the Storm cluster till the time the >>>>>>>>>>> aggregation is >>>>>>>>>>> complete) is directly dependent on the volume of messages >>>>>>>>>>> entering >>>>>>>>>>> Storm and also the no. of emits done by the spout A. 
>>>>>>>>>>> *Test 1: 100 sequential messages to storm with B emitting 100 >>>>>>>>>>> tuples per message (100X100=10000); there are 10000 tuples emitted >>>>>>>>>>> and the >>>>>>>>>>> time taken to aggregate the 100 is 15 ms to 100 ms* >>>>>>>>>>> *Test 2: 100 sequential messages to storm with B emitting 1000 >>>>>>>>>>> tuples per message (100X1000=100000); **there are 100000 tuples >>>>>>>>>>> emitted and the time taken to aggregate the 100 is 4 seconds to 10 >>>>>>>>>>> seconds* >>>>>>>>>>> 2. Strange thing is, the more parallelism I add, the >>>>>>>>>>> worse the times get. Am trying to figure out if the memory per worker >>>>>>>>>>> is a >>>>>>>>>>> constraint, but this is the first time I am seeing this. >>>>>>>>>>> >>>>>>>>>>> Question - How should the use case be handled wherein the no. >>>>>>>>>>> of tuples in the topology could increase exponentially? >>>>>>>>>>> >>>>>>>>>>> Thanks >>>>>>>>>>> Kashyap >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>>> Thank you all for the valuable info. >>>>>>>>>>>> >>>>>>>>>>>> Unfortunately, I have to use it for my (research) prototype, >>>>>>>>>>>> therefore I have to go along with it. 
>>>>>>>>>>>> >>>>>>>>>>>> Thank you again, >>>>>>>>>>>> Nick >>>>>>>>>>>> >>>>>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <[email protected]>: >>>>>>>>>>>> >>>>>>>>>>>>> Storm task ids don't change: >>>>>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor < >>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Direct grouping, as it is shown in the storm docs, means that you >>>>>>>>>>>>>> have to have a specific task id and use "direct streams", which >>>>>>>>>>>>>> is error >>>>>>>>>>>>>> prone, probably increases latency and might introduce redundancy >>>>>>>>>>>>>> problems, as >>>>>>>>>>>>>> the producer of a tuple needs to know the id of the task the tuple >>>>>>>>>>>>>> will have >>>>>>>>>>>>>> to go to; so imagine a scenario where the receiving task fails for >>>>>>>>>>>>>> some reason >>>>>>>>>>>>>> and the producer can't relay the tuples unless it receives the >>>>>>>>>>>>>> re-spawned >>>>>>>>>>>>>> task's id. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hope this helps. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Kindly yours, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Andrew Grammenos >>>>>>>>>>>>>> >>>>>>>>>>>>>> -- PGP PKey -- >>>>>>>>>>>>>> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt> >>>>>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis < >>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hello again, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Nathan, I am using direct-grouping because the application I >>>>>>>>>>>>>>> am working on has to be able to send tuples directly to >>>>>>>>>>>>>>> specific tasks. In >>>>>>>>>>>>>>> general, to control the data flow. Can you please explain to me why >>>>>>>>>>>>>>> you would >>>>>>>>>>>>>>> not recommend direct grouping? Is there any particular reason >>>>>>>>>>>>>>> in the >>>>>>>>>>>>>>> architecture of Storm? 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>> Nick >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <[email protected]>: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I would not recommend direct grouping unless you have a >>>>>>>>>>>>>>>> good reason for it. Shuffle grouping is essentially random >>>>>>>>>>>>>>>> with even >>>>>>>>>>>>>>>> distribution which makes it easier to characterize its >>>>>>>>>>>>>>>> performance. Local >>>>>>>>>>>>>>>> or shuffle grouping stays in process so generally it will be >>>>>>>>>>>>>>>> faster. >>>>>>>>>>>>>>>> However you have to be careful in certain cases to avoid task >>>>>>>>>>>>>>>> starvation >>>>>>>>>>>>>>>> (e.g. you have kafka spout with 1 partition on the topic and 1 >>>>>>>>>>>>>>>> spout task, >>>>>>>>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct >>>>>>>>>>>>>>>> grouping depends >>>>>>>>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping >>>>>>>>>>>>>>>> depends on >>>>>>>>>>>>>>>> your key distribution, etc. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis < >>>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hello all, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I have two questions: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same >>>>>>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of >>>>>>>>>>>>>>>>> latency >>>>>>>>>>>>>>>>> (mainly because of clock drifting). >>>>>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among >>>>>>>>>>>>>>>>> different groupings? For instance, is shuffle faster than >>>>>>>>>>>>>>>>> direct grouping? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>> Nick >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <[email protected] >>>>>>>>>>>>>>>>> >: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Two things. 
Your math may be off depending on >>>>>>>>>>>>>>>>>> parallelism. One emit from A becomes 100 emitted from C, and >>>>>>>>>>>>>>>>>> you are >>>>>>>>>>>>>>>>>> joining all of them. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Second, try the default number of ackers (one per >>>>>>>>>>>>>>>>>> worker). All your ack traffic is going to a single task. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to >>>>>>>>>>>>>>>>>> reduce network transfers. >>>>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" < >>>>>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>> We are attempting real-time distributed computing >>>>>>>>>>>>>>>>>>> using Storm, and the solution has only one problem - >>>>>>>>>>>>>>>>>>> inter-bolt latency on the same machine or across machines >>>>>>>>>>>>>>>>>>> ranges between 2 - 250 ms. I am not able to figure out why. >>>>>>>>>>>>>>>>>>> Network >>>>>>>>>>>>>>>>>>> latency is under 0.5 ms. By latency, I mean the time >>>>>>>>>>>>>>>>>>> from an emit of one bolt/spout to getting the message in >>>>>>>>>>>>>>>>>>> execute() of the >>>>>>>>>>>>>>>>>>> next bolt. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I have a topology like the below - >>>>>>>>>>>>>>>>>>> A (Spout) [Emits a number, say 1000] -> B (bolt) >>>>>>>>>>>>>>>>>>> [Receives this number and divides it into 10 emits of 100 >>>>>>>>>>>>>>>>>>> each] -> C >>>>>>>>>>>>>>>>>>> (bolt) [Receives these emits and divides them into 10 emits >>>>>>>>>>>>>>>>>>> of 10 numbers each] -> >>>>>>>>>>>>>>>>>>> D (bolt) [Does some computation on the number and emits one >>>>>>>>>>>>>>>>>>> message] -> E >>>>>>>>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms that all the >>>>>>>>>>>>>>>>>>> 1000 messages are >>>>>>>>>>>>>>>>>>> processed] >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a >>>>>>>>>>>>>>>>>>> result, I estimated that the end to end processing for 1000 >>>>>>>>>>>>>>>>>>> takes not more >>>>>>>>>>>>>>>>>>> than 50 msec including any latencies. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> *Observations* >>>>>>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 >>>>>>>>>>>>>>>>>>> msec to 3 seconds. My estimate was under 50 msec given that >>>>>>>>>>>>>>>>>>> each bolt and >>>>>>>>>>>>>>>>>>> spout take under 3 msec to execute including any latencies. >>>>>>>>>>>>>>>>>>> 2. I noticed that most of the time is spent between >>>>>>>>>>>>>>>>>>> the emit from a Spout/Bolt and execute() of the consuming bolt. >>>>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time >>>>>>>>>>>>>>>>>>> between a spout/bolt and the next bolt. I understand that the >>>>>>>>>>>>>>>>>>> spout/bolt buffers >>>>>>>>>>>>>>>>>>> the data into a queue and then the subsequent bolt consumes >>>>>>>>>>>>>>>>>>> from there. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> *Infrastructure* >>>>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPUs and 8 GB RAM. Workers have 1024 >>>>>>>>>>>>>>>>>>> MB and there are 20 workers overall. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> *Test* >>>>>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25 >>>>>>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> *Config values* >>>>>>>>>>>>>>>>>>> Config config = new Config(); >>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, >>>>>>>>>>>>>>>>>>> 20); >>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, >>>>>>>>>>>>>>>>>>> 16384); >>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, >>>>>>>>>>>>>>>>>>> 16384); >>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1); >>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8); >>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64); >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Please let me know if you have encountered similar >>>>>>>>>>>>>>>>>>> issues and any steps you have taken to mitigate the time >>>>>>>>>>>>>>>>>>> taken between >>>>>>>>>>>>>>>>>>> spout/bolt and another bolt. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>>>>>> Kashyap >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis, >>>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis, >>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> Nikolaos Romanos Katsipoulakis, >>>>>>>>>>>> University of Pittsburgh, PhD candidate >>>>>>>>>>> >>>>>>>>>>> >>>>>>>> >>>> >>>
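[Editor's note] The emit-to-execute latency discussed above (and Nick's question about how to measure it) can be sketched without any Storm dependency: stamp each tuple with System.nanoTime() at emit time and take the delta when the downstream consumer dequeues it. The queue below is only a stand-in for a bolt's input queue; in a real topology the timestamp would travel as an extra field in the emitted Values. As noted in the thread, clock drift makes this reliable only within a single machine/JVM. This is an illustrative sketch, not Storm API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EmitLatencyDemo {
    // Hypothetical tuple: a payload plus the nanoTime captured at emit.
    record TimedTuple(int payload, long emitNanos) {}

    // Emits 100 tuples through a queue (standing in for a bolt's input
    // queue) and returns the largest emit-to-dequeue latency observed.
    static long measureMaxLatencyMicros() {
        try {
            BlockingQueue<TimedTuple> queue = new ArrayBlockingQueue<>(1024);

            // "Spout": stamp each tuple with its emit time.
            Thread emitter = new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    queue.add(new TimedTuple(i, System.nanoTime()));
                }
            });
            emitter.start();
            emitter.join();

            // "Bolt": compute queue-residency latency per tuple.
            long maxMicros = 0;
            for (int i = 0; i < 100; i++) {
                TimedTuple t = queue.take();
                maxMicros = Math.max(maxMicros, (System.nanoTime() - t.emitNanos) / 1_000);
            }
            return maxMicros;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("max emit-to-execute latency (us): " + measureMaxLatencyMicros());
    }
}
```

In a real topology the same idea means emitting `new Values(payload, System.nanoTime())` and computing the delta at the top of the consumer's execute(), aggregating min/avg/max per bolt pair.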

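[Editor's note] Nathan's tuning advice upthread (binary search on topology.max.spout.pending, combined with observation of whether the system meets its latency target) can be sketched as a plain search loop. Here `measureP99LatencyMs` is a hypothetical stand-in for redeploying the topology with a given setting and sampling end-to-end latency; the sketch assumes, as the thread's measurements suggest, that latency grows monotonically with the number of tuples in flight.

```java
import java.util.function.IntToLongFunction;

public class MaxSpoutPendingTuner {
    /**
     * Returns the largest maxSpoutPending in [lo, hi] whose observed
     * latency stays at or under targetMs, assuming latency is
     * monotonically non-decreasing in the number of in-flight tuples.
     */
    public static int tune(int lo, int hi, long targetMs, IntToLongFunction measureP99LatencyMs) {
        int best = lo;
        while (lo <= hi) {
            int mid = lo + (hi - lo) / 2;
            if (measureP99LatencyMs.applyAsLong(mid) <= targetMs) {
                best = mid;    // meets the target: try allowing more in flight
                lo = mid + 1;
            } else {
                hi = mid - 1;  // too slow: back off
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Toy latency model standing in for real measurements:
        // 5 ms base latency plus 0.1 ms per pending tuple.
        IntToLongFunction model = pending -> 5L + pending / 10;
        System.out.println("chosen max.spout.pending: " + tune(1, 10000, 50, model));
    }
}
```

Each probe of `measureP99LatencyMs` corresponds to one deploy-and-observe cycle, so a search over [1, 10000] costs about 14 redeployments rather than thousands.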