I think its worth exploring throughput you can achieve using Azure
Service Bus Queue first. Most probably this could be a problem in the
way you access Azure Queues. This [1] may help.

Thanks
Milinda


[1] http://msdn.microsoft.com/en-us/library/windowsazure/hh528527.aspx

On Wed, Mar 19, 2014 at 9:55 PM, Michael Rose <[email protected]> wrote:
> messageId is any unique identifier of the message, such that when ack is
> called on your spout you're returned the identifier to then mark the work as
> complete in the source in the case it supports replay.
>
> Michael Rose (@Xorlev)
> Senior Platform Engineer, FullContact
> [email protected]
>
>
>
> On Wed, Mar 19, 2014 at 10:18 AM, David Crossland <[email protected]>
> wrote:
>>
>> Where is this messageId derived from?
>>
>> I note there is a tuple.getMessageId() but there does not appear to be an
>> overload to emit that accepts this?
>>
>> There is an overload
>>
>> emit(java.lang.String streamId, java.util.List<java.lang.Object> tuple)
>>
>> I don't think this is what you are referring to however.
>>
>> My bolt execute method looks like this;  (sorry if I'm being obtuse, this
>> is still very new.. )
>>
>> public void execute(Tuple tuple) {
>>         String msg = (String)tuple.getValue(0);
>>         if(msg == null)
>>         {
>>             logger.log(Level.ERROR, "Message is null");
>>             //acknowledge the tuple has failed and return
>>             _collector.fail(tuple);
>>             return;
>>         }
>>
>>         MessageProcessor processor = new MessageProcessor();
>>         EventMessage message = processor.processMessage(msg);
>>
>>         if(message == null)
>>         {
>>             logger.log(Level.DEBUG, "Message did not conform to a known
>> event");
>>             logger.log(Level.DEBUG, msg);
>>             //acknowlege the tuple, but dont do anything with it, we dont
>> have to emit
>>             _collector.ack(tuple);
>>         }
>>
>>         if(message instanceof MonetiseEvent)
>>         {
>>             logger.log(Level.DEBUG, "recieved monetise message");
>>             _collector.emit(new Values(message));
>>             _collector.ack(tuple);
>>         }
>>     }
>>
>>
>> D
>>
>> From: Sean Zhong
>> Sent: ‎Wednesday‎, ‎19‎ ‎March‎ ‎2014 ‎15‎:‎39
>> To: [email protected]
>>
>> Hi David,
>>
>>
>> As I said, to get the ack count for spout, you need to add a message Id
>> when emitting. Likes this,
>> collector.emit(new Values(msg), messageId)
>>                                                      ^| here
>>
>> If the message Id is null, then there will be ack message sent to spout.
>>
>>
>> Sean
>>
>>
>>
>> On Wed, Mar 19, 2014 at 4:49 PM, David Crossland <[email protected]>
>> wrote:
>>>
>>> I had noticed the ack count never increased for the spout.
>>>
>>> The spout is a BaseRichSpout, I presumed the underlying code should
>>> handle the ack and display appropriate metrics. But you can see from the
>>> first bolt that all messages have been asked.. I don't know if this is an
>>> issue.
>>>
>>> Ive been doing a bit of reading around the service bus, if I understand
>>> correctly it can push 20 messages per second per connection (I need to
>>> confirm this with a colleague who has the experience with this..) If that's
>>> the case then it tallies with the throughput I've been seeing (approx…).  I
>>> suspect the problem isn't so much the topology, but the service bus itself.
>>>
>>> I'll come back if this doesn't pan out.
>>>
>>> D
>>>
>>> From: Sean Zhong
>>> Sent: ‎Wednesday‎, ‎19‎ ‎March‎ ‎2014 ‎00‎:‎45
>>> To: [email protected]
>>>
>>> The Spout is suspicous.
>>>
>>> From the screenshots, you are using no-acked spout, there may also be GC
>>> issue there.
>>> Here is the suggestion:
>>>
>>> Check whether you are making connection in the context of nextTuple. If
>>> that is true, it means a large latency. You can check the spout latency by
>>> enabling acked spout(when emmit, add a messageId. e.g. emit(new Values(msg),
>>> messageId), and check the latency.
>>> If this is the case, you need to create a seperate thread for data
>>> connection in spout, and use a queue to buffer messages, then nextTuple just
>>> pol the queue.
>>>
>>>
>>>
>>> On Wed, Mar 19, 2014 at 8:06 AM, Michael Rose <[email protected]>
>>> wrote:
>>>>
>>>> Well, I see you have 30 spouts instances and 3 bolt instances. Doesn't
>>>> seem like it's a huge bottleneck, but it is having to send over the wire
>>>> much of the time. Something to keep in mind for the future.
>>>>
>>>> I'd be most suspicious of your spouts. All spouts on a single worker are
>>>> run single-threaded (in an event loop calling nextTuple()), so if you have
>>>> ANY blocking work that'll kill throughput. If AzureServiceBus is anything
>>>> like SQS, response times are not instant. We tend to have background 
>>>> threads
>>>> feeding a queue in our spouts for that reason, as we use SQS for many of 
>>>> our
>>>> topologies. Given 12 workers, you'd have ~3 spouts per machine running
>>>> single-threaded.
>>>>
>>>> With spouts, you should attempt to maintain as little blocking work as
>>>> possible...if you're using a queue, you should be using Queue#poll() to
>>>> either return a value OR null, and only emit a tuple if an event is
>>>> available (and meets your criteria). Storm will handle rate limiting the
>>>> spouts with sleeps.
>>>>
>>>> Michael Rose (@Xorlev)
>>>> Senior Platform Engineer, FullContact
>>>> [email protected]
>>>>
>>>>
>>>>
>>>> On Tue, Mar 18, 2014 at 5:14 PM, David Crossland <[email protected]>
>>>> wrote:
>>>>>
>>>>> Perhaps these screenshots might shed some light? I don't think there is
>>>>> much of a latency issue.  I'm really starting to suspect there is some
>>>>> consumption rate issue from the topic.
>>>>>
>>>>> I set the spout to a high parallelism value as it did seem to improve
>>>>> throughput..
>>>>>
>>>>> But if there is anything you can spot that would be grand
>>>>>
>>>>> Thanks
>>>>> David
>>>>>
>>>>> From: Nathan Leung
>>>>> Sent: ‎Tuesday‎, ‎18‎ ‎March‎ ‎2014 ‎21‎:‎14
>>>>> To: [email protected]
>>>>>
>>>>> It could be bolt 3.  What is the latency like between your worker and
>>>>> your redis server?  Increasing the number of threads for bolt 3 will 
>>>>> likely
>>>>> increase your throughput.  Bolt 1 and 2 are probably CPU bound, but bolt 3
>>>>> is probably restricted by your network access.  Also I've found that
>>>>> localOrShuffleGrouping can improve performance due to reduced network
>>>>> communications.
>>>>>
>>>>>
>>>>> On Tue, Mar 18, 2014 at 3:55 PM, David Crossland
>>>>> <[email protected]> wrote:
>>>>>>
>>>>>> A bit more information then
>>>>>>
>>>>>> There are 4 components
>>>>>>
>>>>>> Spout - This is reading from an azure service bus topic/subscription.
>>>>>> A connection is created in the open() method of the spout, nextTuple 
>>>>>> does a
>>>>>> peek on the message, and invokes the following code;
>>>>>>
>>>>>>                 StringWriter writer = new StringWriter();
>>>>>>                 IOUtils.copy(message.getBody(), writer);
>>>>>>                 String messageBody = writer.toString();
>>>>>>
>>>>>> It then deletes the message from the queue.
>>>>>>
>>>>>> Overall nothing all that exciting..
>>>>>>
>>>>>> Bolt 1 - Filtering
>>>>>>
>>>>>> Parses the message body (json string) and converts it to an object
>>>>>> representation.  Filters out anything that isn't a monetise message.  It
>>>>>> then emits the monetise message object to the next bolt.  Monetise 
>>>>>> messages
>>>>>> account for ~ 0.03% of the total message volume.
>>>>>>
>>>>>> Bolt 2 - transformation
>>>>>>
>>>>>> Basically extracts from the monetise object the values that are
>>>>>> interesting and contracts a string which it emits
>>>>>>
>>>>>> Bolt 3 - Storage
>>>>>>
>>>>>> Stores the transformed string in Redis using the current date/time as
>>>>>> key.
>>>>>>
>>>>>> -----
>>>>>>
>>>>>> Shuffle grouping is used with the topology
>>>>>>
>>>>>> I ack every tuple irrespective of whether I emit the tuple or not.  It
>>>>>> should not be attempting to replay tuple.
>>>>>>
>>>>>> -----
>>>>>>
>>>>>> I don't think Bolt 2/3 are the cause of the bottleneck.  They don't
>>>>>> have to process much data at all tbh.
>>>>>>
>>>>>> I can accept that perhaps there is something inefficient with the
>>>>>> spout, perhaps it just can't read from the service bus quickly enough. I
>>>>>> will do some more research on this and have a chat with the colleague who
>>>>>> wrote this component.
>>>>>>
>>>>>> I suppose I'm just trying to identify if I've configured something
>>>>>> incorrectly with respect to storm, whether I'm correct to relate the 
>>>>>> total
>>>>>> number of executors and tasks to the total number of cores I have 
>>>>>> available.
>>>>>> I find it strange that I get a better throughput when I choose an 
>>>>>> arbitrary
>>>>>> large number for the parallelism hint than if I constrain myself to a
>>>>>> maximum that equates to the number of cores.
>>>>>>
>>>>>> D
>>>>>>
>>>>>> From: Nathan Leung
>>>>>> Sent: ‎Tuesday‎, ‎18‎ ‎March‎ ‎2014 ‎18‎:‎38
>>>>>> To: [email protected]
>>>>>>
>>>>>> In my experience storm is able to make good use of CPU resources, if
>>>>>> the application is written appropriately.  You shouldn't require too much
>>>>>> executor parallelism if your application is CPU intensive.  If your bolts
>>>>>> are doing things like remote DB/NoSQL accesses, then that changes things 
>>>>>> and
>>>>>> parallelizing bolts will give you more throughput.  Not knowing your
>>>>>> application, the best way to pin down the problem is to simplify your
>>>>>> topology.  Cut out everything except for the Spout.  How is your 
>>>>>> filtering
>>>>>> done?  if you return without emitting, the latest versions of storm will
>>>>>> sleep before trying again.  It may be worthwhile to loop in the spout 
>>>>>> until
>>>>>> you receive a valid message, or the bus is empty.  How much throughput 
>>>>>> can
>>>>>> you achieve from the spout, emitting a tuple into the ether?  Maybe the
>>>>>> problem is your message bus.  Once you have achieve a level of 
>>>>>> performance
>>>>>> you are satisfied from the spout, add one bolt.  What bottlenecks does 
>>>>>> the
>>>>>> bolt introduce?  etc etc.
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 18, 2014 at 2:31 PM, David Crossland
>>>>>> <[email protected]> wrote:
>>>>>>>
>>>>>>> Could my issue relate to memory allocated to the JVM? Most of the
>>>>>>> setting are pretty much the defaults.  Are there any other settings that
>>>>>>> could be throttling the topology?
>>>>>>>
>>>>>>> I'd like to be able to identify the issue without all this constant
>>>>>>> “stabbing in the dark”… 😃
>>>>>>>
>>>>>>> D
>>>>>>>
>>>>>>> From: David Crossland
>>>>>>> Sent: ‎Tuesday‎, ‎18‎ ‎March‎ ‎2014 ‎16‎:‎32
>>>>>>> To: [email protected]
>>>>>>>
>>>>>>> Being very new to storm I'm not sure what to expect in some regards.
>>>>>>>
>>>>>>> Ive been playing about with the number of workers/executors/tasks
>>>>>>> trying to improve throughput on my cluster.  I have a 3 nodes, two 4 
>>>>>>> core
>>>>>>> and a 2 core node (I can't increase the 3rd node to a medium until the
>>>>>>> customer gets more cores..).  There is a spout that reads from a 
>>>>>>> message bus
>>>>>>> and a bolt that filter out all but the messages we are interested in
>>>>>>> processing downstream in the topology.  Most messages are filtered out.
>>>>>>>
>>>>>>> I'm assuming that these two components require the most resources as
>>>>>>> they should be reading/filtering messages at a constant rate, there are 
>>>>>>> two
>>>>>>> further bolts that are invoked intermittently and hence require less.
>>>>>>>
>>>>>>> Ive set the number of workers to 12 (in fact I've noticed it rarely
>>>>>>> seems to make much difference if I set this to 3/6/9 or 12, there is
>>>>>>> marginal improvement the higher the value).
>>>>>>>
>>>>>>> The spout and filtering bolt I've tried a number of values for in the
>>>>>>> parallelism hint (I started with 1/4/8/16/20/30/40/60/128/256… ) and I 
>>>>>>> can
>>>>>>> barely get the throughput to exceed 3500 messages per minute.  And the
>>>>>>> larger hints just grind the system to a halt.  Currently I have them 
>>>>>>> both
>>>>>>> set to 20.
>>>>>>>
>>>>>>> The strange thing is that no matter what I do the CPU load is very
>>>>>>> tiny and is typically 80-90% idle.  Suggesting that the topology isn't 
>>>>>>> doing
>>>>>>> that much work.  And tbh I've no idea why this is.  Can anyone offer any
>>>>>>> suggestions?
>>>>>>>
>>>>>>> If I understand how this should work, given the number of cores I
>>>>>>> would think the total number of executors should total 10? Spawning 1 
>>>>>>> thread
>>>>>>> per node, I can then set a number of tasks to say 2/4/8 per thread 
>>>>>>> (I've no
>>>>>>> idea which would be most efficient..).  Ive tried something along these
>>>>>>> lines and my throughput was significantly less, approx. 2000 messages 
>>>>>>> per
>>>>>>> minute.  In fact parallelism hint of 20/24/30 seem to have produced the 
>>>>>>> most
>>>>>>> throughput.
>>>>>>>
>>>>>>> All help/suggestions gratefully received!
>>>>>>>
>>>>>>> Thanks
>>>>>>> David
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>



-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Reply via email to