Hi David,
As I said, to get the ack count for the spout, you need to add a message ID
when emitting, like this:
collector.emit(new Values(msg), messageId)
                                ^ here
If the message ID is null, then no ack message will be sent to the spout.
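In case it helps, here is a rough sketch of the bookkeeping that usually goes with a message ID. It is plain Java with no Storm dependency, so Emitter and AckTrackingSource are made-up names standing in for the collector and the spout. In a real spout the three methods would be nextTuple(), ack(Object msgId) and fail(Object msgId). Replaying a message on fail is my assumption about what you might want, not something your topology requires.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

// Hypothetical stand-in for Storm's SpoutOutputCollector so the sketch compiles alone.
interface Emitter {
    void emit(String msg, Object messageId);
}

// Hypothetical spout-like class: remembers every in-flight message by its ID.
class AckTrackingSource {
    private final Queue<String> inbox = new ArrayDeque<>();
    private final Map<Object, String> pending = new HashMap<>();
    private long acked = 0;

    // Test helper: hands a message to the source.
    void offer(String msg) {
        inbox.add(msg);
    }

    // Would be nextTuple() in a real spout.
    void nextTuple(Emitter collector) {
        String msg = inbox.poll();
        if (msg == null) {
            return; // nothing available, so emit nothing
        }
        Object messageId = UUID.randomUUID();
        pending.put(messageId, msg);
        collector.emit(msg, messageId); // non-null ID, so the tuple is tracked
    }

    // Would be ack(Object msgId): the tuple tree completed.
    void ack(Object messageId) {
        if (pending.remove(messageId) != null) {
            acked++;
        }
    }

    // Would be fail(Object msgId): put the message back for replay (assumption).
    void fail(Object messageId) {
        String msg = pending.remove(messageId);
        if (msg != null) {
            inbox.add(msg);
        }
    }

    long ackedCount() {
        return acked;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With something like this in place, the spout's ack count should move as soon as the downstream bolts ack their input tuples.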
Sean
On Wed, Mar 19, 2014 at 4:49 PM, David Crossland <[email protected]> wrote:
> I had noticed the ack count never increased for the spout.
>
> The spout is a BaseRichSpout, I presumed the underlying code should
> handle the ack and display appropriate metrics. But you can see from the
> first bolt that all messages have been acked. I don't know if this is an
> issue.
>
> I've been doing a bit of reading around the service bus, if I understand
> correctly it can push 20 messages per second per connection (I need to
> confirm this with a colleague who has the experience with this..) If that's
> the case then it tallies with the throughput I've been seeing (approx…). I
> suspect the problem isn't so much the topology, but the service bus
> itself.
>
> I'll come back if this doesn't pan out.
>
> D
>
> *From:* Sean Zhong <[email protected]>
> *Sent:* Wednesday, 19 March 2014 00:45
> *To:* [email protected]
>
> The spout is suspicious.
>
> From the screenshots, you are using a non-acked spout; there may also be a GC
> issue there.
> Here is my suggestion:
>
> Check whether you are making the connection in the context of nextTuple. If
> so, it means a large latency. You can check the spout latency by
> enabling an acked spout (when emitting, add a messageId, e.g. emit(new
> Values(msg), messageId)) and checking the latency.
> If this is the case, you need to create a separate thread for the data
> connection in the spout and use a queue to buffer messages, so that nextTuple
> just polls the queue.
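A rough sketch of that queue-backed pattern, in plain Java without Storm. QueueBackedReader and the fetch supplier are hypothetical names: fetch stands in for whatever blocking receive call your service bus client offers, and the buffer size of 1000 and the 10 ms idle sleep are arbitrary choices.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical spout-like class: a background thread does the slow reads,
// and nextTuple() only ever polls an in-memory buffer.
class QueueBackedReader {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(1000);
    private final Thread reader;
    private volatile boolean running = true;

    // fetch may block; it returns null when no message is available.
    QueueBackedReader(Supplier<String> fetch) {
        reader = new Thread(() -> {
            try {
                while (running) {
                    String msg = fetch.get();
                    if (msg != null) {
                        buffer.put(msg);   // blocks when the buffer is full
                    } else {
                        Thread.sleep(10);  // back off briefly when the bus is empty
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit quietly on close()
            }
        }, "bus-reader");
        reader.setDaemon(true);
    }

    // Would be called from open() in a real spout.
    void open() {
        reader.start();
    }

    // Would be nextTuple(): returns immediately, with null if nothing is buffered.
    String nextTuple() {
        return buffer.poll();
    }

    // Would be called from close() in a real spout.
    void close() {
        running = false;
        reader.interrupt();
    }
}
```

The point of the design is that the thread calling nextTuple() never waits on the network, so a slow service bus call only delays the background reader.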
>
>
>
> On Wed, Mar 19, 2014 at 8:06 AM, Michael Rose <[email protected]> wrote:
>
>> Well, I see you have 30 spout instances and 3 bolt instances. Doesn't
>> seem like it's a huge bottleneck, but it is having to send over the wire
>> much of the time. Something to keep in mind for the future.
>>
>> I'd be most suspicious of your spouts. All spouts on a single worker
>> are run single-threaded (in an event loop calling nextTuple()), so if you
>> have ANY blocking work that'll kill throughput. If AzureServiceBus is
>> anything like SQS, response times are not instant. We tend to have
>> background threads feeding a queue in our spouts for that reason, as we use
>> SQS for many of our topologies. Given 12 workers, you'd have ~3 spouts per
>> machine running single-threaded.
>>
>> With spouts, you should attempt to maintain as little blocking work as
>> possible...if you're using a queue, you should be using Queue#poll() to
>> either return a value OR null, and only emit a tuple if an event is
>> available (and meets your criteria). Storm will handle rate limiting the
>> spouts with sleeps.
>>
>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> [email protected]
>>
>>
>> On Tue, Mar 18, 2014 at 5:14 PM, David Crossland
>> <[email protected]> wrote:
>>
>>> Perhaps these screenshots might shed some light? I don't think there
>>> is much of a latency issue. I'm really starting to suspect there is some
>>> consumption rate issue from the topic.
>>>
>>> I set the spout to a high parallelism value as it did seem to improve
>>> throughput..
>>>
>>> But if there is anything you can spot that would be grand
>>>
>>> Thanks
>>> David
>>>
>>> *From:* Nathan Leung <[email protected]>
>>> *Sent:* Tuesday, 18 March 2014 21:14
>>> *To:* [email protected]
>>>
>>> It could be bolt 3. What is the latency like between your worker and
>>> your redis server? Increasing the number of threads for bolt 3 will likely
>>> increase your throughput. Bolt 1 and 2 are probably CPU bound, but bolt 3
>>> is probably restricted by your network access. Also I've found that
>>> localOrShuffleGrouping can improve performance due to reduced network
>>> communications.
>>>
>>>
>>> On Tue, Mar 18, 2014 at 3:55 PM, David Crossland
>>> <[email protected]> wrote:
>>>
>>>> A bit more information then
>>>>
>>>> There are 4 components
>>>>
>>>> Spout - This is reading from an azure service bus
>>>> topic/subscription. A connection is created in the open() method of the
>>>> spout. nextTuple does a peek on the message and invokes the following
>>>> code:
>>>>
>>>> StringWriter writer = new StringWriter();
>>>> IOUtils.copy(message.getBody(), writer);
>>>> String messageBody = writer.toString();
>>>>
>>>> It then deletes the message from the queue.
>>>>
>>>> Overall nothing all that exciting..
>>>>
>>>> Bolt 1 - Filtering
>>>>
>>>> Parses the message body (json string) and converts it to an object
>>>> representation. Filters out anything that isn't a monetise message. It
>>>> then emits the monetise message object to the next bolt. Monetise messages
>>>> account for ~ 0.03% of the total message volume.
>>>>
>>>> Bolt 2 - transformation
>>>>
>>>> Basically extracts from the monetise object the values that are
>>>> interesting and constructs a string, which it emits.
>>>>
>>>> Bolt 3 - Storage
>>>>
>>>> Stores the transformed string in Redis using the current date/time as
>>>> key.
>>>>
>>>> -----
>>>>
>>>> Shuffle grouping is used with the topology
>>>>
>>>> I ack every tuple irrespective of whether I emit the tuple or not.
>>>> It should not be attempting to replay tuples.
>>>>
>>>> -----
>>>>
>>>> I don't think Bolt 2/3 are the cause of the bottleneck. They don't
>>>> have to process much data at all tbh.
>>>>
>>>> I can accept that perhaps there is something inefficient with the
>>>> spout, perhaps it just can't read from the service bus quickly enough. I
>>>> will do some more research on this and have a chat with the colleague who
>>>> wrote this component.
>>>>
>>>> I suppose I'm just trying to identify if I've configured something
>>>> incorrectly with respect to storm, whether I'm correct to relate the total
>>>> number of executors and tasks to the total number of cores I have
>>>> available. I find it strange that I get a better throughput when I choose
>>>> an arbitrary large number for the parallelism hint than if I constrain
>>>> myself to a maximum that equates to the number of cores.
>>>>
>>>> D
>>>>
>>>> *From:* Nathan Leung <[email protected]>
>>>> *Sent:* Tuesday, 18 March 2014 18:38
>>>> *To:* [email protected]
>>>>
>>>> In my experience storm is able to make good use of CPU resources, if
>>>> the application is written appropriately. You shouldn't require too much
>>>> executor parallelism if your application is CPU intensive. If your bolts
>>>> are doing things like remote DB/NoSQL accesses, then that changes things
>>>> and parallelizing bolts will give you more throughput. Not knowing your
>>>> application, the best way to pin down the problem is to simplify your
>>>> topology. Cut out everything except for the Spout. How is your filtering
>>>> done? If you return without emitting, the latest versions of storm will
>>>> sleep before trying again. It may be worthwhile to loop in the spout until
>>>> you receive a valid message, or the bus is empty. How much throughput can
>>>> you achieve from the spout, emitting a tuple into the ether? Maybe the
>>>> problem is your message bus. Once you have achieved a level of performance
>>>> you are satisfied with from the spout, add one bolt. What bottlenecks does the
>>>> bolt introduce? etc etc.
>>>>
>>>>
>>>> On Tue, Mar 18, 2014 at 2:31 PM, David Crossland <[email protected]>
>>>> wrote:
>>>>
>>>>> Could my issue relate to memory allocated to the JVM? Most of the
>>>>> settings are pretty much the defaults. Are there any other settings that
>>>>> could be throttling the topology?
>>>>>
>>>>> I'd like to be able to identify the issue without all this
>>>>> constant “stabbing in the dark”… 😃
>>>>>
>>>>> D
>>>>>
>>>>> *From:* David Crossland <[email protected]>
>>>>> *Sent:* Tuesday, 18 March 2014 16:32
>>>>> *To:* [email protected]
>>>>>
>>>>> Being very new to storm I'm not sure what to expect in some
>>>>> regards.
>>>>>
>>>>> I've been playing about with the number of workers/executors/tasks
>>>>> trying to improve throughput on my cluster. I have 3 nodes: two 4-core
>>>>> nodes and one 2-core node (I can't increase the 3rd node to a medium until
>>>>> the customer gets more cores..). There is a spout that reads from a message
>>>>> bus and a bolt that filters out all but the messages we are interested in
>>>>> processing downstream in the topology. Most messages are filtered out.
>>>>>
>>>>> I'm assuming that these two components require the most resources as
>>>>> they should be reading/filtering messages at a constant rate, there are
>>>>> two further bolts that are invoked intermittently and hence require less.
>>>>>
>>>>> I've set the number of workers to 12 (in fact I've noticed it rarely
>>>>> seems to make much difference if I set this to 3/6/9 or 12, there is
>>>>> marginal improvement the higher the value).
>>>>>
>>>>> For the spout and filtering bolt I've tried a number of values for
>>>>> the parallelism hint (I started with 1/4/8/16/20/30/40/60/128/256… ) and I
>>>>> can barely get the throughput to exceed 3500 messages per minute, and the
>>>>> larger hints just grind the system to a halt. Currently I have them both
>>>>> set to 20.
>>>>>
>>>>> The strange thing is that no matter what I do the CPU load is very
>>>>> low; the CPU is typically 80-90% idle, suggesting that the topology isn't
>>>>> doing that much work. And tbh I've no idea why this is. Can anyone offer
>>>>> any suggestions?
>>>>>
>>>>> If I understand how this should work, given the number of cores I
>>>>> would think the total number of executors should be 10? Spawning 1
>>>>> thread per node, I can then set a number of tasks to say 2/4/8 per thread
>>>>> (I've no idea which would be most efficient..). I've tried something along
>>>>> these lines and my throughput was significantly less, approx. 2000
>>>>> messages per minute. In fact parallelism hints of 20/24/30 seem to have
>>>>> produced the most throughput.
>>>>>
>>>>> All help/suggestions gratefully received!
>>>>>
>>>>> Thanks
>>>>> David