If your tuples are anchored, max spout pending indirectly affects how many tuples are generated ;).
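For reference, a minimal sketch of the setting under discussion; the value here is purely illustrative and would need tuning against your own latency/throughput trade-off:

Config config = new Config();
// Caps the number of un-acked tuples in flight per spout task.
// Only applies when tuples are anchored and acked; unanchored emits are not throttled.
config.setMaxSpoutPending(500);   // illustrative value, not a recommendation
// equivalent: config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 500);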
On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <[email protected]> wrote:

Thanks Nathan. One question though - are there any best practices when tuples are generated inside the topology and are not really controllable via max spout pending?

Thanks
Kashyap

On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <[email protected]> wrote:

Also, I would argue that this is not important unless your application is especially latency sensitive or your queue is so long that it causes in-flight tuples to time out.

On Jul 16, 2015 6:05 PM, "Nathan Leung" <[email protected]> wrote:

Sorry for a brief response. The number of tuples in flight absolutely affects your max latency. You need to tune your topology's max spout pending: a lower value will reduce your end-to-end latency, but if it is too low it may affect throughput. I've posted to the group about this before; if you do a search you may find posts where I've discussed this in more detail.

On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <[email protected]> wrote:

Nathan,
Thanks. I have been running a bare-bones topology as suggested. I am inclined to believe that the number of messages in the topology at that point in time is affecting the "latency".

I am now trying to figure out how the topology should be structured when the number of transient tuples in the topology is very high.

The topology is structured as follows:
Consumer (a Java program sends a message to the Storm cluster) -> A (spout) [emits a number, say 100] -> B (bolt) [emits 100 messages] -> C (bolt) [passes the message along to the next bolt] -> D (bolt) [passes the message along to the next bolt] -> E (bolt) [aggregates all the data and confirms that all 100 messages are processed]

What I observed is as follows:
1. The time taken for end-to-end processing of a message (from sending the message to the Storm cluster until aggregation is complete) depends directly on the volume of messages entering Storm and on the number of emits done by spout A.
   Test 1: 100 sequential messages to Storm with B emitting 100 tuples per message (100 x 100 = 10,000). 10,000 tuples are emitted and the time taken to aggregate the 100 is 15 ms to 100 ms.
   Test 2: 100 sequential messages to Storm with B emitting 1,000 tuples per message (100 x 1,000 = 100,000). 100,000 tuples are emitted and the time taken to aggregate the 100 is 4 seconds to 10 seconds.
2. The strange thing is that the more parallelism I add, the worse the times get. I am trying to figure out whether memory per worker is a constraint, but this is the first time I am seeing this.

Question: how should a use case be handled where the number of tuples in the topology can increase exponentially?

Thanks
Kashyap
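To make the shape of the topology above concrete, a rough sketch of how it might be wired. The component classes, parallelism hints, and the "msgId" field are hypothetical placeholders, and the groupings shown are only one option (see the grouping discussion further down the thread):

import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

TopologyBuilder builder = new TopologyBuilder();
// A (spout): emits the incoming number for each message.
builder.setSpout("A", new ASpout(), 1);
// B: fans each message out into that many tuples.
builder.setBolt("B", new BBolt(), 4).shuffleGrouping("A");
// C and D: pass-throughs; local-or-shuffle keeps tuples in the same worker when possible.
builder.setBolt("C", new CBolt(), 4).localOrShuffleGrouping("B");
builder.setBolt("D", new DBolt(), 4).localOrShuffleGrouping("C");
// E: aggregates, so all tuples belonging to one original message must reach the same task.
builder.setBolt("E", new EBolt(), 1).fieldsGrouping("D", new Fields("msgId"));
StormTopology topology = builder.createTopology();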
On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <[email protected]> wrote:

Thank you all for the valuable info.

Unfortunately, I have to use it for my (research) prototype, so I have to go along with it.

Thank you again,
Nick

2015-07-16 16:33 GMT-04:00 Nathan Leung <[email protected]>:

Storm task ids don't change:
https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c

On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <[email protected]> wrote:

Direct grouping, as shown in the Storm docs, means that you have to target a specific task id and use "direct streams". This is error prone, will probably increase latency, and can introduce redundancy problems, because the producer of a tuple needs to know the id of the task the tuple has to go to. Imagine a scenario where the receiving task fails for some reason: the producer cannot relay tuples until it learns the re-spawned task's id.

Hope this helps.

Kindly yours,

Andrew Grammenos

-- PGP PKey --
https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt
https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt

On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <[email protected]> wrote:

Hello again,

Nathan, I am using direct grouping because the application I am working on has to be able to send tuples directly to specific tasks, and in general to control the data flow. Can you please explain why you would not recommend direct grouping? Is there a particular reason in the architecture of Storm?

Thanks,
Nick

2015-07-16 16:20 GMT-04:00 Nathan Leung <[email protected]>:

I would not recommend direct grouping unless you have a good reason for it. Shuffle grouping is essentially random with even distribution, which makes it easier to characterize its performance. Local or shuffle grouping stays in process, so it will generally be faster. However, you have to be careful in certain cases to avoid task starvation (e.g. a Kafka spout with 1 partition on the topic and 1 spout task, feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends on your code (i.e. you can create hotspots), fields grouping depends on your key distribution, etc.
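To make the point about direct streams concrete, a rough sketch of what the producing bolt has to do (all names here are hypothetical; the point is that the producer itself resolves and chooses concrete task ids, which is where the fragility and potential hotspots come from):

import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical producer that emits on a direct stream.
public class DirectProducerBolt extends BaseRichBolt {
    private OutputCollector collector;
    private List<Integer> targetTasks;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // The producer must look up the consumer's concrete task ids itself.
        this.targetTasks = context.getComponentTasks("consumer-bolt");
    }

    @Override
    public void execute(Tuple input) {
        // Routing is entirely up to this code, so it is easy to create hotspots
        // or to route badly if the task list is handled incorrectly.
        int idx = (input.getValue(0).hashCode() & Integer.MAX_VALUE) % targetTasks.size();
        collector.emitDirect(targetTasks.get(idx), "direct-stream", input, new Values(input.getValue(0)));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 'true' marks this as a direct stream.
        declarer.declareStream("direct-stream", true, new Fields("value"));
    }
}

// Consumer side, when wiring the topology:
// builder.setBolt("consumer-bolt", new ConsumerBolt(), 4)
//        .directGrouping("direct-producer", "direct-stream");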
On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <[email protected]> wrote:

Hello all,

I have two questions:

1) How exactly do you measure latency? I am doing the same thing and I have a problem getting the exact milliseconds of latency (mainly because of clock drift).
2) (to Nathan) Is there a difference in speed among the different groupings? For instance, is shuffle faster than direct grouping?

Thanks,
Nick

2015-07-15 17:37 GMT-04:00 Nathan Leung <[email protected]>:

Two things. First, your math may be off depending on parallelism: one emit from A becomes 100 emitted from C, and you are joining all of them.

Second, try the default number of ackers (one per worker). Right now all your ack traffic is going to a single task.

Also, you can try local or shuffle grouping if possible to reduce network transfers.

On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <[email protected]> wrote:

Hi,
We are attempting real-time distributed computing using Storm, and the solution has only one problem: inter-bolt latency on the same machine or across machines ranges between 2 and 250 ms, and I am not able to figure out why. Network latency is under 0.5 ms. By latency, I mean the time between an emit from one bolt/spout and getting the message in execute() of the next bolt.

I have a topology like the one below:
A (spout) [emits a number, say 1000] -> B (bolt) [receives this number and divides it into 10 emits of 100 each] -> C (bolt) [receives these emits and divides each into 10 emits of 10 numbers] -> D (bolt) [does some computation on the number and emits one message] -> E (bolt) [aggregates all the data and confirms that all 1000 messages are processed]

Every bolt takes under 3 ms to complete, and as a result I estimated that the end-to-end processing for 1000 takes no more than 50 ms, including any latencies.

Observations
1. The end-to-end time from spout A to bolt E is 200 ms to 3 seconds. My estimate was under 50 ms, given that each bolt and spout takes under 3 ms to execute, including any latencies.
2. Most of the time is spent between the emit from a spout/bolt and execute() of the consuming bolt.
3. Network latency is under 0.5 ms.

I am not able to figure out why it takes so much time to get from a spout/bolt to the next bolt. I understand that the spout/bolt buffers the data into a queue and the subsequent bolt consumes from there.

Infrastructure
1. 5 VMs with 4 CPUs and 8 GB RAM each. Workers have 1024 MB and there are 20 workers overall.

Test
1. The test was done with 25 messages to the spout => 25 messages are sent to the spout in a span of 5 seconds.

Config values
Config config = new Config();
config.put(Config.TOPOLOGY_WORKERS, 20);
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);

Please let me know if you have encountered similar issues and any steps you have taken to mitigate the time taken between a spout/bolt and the next bolt.

Thanks
Kashyap
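On the question of measuring the emit-to-execute gap: one rough approach is to stamp each tuple with the sender's clock when it is emitted and diff it on the receiving side. This is only a sketch with hypothetical field names, and across machines it is only as accurate as your clock synchronization (the clock-drift caveat raised above):

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical pass-through bolt that records how long each tuple spent between the
// upstream emit and this execute() call. Assumes the upstream component emits
// ("payload", "emitTime") with emitTime = System.currentTimeMillis() at emit time.
public class LatencyProbeBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        long emittedAt = input.getLongByField("emitTime");
        long queueAndTransferMs = System.currentTimeMillis() - emittedAt;
        // Aggregate queueAndTransferMs (e.g. a histogram flushed periodically)
        // rather than logging per tuple, which would itself add latency.
        collector.emit(input, new Values(input.getValue(0), System.currentTimeMillis()));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("payload", "emitTime"));
    }
}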
