I think it's worth exploring the throughput you can achieve with an Azure Service Bus queue first. Most probably this is a problem with the way you access the Azure queues. This [1] may help.
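One rough way to explore that is to measure the consumption rate completely outside Storm. A minimal harness along these lines would do; receiveOne() here is only a placeholder for whatever Service Bus receive/peek-and-delete call the spout already uses, not a real SDK method:

    public class BusThroughputCheck {
        public static void main(String[] args) {
            long start = System.currentTimeMillis();
            long count = 0;
            // Drain the queue/subscription for one minute and count what comes back.
            while (System.currentTimeMillis() - start < 60000) {
                String body = receiveOne();
                if (body != null) {
                    count++;
                }
            }
            System.out.println("messages consumed in one minute: " + count);
        }

        // Placeholder: wire in the same receive + delete logic the existing spout uses.
        private static String receiveOne() {
            return null;
        }
    }

If that number is close to the ~3500 messages per minute seen in the topology, the limit is the bus access rather than Storm.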
Thanks,
Milinda

[1] http://msdn.microsoft.com/en-us/library/windowsazure/hh528527.aspx

On Wed, Mar 19, 2014 at 9:55 PM, Michael Rose <[email protected]> wrote:

> messageId is any unique identifier for the message, such that when ack is
> called on your spout you're handed back that identifier and can mark the work
> as complete in the source, in the case it supports replay.
>
> Michael Rose (@Xorlev)
> Senior Platform Engineer, FullContact
> [email protected]
>
> On Wed, Mar 19, 2014 at 10:18 AM, David Crossland <[email protected]> wrote:
>>
>> Where is this messageId derived from?
>>
>> I note there is a tuple.getMessageId(), but there does not appear to be an
>> overload of emit that accepts this?
>>
>> There is an overload
>>
>>     emit(java.lang.String streamId, java.util.List<java.lang.Object> tuple)
>>
>> but I don't think this is what you are referring to.
>>
>> My bolt's execute method looks like this (sorry if I'm being obtuse, this
>> is still very new to me):
>>
>>     public void execute(Tuple tuple) {
>>         String msg = (String) tuple.getValue(0);
>>         if (msg == null) {
>>             logger.log(Level.ERROR, "Message is null");
>>             // mark the tuple as failed and return
>>             _collector.fail(tuple);
>>             return;
>>         }
>>
>>         MessageProcessor processor = new MessageProcessor();
>>         EventMessage message = processor.processMessage(msg);
>>
>>         if (message == null) {
>>             logger.log(Level.DEBUG, "Message did not conform to a known event");
>>             logger.log(Level.DEBUG, msg);
>>             // acknowledge the tuple, but don't do anything with it; we don't have to emit
>>             _collector.ack(tuple);
>>         }
>>
>>         if (message instanceof MonetiseEvent) {
>>             logger.log(Level.DEBUG, "received monetise message");
>>             _collector.emit(new Values(message));
>>             _collector.ack(tuple);
>>         }
>>     }
>>
>> D
>>
>> From: Sean Zhong
>> Sent: Wednesday, 19 March 2014 15:39
>> To: [email protected]
>>
>> Hi David,
>>
>> As I said, to get the ack count for the spout, you need to add a message id
>> when emitting. Like this:
>>
>>     collector.emit(new Values(msg), messageId)
>>                                     ^ here
>>
>> If the message id is null, then no ack message will be sent back to the spout.
>>
>> Sean
>>
>> On Wed, Mar 19, 2014 at 4:49 PM, David Crossland <[email protected]> wrote:
>>>
>>> I had noticed the ack count never increased for the spout.
>>>
>>> The spout is a BaseRichSpout; I presumed the underlying code would handle
>>> the ack and display appropriate metrics. But you can see from the first bolt
>>> that all messages have been acked. I don't know if this is an issue.
>>>
>>> I've been doing a bit of reading around the service bus; if I understand
>>> correctly it can push 20 messages per second per connection (I need to
>>> confirm this with a colleague who has experience with this..). If that's the
>>> case then it tallies with the throughput I've been seeing (approx.). I
>>> suspect the problem isn't so much the topology, but the service bus itself.
>>>
>>> I'll come back if this doesn't pan out.
>>>
>>> D
>>>
>>> From: Sean Zhong
>>> Sent: Wednesday, 19 March 2014 00:45
>>> To: [email protected]
>>>
>>> The spout is suspicious.
>>>
>>> From the screenshots, you are using a non-acked spout; there may also be a
>>> GC issue there. Here is the suggestion:
>>>
>>> Check whether you are making the connection in the context of nextTuple. If
>>> so, that means a large latency. You can check the spout latency by enabling
>>> an acked spout (when emitting, add a messageId, e.g. emit(new Values(msg),
>>> messageId)) and checking the latency.
>>>
>>> If this is the case, you need to create a separate thread for the data
>>> connection in the spout, and use a queue to buffer messages, so that
>>> nextTuple just polls the queue.
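As a rough sketch of the decoupled-reader pattern Sean describes (a background thread fills an in-memory queue and nextTuple only polls it), assuming a made-up ServiceBusReader standing in for the real bus client; none of these class or field names come from the topology under discussion:

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.LinkedBlockingQueue;

    public class BufferedBusSpout extends BaseRichSpout {
        private transient LinkedBlockingQueue<String> buffer;
        private transient SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.buffer = new LinkedBlockingQueue<String>(10000);
            Thread reader = new Thread(new Runnable() {
                public void run() {
                    ServiceBusReader client = new ServiceBusReader(); // hypothetical wrapper around the bus client
                    while (!Thread.currentThread().isInterrupted()) {
                        String body = client.receiveOne();            // blocking receive stays off the spout thread
                        if (body != null) {
                            buffer.offer(body);
                        }
                    }
                }
            });
            reader.setDaemon(true);
            reader.start();
        }

        @Override
        public void nextTuple() {
            String msg = buffer.poll();                               // never block in nextTuple
            if (msg != null) {
                // emitting with a message id makes the tuple tracked, so ack/fail fire on the spout
                collector.emit(new Values(msg), UUID.randomUUID().toString());
            }
        }

        @Override
        public void ack(Object msgId) {
            // mark the message complete in the source, if it supports replay
        }

        @Override
        public void fail(Object msgId) {
            // re-enqueue or log the failed message id for replay
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("message"));
        }
    }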
>>> On Wed, Mar 19, 2014 at 8:06 AM, Michael Rose <[email protected]> wrote:
>>>>
>>>> Well, I see you have 30 spout instances and 3 bolt instances. That doesn't
>>>> seem like a huge bottleneck, but it does have to send over the wire much of
>>>> the time. Something to keep in mind for the future.
>>>>
>>>> I'd be most suspicious of your spouts. All spouts on a single worker are
>>>> run single-threaded (in an event loop calling nextTuple()), so if you have
>>>> ANY blocking work that'll kill throughput. If Azure Service Bus is anything
>>>> like SQS, response times are not instant. We tend to have background threads
>>>> feeding a queue in our spouts for that reason, as we use SQS for many of our
>>>> topologies. Given 12 workers, you'd have ~3 spouts per machine running
>>>> single-threaded.
>>>>
>>>> With spouts, you should attempt to do as little blocking work as
>>>> possible... if you're using a queue, you should be using Queue#poll() to
>>>> either return a value or null, and only emit a tuple if an event is
>>>> available (and meets your criteria). Storm will handle rate-limiting the
>>>> spouts with sleeps.
>>>>
>>>> Michael Rose (@Xorlev)
>>>> Senior Platform Engineer, FullContact
>>>> [email protected]
>>>>
>>>> On Tue, Mar 18, 2014 at 5:14 PM, David Crossland <[email protected]> wrote:
>>>>>
>>>>> Perhaps these screenshots might shed some light? I don't think there is
>>>>> much of a latency issue. I'm really starting to suspect there is some
>>>>> consumption-rate issue from the topic.
>>>>>
>>>>> I set the spout to a high parallelism value as it did seem to improve
>>>>> throughput.
>>>>>
>>>>> But if there is anything you can spot, that would be grand.
>>>>>
>>>>> Thanks
>>>>> David
>>>>>
>>>>> From: Nathan Leung
>>>>> Sent: Tuesday, 18 March 2014 21:14
>>>>> To: [email protected]
>>>>>
>>>>> It could be bolt 3. What is the latency like between your worker and your
>>>>> redis server? Increasing the number of threads for bolt 3 will likely
>>>>> increase your throughput. Bolts 1 and 2 are probably CPU bound, but bolt 3
>>>>> is probably restricted by your network access. Also, I've found that
>>>>> localOrShuffleGrouping can improve performance due to reduced network
>>>>> communication.
>>>>>
>>>>> On Tue, Mar 18, 2014 at 3:55 PM, David Crossland <[email protected]> wrote:
>>>>>>
>>>>>> A bit more information, then.
>>>>>>
>>>>>> There are 4 components:
>>>>>>
>>>>>> Spout - This reads from an Azure Service Bus topic/subscription. A
>>>>>> connection is created in the open() method of the spout; nextTuple does a
>>>>>> peek on the message and invokes the following code:
>>>>>>
>>>>>>     StringWriter writer = new StringWriter();
>>>>>>     IOUtils.copy(message.getBody(), writer);
>>>>>>     String messageBody = writer.toString();
>>>>>>
>>>>>> It then deletes the message from the queue.
>>>>>>
>>>>>> Overall nothing all that exciting.
>>>>>>
>>>>>> Bolt 1 - Filtering
>>>>>>
>>>>>> Parses the message body (a JSON string) and converts it to an object
>>>>>> representation. Filters out anything that isn't a monetise message, then
>>>>>> emits the monetise message object to the next bolt. Monetise messages
>>>>>> account for ~0.03% of the total message volume.
>>>>>>
>>>>>> Bolt 2 - Transformation
>>>>>>
>>>>>> Basically extracts the interesting values from the monetise object and
>>>>>> constructs a string, which it emits.
>>>>>>
>>>>>> Bolt 3 - Storage
>>>>>>
>>>>>> Stores the transformed string in Redis using the current date/time as the
>>>>>> key.
>>>>>>
>>>>>> -----
>>>>>>
>>>>>> Shuffle grouping is used throughout the topology.
>>>>>>
>>>>>> I ack every tuple irrespective of whether I emit it or not, so it should
>>>>>> not be attempting to replay tuples.
>>>>>>
>>>>>> -----
>>>>>>
>>>>>> I don't think bolts 2/3 are the cause of the bottleneck; they don't have
>>>>>> to process much data at all, tbh.
>>>>>>
>>>>>> I can accept that perhaps there is something inefficient with the spout;
>>>>>> perhaps it just can't read from the service bus quickly enough. I will do
>>>>>> some more research on this and have a chat with the colleague who wrote
>>>>>> this component.
>>>>>>
>>>>>> I suppose I'm just trying to identify whether I've configured something
>>>>>> incorrectly with respect to Storm, and whether I'm correct to relate the
>>>>>> total number of executors and tasks to the total number of cores I have
>>>>>> available. I find it strange that I get better throughput when I choose an
>>>>>> arbitrarily large number for the parallelism hint than if I constrain
>>>>>> myself to a maximum that equates to the number of cores.
>>>>>>
>>>>>> D
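For the wiring, a hedged sketch of what Nathan's localOrShuffleGrouping suggestion could look like with explicit parallelism hints; the component classes (FilterBolt, TransformBolt, RedisStoreBolt, and the BufferedBusSpout sketched earlier) and the counts are illustrative, not taken from the real topology:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;

    public class MonetiseTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("bus-spout", new BufferedBusSpout(), 6);
            builder.setBolt("filter", new FilterBolt(), 6)
                   .localOrShuffleGrouping("bus-spout");            // prefer same-worker transfer, fall back to shuffle
            builder.setBolt("transform", new TransformBolt(), 2)
                   .localOrShuffleGrouping("filter");
            builder.setBolt("redis-store", new RedisStoreBolt(), 4) // scale this up if Redis latency dominates
                   .localOrShuffleGrouping("transform");

            Config conf = new Config();
            conf.setNumWorkers(3);                                  // one worker per node as a starting point
            StormSubmitter.submitTopology("monetise-topology", conf, builder.createTopology());
        }
    }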
>>>>>> From: Nathan Leung
>>>>>> Sent: Tuesday, 18 March 2014 18:38
>>>>>> To: [email protected]
>>>>>>
>>>>>> In my experience Storm is able to make good use of CPU resources if the
>>>>>> application is written appropriately. You shouldn't require much executor
>>>>>> parallelism if your application is CPU intensive. If your bolts are doing
>>>>>> things like remote DB/NoSQL accesses, then that changes things, and
>>>>>> parallelizing bolts will give you more throughput. Not knowing your
>>>>>> application, the best way to pin down the problem is to simplify your
>>>>>> topology. Cut out everything except the spout. How is your filtering done?
>>>>>> If you return without emitting, the latest versions of Storm will sleep
>>>>>> before trying again, so it may be worthwhile to loop in the spout until you
>>>>>> receive a valid message or the bus is empty. How much throughput can you
>>>>>> achieve from the spout, emitting tuples into the ether? Maybe the problem
>>>>>> is your message bus. Once you have achieved a level of performance you are
>>>>>> satisfied with from the spout, add one bolt. What bottlenecks does the bolt
>>>>>> introduce? And so on.
>>>>>>
>>>>>> On Tue, Mar 18, 2014 at 2:31 PM, David Crossland <[email protected]> wrote:
>>>>>>>
>>>>>>> Could my issue relate to the memory allocated to the JVM? Most of the
>>>>>>> settings are pretty much the defaults. Are there any other settings that
>>>>>>> could be throttling the topology?
>>>>>>>
>>>>>>> I'd like to be able to identify the issue without all this constant
>>>>>>> "stabbing in the dark"… 😃
>>>>>>>
>>>>>>> D
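On the JVM memory question: the worker heap in Storm is set by worker.childopts in storm.yaml, and the out-of-the-box value is fairly small, so it is worth checking the worker logs for GC pressure before raising it. A hedged example with an illustrative value only:

    # storm.yaml on each supervisor node -- restart the supervisors after changing
    worker.childopts: "-Xmx1024m"

Newer Storm versions also allow a per-topology override via topology.worker.childopts, if that setting is available in the version being used.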
>>>>>>> From: David Crossland
>>>>>>> Sent: Tuesday, 18 March 2014 16:32
>>>>>>> To: [email protected]
>>>>>>>
>>>>>>> Being very new to Storm, I'm not sure what to expect in some regards.
>>>>>>>
>>>>>>> I've been playing about with the number of workers/executors/tasks,
>>>>>>> trying to improve throughput on my cluster. I have 3 nodes: two 4-core and
>>>>>>> one 2-core (I can't increase the 3rd node to a medium until the customer
>>>>>>> gets more cores..). There is a spout that reads from a message bus and a
>>>>>>> bolt that filters out all but the messages we are interested in processing
>>>>>>> downstream in the topology. Most messages are filtered out.
>>>>>>>
>>>>>>> I'm assuming that these two components require the most resources, as
>>>>>>> they should be reading/filtering messages at a constant rate; there are
>>>>>>> two further bolts that are invoked intermittently and hence require less.
>>>>>>>
>>>>>>> I've set the number of workers to 12 (in fact I've noticed it rarely
>>>>>>> seems to make much difference whether I set this to 3/6/9 or 12; there is
>>>>>>> marginal improvement the higher the value).
>>>>>>>
>>>>>>> For the spout and filtering bolt I've tried a number of values for the
>>>>>>> parallelism hint (I started with 1/4/8/16/20/30/40/60/128/256…) and I can
>>>>>>> barely get the throughput to exceed 3500 messages per minute, and the
>>>>>>> larger hints just grind the system to a halt. Currently I have them both
>>>>>>> set to 20.
>>>>>>>
>>>>>>> The strange thing is that no matter what I do the CPU load is very low;
>>>>>>> it is typically 80-90% idle, which suggests the topology isn't doing that
>>>>>>> much work. And tbh I've no idea why this is. Can anyone offer any
>>>>>>> suggestions?
>>>>>>>
>>>>>>> If I understand how this should work, given the number of cores I would
>>>>>>> think the total number of executors should total 10? Spawning 1 thread per
>>>>>>> core, I could then set a number of tasks of say 2/4/8 per thread (I've no
>>>>>>> idea which would be most efficient..). I've tried something along these
>>>>>>> lines and my throughput was significantly less, approx. 2000 messages per
>>>>>>> minute. In fact parallelism hints of 20/24/30 seem to have produced the
>>>>>>> most throughput.
>>>>>>>
>>>>>>> All help/suggestions gratefully received!
>>>>>>>
>>>>>>> Thanks
>>>>>>> David

--
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org
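On the executors-vs-cores question in the original message, a hedged rule of thumb rather than a definitive answer: keep roughly one worker per node, size CPU-bound bolts to about the core count, and let components that block on I/O (the spout reading the bus, the Redis bolt) run with higher parallelism; that is consistent with large hints out-performing core-matched ones while the CPU sits mostly idle. Once the spout emits with message ids, the number of in-flight tuples can also be capped. The numbers below are illustrative only:

    Config conf = new Config();
    conf.setNumWorkers(3);           // one worker per node as a baseline
    conf.setNumAckers(3);            // ackers track the tuple tree once message ids are used
    conf.setMaxSpoutPending(1000);   // only applies to acked spouts; caps tuples in flight per spout task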
