Thanks for this, I've taken a look at this over the past couple of days. If I follow it correctly (and I'm not certain I do..) I should get 20 messages per second per connection. This tallies with what I see when I have a single executor/task.
As the service bus supports up to 100 connections experimented by raising the number of executors to 100, which does increase the throughput to approx. 60 messages per second, which is not ideal. And I imagine with so many thread that the amount of contention is reducing the efficiency of the system.. Ive tried in previous experiments to limit the number of executors to the number of cores available, but I still can't seem to raise the throughput to a meaningful level.. As I say I will be having a chat with a colleague who has the experience with the service bus to try and gain more insight into what can be done, if anything.. D From: Milinda Pathirage<mailto:[email protected]> Sent: Thursday, 20 March 2014 03:31 To: [email protected]<mailto:[email protected]> I think its worth exploring throughput you can achieve using Azure Service Bus Queue first. Most probably this could be a problem in the way you access Azure Queues. This [1] may help. Thanks Milinda [1] http://msdn.microsoft.com/en-us/library/windowsazure/hh528527.aspx On Wed, Mar 19, 2014 at 9:55 PM, Michael Rose <[email protected]> wrote: > messageId is any unique identifier of the message, such that when ack is > called on your spout you're returned the identifier to then mark the work as > complete in the source in the case it supports replay. > > Michael Rose (@Xorlev) > Senior Platform Engineer, FullContact > [email protected] > > > > On Wed, Mar 19, 2014 at 10:18 AM, David Crossland <[email protected]> > wrote: >> >> Where is this messageId derived from? >> >> I note there is a tuple.getMessageId() but there does not appear to be an >> overload to emit that accepts this? >> >> There is an overload >> >> emit(java.lang.String streamId, java.util.List<java.lang.Object> tuple) >> >> I don't think this is what you are referring to however. >> >> My bolt execute method looks like this; (sorry if I'm being obtuse, this >> is still very new.. ) >> >> public void execute(Tuple tuple) { >> String msg = (String)tuple.getValue(0); >> if(msg == null) >> { >> logger.log(Level.ERROR, "Message is null"); >> //acknowledge the tuple has failed and return >> _collector.fail(tuple); >> return; >> } >> >> MessageProcessor processor = new MessageProcessor(); >> EventMessage message = processor.processMessage(msg); >> >> if(message == null) >> { >> logger.log(Level.DEBUG, "Message did not conform to a known >> event"); >> logger.log(Level.DEBUG, msg); >> //acknowlege the tuple, but dont do anything with it, we dont >> have to emit >> _collector.ack(tuple); >> } >> >> if(message instanceof MonetiseEvent) >> { >> logger.log(Level.DEBUG, "recieved monetise message"); >> _collector.emit(new Values(message)); >> _collector.ack(tuple); >> } >> } >> >> >> D >> >> From: Sean Zhong >> Sent: Wednesday, 19 March 2014 15:39 >> To: [email protected] >> >> Hi David, >> >> >> As I said, to get the ack count for spout, you need to add a message Id >> when emitting. Likes this, >> collector.emit(new Values(msg), messageId) >> ^| here >> >> If the message Id is null, then there will be ack message sent to spout. >> >> >> Sean >> >> >> >> On Wed, Mar 19, 2014 at 4:49 PM, David Crossland <[email protected]> >> wrote: >>> >>> I had noticed the ack count never increased for the spout. >>> >>> The spout is a BaseRichSpout, I presumed the underlying code should >>> handle the ack and display appropriate metrics. But you can see from the >>> first bolt that all messages have been asked.. I don't know if this is an >>> issue. >>> >>> Ive been doing a bit of reading around the service bus, if I understand >>> correctly it can push 20 messages per second per connection (I need to >>> confirm this with a colleague who has the experience with this..) If that's >>> the case then it tallies with the throughput I've been seeing (approx…). I >>> suspect the problem isn't so much the topology, but the service bus itself. >>> >>> I'll come back if this doesn't pan out. >>> >>> D >>> >>> From: Sean Zhong >>> Sent: Wednesday, 19 March 2014 00:45 >>> To: [email protected] >>> >>> The Spout is suspicous. >>> >>> From the screenshots, you are using no-acked spout, there may also be GC >>> issue there. >>> Here is the suggestion: >>> >>> Check whether you are making connection in the context of nextTuple. If >>> that is true, it means a large latency. You can check the spout latency by >>> enabling acked spout(when emmit, add a messageId. e.g. emit(new Values(msg), >>> messageId), and check the latency. >>> If this is the case, you need to create a seperate thread for data >>> connection in spout, and use a queue to buffer messages, then nextTuple just >>> pol the queue. >>> >>> >>> >>> On Wed, Mar 19, 2014 at 8:06 AM, Michael Rose <[email protected]> >>> wrote: >>>> >>>> Well, I see you have 30 spouts instances and 3 bolt instances. Doesn't >>>> seem like it's a huge bottleneck, but it is having to send over the wire >>>> much of the time. Something to keep in mind for the future. >>>> >>>> I'd be most suspicious of your spouts. All spouts on a single worker are >>>> run single-threaded (in an event loop calling nextTuple()), so if you have >>>> ANY blocking work that'll kill throughput. If AzureServiceBus is anything >>>> like SQS, response times are not instant. We tend to have background >>>> threads >>>> feeding a queue in our spouts for that reason, as we use SQS for many of >>>> our >>>> topologies. Given 12 workers, you'd have ~3 spouts per machine running >>>> single-threaded. >>>> >>>> With spouts, you should attempt to maintain as little blocking work as >>>> possible...if you're using a queue, you should be using Queue#poll() to >>>> either return a value OR null, and only emit a tuple if an event is >>>> available (and meets your criteria). Storm will handle rate limiting the >>>> spouts with sleeps. >>>> >>>> Michael Rose (@Xorlev) >>>> Senior Platform Engineer, FullContact >>>> [email protected] >>>> >>>> >>>> >>>> On Tue, Mar 18, 2014 at 5:14 PM, David Crossland <[email protected]> >>>> wrote: >>>>> >>>>> Perhaps these screenshots might shed some light? I don't think there is >>>>> much of a latency issue. I'm really starting to suspect there is some >>>>> consumption rate issue from the topic. >>>>> >>>>> I set the spout to a high parallelism value as it did seem to improve >>>>> throughput.. >>>>> >>>>> But if there is anything you can spot that would be grand >>>>> >>>>> Thanks >>>>> David >>>>> >>>>> From: Nathan Leung >>>>> Sent: Tuesday, 18 March 2014 21:14 >>>>> To: [email protected] >>>>> >>>>> It could be bolt 3. What is the latency like between your worker and >>>>> your redis server? Increasing the number of threads for bolt 3 will >>>>> likely >>>>> increase your throughput. Bolt 1 and 2 are probably CPU bound, but bolt 3 >>>>> is probably restricted by your network access. Also I've found that >>>>> localOrShuffleGrouping can improve performance due to reduced network >>>>> communications. >>>>> >>>>> >>>>> On Tue, Mar 18, 2014 at 3:55 PM, David Crossland >>>>> <[email protected]> wrote: >>>>>> >>>>>> A bit more information then >>>>>> >>>>>> There are 4 components >>>>>> >>>>>> Spout - This is reading from an azure service bus topic/subscription. >>>>>> A connection is created in the open() method of the spout, nextTuple >>>>>> does a >>>>>> peek on the message, and invokes the following code; >>>>>> >>>>>> StringWriter writer = new StringWriter(); >>>>>> IOUtils.copy(message.getBody(), writer); >>>>>> String messageBody = writer.toString(); >>>>>> >>>>>> It then deletes the message from the queue. >>>>>> >>>>>> Overall nothing all that exciting.. >>>>>> >>>>>> Bolt 1 - Filtering >>>>>> >>>>>> Parses the message body (json string) and converts it to an object >>>>>> representation. Filters out anything that isn't a monetise message. It >>>>>> then emits the monetise message object to the next bolt. Monetise >>>>>> messages >>>>>> account for ~ 0.03% of the total message volume. >>>>>> >>>>>> Bolt 2 - transformation >>>>>> >>>>>> Basically extracts from the monetise object the values that are >>>>>> interesting and contracts a string which it emits >>>>>> >>>>>> Bolt 3 - Storage >>>>>> >>>>>> Stores the transformed string in Redis using the current date/time as >>>>>> key. >>>>>> >>>>>> ----- >>>>>> >>>>>> Shuffle grouping is used with the topology >>>>>> >>>>>> I ack every tuple irrespective of whether I emit the tuple or not. It >>>>>> should not be attempting to replay tuple. >>>>>> >>>>>> ----- >>>>>> >>>>>> I don't think Bolt 2/3 are the cause of the bottleneck. They don't >>>>>> have to process much data at all tbh. >>>>>> >>>>>> I can accept that perhaps there is something inefficient with the >>>>>> spout, perhaps it just can't read from the service bus quickly enough. I >>>>>> will do some more research on this and have a chat with the colleague who >>>>>> wrote this component. >>>>>> >>>>>> I suppose I'm just trying to identify if I've configured something >>>>>> incorrectly with respect to storm, whether I'm correct to relate the >>>>>> total >>>>>> number of executors and tasks to the total number of cores I have >>>>>> available. >>>>>> I find it strange that I get a better throughput when I choose an >>>>>> arbitrary >>>>>> large number for the parallelism hint than if I constrain myself to a >>>>>> maximum that equates to the number of cores. >>>>>> >>>>>> D >>>>>> >>>>>> From: Nathan Leung >>>>>> Sent: Tuesday, 18 March 2014 18:38 >>>>>> To: [email protected] >>>>>> >>>>>> In my experience storm is able to make good use of CPU resources, if >>>>>> the application is written appropriately. You shouldn't require too much >>>>>> executor parallelism if your application is CPU intensive. If your bolts >>>>>> are doing things like remote DB/NoSQL accesses, then that changes things >>>>>> and >>>>>> parallelizing bolts will give you more throughput. Not knowing your >>>>>> application, the best way to pin down the problem is to simplify your >>>>>> topology. Cut out everything except for the Spout. How is your >>>>>> filtering >>>>>> done? if you return without emitting, the latest versions of storm will >>>>>> sleep before trying again. It may be worthwhile to loop in the spout >>>>>> until >>>>>> you receive a valid message, or the bus is empty. How much throughput >>>>>> can >>>>>> you achieve from the spout, emitting a tuple into the ether? Maybe the >>>>>> problem is your message bus. Once you have achieve a level of >>>>>> performance >>>>>> you are satisfied from the spout, add one bolt. What bottlenecks does >>>>>> the >>>>>> bolt introduce? etc etc. >>>>>> >>>>>> >>>>>> On Tue, Mar 18, 2014 at 2:31 PM, David Crossland >>>>>> <[email protected]> wrote: >>>>>>> >>>>>>> Could my issue relate to memory allocated to the JVM? Most of the >>>>>>> setting are pretty much the defaults. Are there any other settings that >>>>>>> could be throttling the topology? >>>>>>> >>>>>>> I'd like to be able to identify the issue without all this constant >>>>>>> “stabbing in the dark”… ?? >>>>>>> >>>>>>> D >>>>>>> >>>>>>> From: David Crossland >>>>>>> Sent: Tuesday, 18 March 2014 16:32 >>>>>>> To: [email protected] >>>>>>> >>>>>>> Being very new to storm I'm not sure what to expect in some regards. >>>>>>> >>>>>>> Ive been playing about with the number of workers/executors/tasks >>>>>>> trying to improve throughput on my cluster. I have a 3 nodes, two 4 >>>>>>> core >>>>>>> and a 2 core node (I can't increase the 3rd node to a medium until the >>>>>>> customer gets more cores..). There is a spout that reads from a >>>>>>> message bus >>>>>>> and a bolt that filter out all but the messages we are interested in >>>>>>> processing downstream in the topology. Most messages are filtered out. >>>>>>> >>>>>>> I'm assuming that these two components require the most resources as >>>>>>> they should be reading/filtering messages at a constant rate, there are >>>>>>> two >>>>>>> further bolts that are invoked intermittently and hence require less. >>>>>>> >>>>>>> Ive set the number of workers to 12 (in fact I've noticed it rarely >>>>>>> seems to make much difference if I set this to 3/6/9 or 12, there is >>>>>>> marginal improvement the higher the value). >>>>>>> >>>>>>> The spout and filtering bolt I've tried a number of values for in the >>>>>>> parallelism hint (I started with 1/4/8/16/20/30/40/60/128/256… ) and I >>>>>>> can >>>>>>> barely get the throughput to exceed 3500 messages per minute. And the >>>>>>> larger hints just grind the system to a halt. Currently I have them >>>>>>> both >>>>>>> set to 20. >>>>>>> >>>>>>> The strange thing is that no matter what I do the CPU load is very >>>>>>> tiny and is typically 80-90% idle. Suggesting that the topology isn't >>>>>>> doing >>>>>>> that much work. And tbh I've no idea why this is. Can anyone offer any >>>>>>> suggestions? >>>>>>> >>>>>>> If I understand how this should work, given the number of cores I >>>>>>> would think the total number of executors should total 10? Spawning 1 >>>>>>> thread >>>>>>> per node, I can then set a number of tasks to say 2/4/8 per thread >>>>>>> (I've no >>>>>>> idea which would be most efficient..). Ive tried something along these >>>>>>> lines and my throughput was significantly less, approx. 2000 messages >>>>>>> per >>>>>>> minute. In fact parallelism hint of 20/24/30 seem to have produced the >>>>>>> most >>>>>>> throughput. >>>>>>> >>>>>>> All help/suggestions gratefully received! >>>>>>> >>>>>>> Thanks >>>>>>> David >>>>>>> >>>>>> >>>>> >>>> >>> >> > -- Milinda Pathirage PhD Student | Research Assistant School of Informatics and Computing | Data to Insight Center Indiana University twitter: milindalakmal skype: milinda.pathirage blog: http://milinda.pathirage.org
