The spout is suspicious. From the screenshots, you are using a non-acked spout, and there may also be a GC issue there. Here is my suggestion:
Check whether you are making the connection in the context of nextTuple. If so, nextTuple will have a large latency. You can measure the spout latency by enabling acking: when you emit, add a messageId, e.g. emit(new Values(msg), messageId), and check the reported latency. If this is the case, you need to create a separate thread for the data connection in the spout and use a queue to buffer messages, so that nextTuple just polls the queue.
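Roughly, the sketch below is what I have in mind. It is untested and only illustrative: ServiceBusClient/receiveMessage are placeholders for however you talk to Azure Service Bus, and the queue capacity is arbitrary.

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class BufferedBusSpout extends BaseRichSpout {
        private transient LinkedBlockingQueue<String> buffer;
        private transient SpoutOutputCollector collector;
        private transient ExecutorService reader;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.buffer = new LinkedBlockingQueue<String>(10000);
            this.reader = Executors.newSingleThreadExecutor();
            reader.submit(new Runnable() {
                public void run() {
                    // Placeholder: open the Service Bus connection here, once, not in nextTuple()
                    ServiceBusClient client = ServiceBusClient.connect();
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            String body = client.receiveMessage(); // the blocking call lives in this thread
                            if (body != null) {
                                buffer.put(body);                  // blocks if the topology falls behind
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            });
        }

        @Override
        public void nextTuple() {
            String body = buffer.poll(); // never blocks; returns null if nothing is buffered
            if (body != null) {
                // emitting with a messageId turns on acking, so the UI shows complete latency
                collector.emit(new Values(body), UUID.randomUUID());
            }
            // if null we simply return and Storm sleeps briefly before the next call
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("body"));
        }
    }

The key point is that nextTuple never blocks; it only drains what the background thread has already fetched, which is the same pattern Michael describes below for their SQS spouts.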
On Wed, Mar 19, 2014 at 8:06 AM, Michael Rose <[email protected]> wrote:

> Well, I see you have 30 spout instances and 3 bolt instances. It doesn't seem like a huge bottleneck, but it is having to send over the wire much of the time. Something to keep in mind for the future.
>
> I'd be most suspicious of your spouts. All spouts on a single worker are run single-threaded (in an event loop calling nextTuple()), so if you have ANY blocking work, that will kill throughput. If Azure Service Bus is anything like SQS, response times are not instant. We tend to have background threads feeding a queue in our spouts for that reason, as we use SQS for many of our topologies. Given 12 workers, you'd have ~3 spouts per machine running single-threaded.
>
> With spouts, you should attempt to do as little blocking work as possible... if you're using a queue, you should be using Queue#poll() to either return a value OR null, and only emit a tuple if an event is available (and meets your criteria). Storm will handle rate-limiting the spouts with sleeps.
>
> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> [email protected]
>
> On Tue, Mar 18, 2014 at 5:14 PM, David Crossland <[email protected]> wrote:
>
>> Perhaps these screenshots might shed some light? I don't think there is much of a latency issue. I'm really starting to suspect there is some consumption rate issue from the topic.
>>
>> I set the spout to a high parallelism value as it did seem to improve throughput.
>>
>> But if there is anything you can spot, that would be grand.
>>
>> Thanks
>> David
>>
>> From: Nathan Leung <[email protected]>
>> Sent: Tuesday, 18 March 2014 21:14
>> To: [email protected]
>>
>> It could be bolt 3. What is the latency like between your worker and your Redis server? Increasing the number of threads for bolt 3 will likely increase your throughput. Bolts 1 and 2 are probably CPU bound, but bolt 3 is probably restricted by your network access. Also, I've found that localOrShuffleGrouping can improve performance due to reduced network communication.
>>
>> On Tue, Mar 18, 2014 at 3:55 PM, David Crossland <[email protected]> wrote:
>>
>>> A bit more information then.
>>>
>>> There are 4 components:
>>>
>>> Spout - This is reading from an Azure Service Bus topic/subscription. A connection is created in the open() method of the spout; nextTuple does a peek on the message and invokes the following code:
>>>
>>> StringWriter writer = new StringWriter();
>>> IOUtils.copy(message.getBody(), writer);
>>> String messageBody = writer.toString();
>>>
>>> It then deletes the message from the queue.
>>>
>>> Overall, nothing all that exciting.
>>>
>>> Bolt 1 - Filtering
>>>
>>> Parses the message body (a JSON string) and converts it to an object representation. Filters out anything that isn't a monetise message, then emits the monetise message object to the next bolt. Monetise messages account for ~0.03% of the total message volume.
>>>
>>> Bolt 2 - Transformation
>>>
>>> Basically extracts the interesting values from the monetise object and constructs a string, which it emits.
>>>
>>> Bolt 3 - Storage
>>>
>>> Stores the transformed string in Redis using the current date/time as the key.
>>>
>>> -----
>>>
>>> Shuffle grouping is used throughout the topology.
>>>
>>> I ack every tuple irrespective of whether I emit it or not. It should not be attempting to replay tuples.
>>>
>>> -----
>>>
>>> I don't think bolts 2/3 are the cause of the bottleneck. They don't have to process much data at all, tbh.
>>>
>>> I can accept that perhaps there is something inefficient with the spout; perhaps it just can't read from the service bus quickly enough. I will do some more research on this and have a chat with the colleague who wrote this component.
>>>
>>> I suppose I'm just trying to identify whether I've configured something incorrectly with respect to Storm, and whether I'm correct to relate the total number of executors and tasks to the total number of cores I have available. I find it strange that I get better throughput when I choose an arbitrarily large number for the parallelism hint than if I constrain myself to a maximum that equates to the number of cores.
>>>
>>> D
>>>
>>> From: Nathan Leung <[email protected]>
>>> Sent: Tuesday, 18 March 2014 18:38
>>> To: [email protected]
>>>
>>> In my experience Storm is able to make good use of CPU resources if the application is written appropriately. You shouldn't require too much executor parallelism if your application is CPU intensive. If your bolts are doing things like remote DB/NoSQL accesses, then that changes things and parallelizing bolts will give you more throughput. Not knowing your application, the best way to pin down the problem is to simplify your topology. Cut out everything except the spout. How is your filtering done? If you return without emitting, the latest versions of Storm will sleep before trying again. It may be worthwhile to loop in the spout until you receive a valid message, or the bus is empty. How much throughput can you achieve from the spout, emitting a tuple into the ether? Maybe the problem is your message bus. Once you have achieved a level of performance you are satisfied with from the spout, add one bolt. What bottlenecks does that bolt introduce? Etc.
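On the filtering bolt described above: the usual shape is to ack every input tuple and only emit when the message passes the filter, roughly like this. Again only a sketch; MonetiseMessage.tryParse is a placeholder for whatever JSON parsing you already have.

    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class MonetiseFilterBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String json = input.getString(0);
            MonetiseMessage msg = MonetiseMessage.tryParse(json); // placeholder: returns null for non-monetise messages
            if (msg != null) {
                // anchored emit, so the spout's complete latency also covers the downstream bolts
                collector.emit(input, new Values(msg));
            }
            collector.ack(input); // ack whether or not we emitted, so nothing gets replayed
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("monetise"));
        }
    }

Because the bolt acks unconditionally, filtered-out tuples still complete immediately, which matches what is described above.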
>>> On Tue, Mar 18, 2014 at 2:31 PM, David Crossland <[email protected]> wrote:
>>>
>>>> Could my issue relate to memory allocated to the JVM? Most of the settings are pretty much the defaults. Are there any other settings that could be throttling the topology?
>>>>
>>>> I'd like to be able to identify the issue without all this constant "stabbing in the dark"… 😃
>>>>
>>>> D
>>>>
>>>> From: David Crossland <[email protected]>
>>>> Sent: Tuesday, 18 March 2014 16:32
>>>> To: [email protected]
>>>>
>>>> Being very new to Storm, I'm not sure what to expect in some regards.
>>>>
>>>> I've been playing about with the number of workers/executors/tasks, trying to improve throughput on my cluster. I have 3 nodes: two 4-core nodes and a 2-core node (I can't increase the 3rd node to a medium until the customer gets more cores). There is a spout that reads from a message bus and a bolt that filters out all but the messages we are interested in processing downstream in the topology. Most messages are filtered out.
>>>>
>>>> I'm assuming that these two components require the most resources, as they should be reading/filtering messages at a constant rate; there are two further bolts that are invoked intermittently and hence require less.
>>>>
>>>> I've set the number of workers to 12 (in fact I've noticed it rarely seems to make much difference whether I set this to 3, 6, 9 or 12; there is marginal improvement the higher the value).
>>>>
>>>> For the spout and the filtering bolt I've tried a number of values for the parallelism hint (I started with 1/4/8/16/20/30/40/60/128/256…) and I can barely get the throughput to exceed 3500 messages per minute, and the larger hints just grind the system to a halt. Currently I have them both set to 20.
>>>>
>>>> The strange thing is that no matter what I do, the CPU load is very small and the machines are typically 80-90% idle, suggesting that the topology isn't doing that much work. And tbh I've no idea why this is. Can anyone offer any suggestions?
>>>>
>>>> If I understand how this should work, given the number of cores, I would think the total number of executors should total 10? Spawning 1 thread per core, I could then set the number of tasks to say 2/4/8 per thread (I've no idea which would be most efficient). I've tried something along these lines and my throughput was significantly less, approx. 2000 messages per minute. In fact, parallelism hints of 20/24/30 seem to have produced the most throughput.
>>>>
>>>> All help/suggestions gratefully received!
>>>>
>>>> Thanks
>>>> David
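On the parallelism question in the original mail: the parallelism hint is the number of executors (threads) per component, and tasks default to one per executor, so it is usually enough to size the spout/bolt hints and the worker count and let Storm spread them across the cluster. The numbers and component names below are only illustrative (TransformBolt and RedisStoreBolt stand in for bolts 2 and 3), and localOrShuffleGrouping is the grouping Nathan mentioned for cutting down network transfers.

    TopologyBuilder builder = new TopologyBuilder();

    // parallelism hint = executors; tasks default to one per executor
    builder.setSpout("bus-spout", new BufferedBusSpout(), 4);
    builder.setBolt("filter", new MonetiseFilterBolt(), 4)
           .localOrShuffleGrouping("bus-spout"); // stay in-worker where possible
    builder.setBolt("transform", new TransformBolt(), 1)
           .shuffleGrouping("filter");
    builder.setBolt("store", new RedisStoreBolt(), 2)
           .shuffleGrouping("transform");

    Config conf = new Config();
    conf.setNumWorkers(3);         // e.g. one worker (JVM) per node
    conf.setMaxSpoutPending(1000); // only takes effect once the spout emits with messageIds

    StormSubmitter.submitTopology("monetise-topology", conf, builder.createTopology());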
