Where is this messageId derived from?

I note there is a tuple.getMessageId(), but there does not appear to be an emit
overload that accepts this?

There is an overload:

emit(java.lang.String streamId, java.util.List<java.lang.Object> tuple)
(http://nathanmarz.github.io/storm/doc/backtype/storm/task/OutputCollector.html#emit(java.lang.String,%20java.util.List))

I don't think this is what you are referring to however.

My bolt execute method looks like this (sorry if I'm being obtuse, this is
still very new to me):

public void execute(Tuple tuple) {
        String msg = (String)tuple.getValue(0);
        if(msg == null)
        {
            logger.log(Level.ERROR, "Message is null");
            //mark the tuple as failed and return
            _collector.fail(tuple);
            return;
        }

        MessageProcessor processor = new MessageProcessor();
        EventMessage message = processor.processMessage(msg);

        if(message == null)
        {
            logger.log(Level.DEBUG, "Message did not conform to a known event");
            logger.log(Level.DEBUG, msg);
            //acknowledge the tuple, but don't do anything with it; we don't have to emit
            _collector.ack(tuple);
            return;
        }

        if(message instanceof MonetiseEvent)
        {
            logger.log(Level.DEBUG, "received monetise message");
            _collector.emit(new Values(message));
            _collector.ack(tuple);
        }
    }
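Incidentally, is the idea that I should be anchoring the emit to the input tuple instead? OutputCollector does have an emit(Tuple anchor, List<Object> tuple) overload, i.e. (just guessing here):

            //anchor the output to the input tuple so acks flow back to the spout
            _collector.emit(tuple, new Values(message));
            _collector.ack(tuple);

Is that what you meant?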


D

From: Sean Zhong <[email protected]>
Sent: Wednesday, 19 March 2014 15:39
To: [email protected]

Hi David,


As I said, to get the ack count for the spout, you need to add a message ID
when emitting. Like this:

collector.emit(new Values(msg), messageId)
                                ^ here

If the message ID is null, then no ack message will be sent to the spout.
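For example, in the spout (a rough sketch; any unique, serializable value works as the message ID, Storm just hands it back to your ack()/fail() methods, and readNextMessage() is a stand-in for your actual receive call):

@Override
public void nextTuple() {
    String msg = readNextMessage();   // stand-in for your actual receive call
    if (msg != null) {
        // the second argument is the message ID Storm will track
        collector.emit(new Values(msg), UUID.randomUUID().toString());
    }
}

@Override
public void ack(Object messageId) {
    // called once the whole tuple tree succeeds; a safe point to delete from the bus
}

@Override
public void fail(Object messageId) {
    // called on failure or timeout; re-emit, or let the service bus redeliver
}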


Sean



On Wed, Mar 19, 2014 at 4:49 PM, David Crossland <[email protected]> wrote:
I had noticed the ack count never increased for the spout.

The spout is a BaseRichSpout; I presumed the underlying code would handle the
ack and display appropriate metrics. But you can see from the first bolt that
all messages have been acked.. I don't know if this is an issue.

I've been doing a bit of reading around the service bus. If I understand
correctly, it can push 20 messages per second per connection (I need to confirm
this with a colleague who has experience with this..). If that's the case then
it tallies with the throughput I've been seeing (approx…). I suspect the
problem isn't so much the topology, but the service bus itself.

I'll come back if this doesn't pan out.

D

From: Sean Zhong <[email protected]>
Sent: Wednesday, 19 March 2014 00:45
To: [email protected]

The spout is suspicious.

From the screenshots, you are using a no-acked spout; there may also be a GC
issue there.
Here is my suggestion:

Check whether you are making the connection in the context of nextTuple. If so,
that means a large latency. You can check the spout latency by enabling an
acked spout (when emitting, add a messageId, e.g. emit(new Values(msg),
messageId)) and checking the latency.
If this is the case, you need to create a separate thread for the data
connection in the spout, and use a queue to buffer messages, so that nextTuple
just polls the queue.
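Roughly like this (a minimal sketch; ServiceBusClient and receiveMessage() are stand-ins for your actual service bus client, not a real API):

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class BufferedBusSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private transient LinkedBlockingQueue<String> buffer;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.buffer = new LinkedBlockingQueue<String>(10000);
        // The background thread owns the slow service bus connection and keeps
        // the buffer topped up, so nextTuple() never blocks on the network.
        Thread feeder = new Thread(new Runnable() {
            public void run() {
                ServiceBusClient client = new ServiceBusClient();   // hypothetical client
                while (!Thread.currentThread().isInterrupted()) {
                    String body = client.receiveMessage();          // blocking receive
                    if (body != null) {
                        buffer.offer(body);
                    }
                }
            }
        });
        feeder.setDaemon(true);
        feeder.start();
    }

    @Override
    public void nextTuple() {
        String msg = buffer.poll();   // non-blocking: null when the buffer is empty
        if (msg != null) {
            collector.emit(new Values(msg), UUID.randomUUID().toString());
        }
        // Returning without emitting is fine; Storm sleeps briefly before retrying.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}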



On Wed, Mar 19, 2014 at 8:06 AM, Michael Rose <[email protected]> wrote:
Well, I see you have 30 spout instances and 3 bolt instances. That doesn't seem
like a huge bottleneck, but it does mean sending over the wire much of the
time. Something to keep in mind for the future.

I'd be most suspicious of your spouts. All spouts on a single worker are run 
single-threaded (in an event loop calling nextTuple()), so if you have ANY 
blocking work that'll kill throughput. If AzureServiceBus is anything like SQS, 
response times are not instant. We tend to have background threads feeding a 
queue in our spouts for that reason, as we use SQS for many of our topologies. 
Given 12 workers, you'd have ~3 spouts per machine running single-threaded.

With spouts, you should attempt to do as little blocking work as possible. If
you're using a queue, you should be using Queue#poll() to either return a value
or null, and only emit a tuple if an event is available (and meets your
criteria). Storm will handle rate-limiting the spouts with sleeps.


Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
[email protected]


On Tue, Mar 18, 2014 at 5:14 PM, David Crossland <[email protected]> wrote:
Perhaps these screenshots might shed some light? I don't think there is much of
a latency issue; I'm really starting to suspect there is some consumption-rate
issue from the topic.

I set the spout to a high parallelism value as it did seem to improve 
throughput..

But if there is anything you can spot, that would be grand.

Thanks
David

From: Nathan Leung <[email protected]>
Sent: Tuesday, 18 March 2014 21:14
To: [email protected]

It could be bolt 3. What is the latency like between your workers and your
Redis server? Increasing the number of threads for bolt 3 will likely increase
your throughput. Bolts 1 and 2 are probably CPU-bound, but bolt 3 is probably
restricted by your network access. Also, I've found that localOrShuffleGrouping
can improve performance due to reduced network communication.
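For example (component names made up; the only change from a plain shuffle is the grouping call):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new BusSpout(), 4);
// prefers bolt executors in the same worker process, falling back to shuffle
builder.setBolt("filter", new FilterBolt(), 4).localOrShuffleGrouping("spout");
builder.setBolt("store", new RedisBolt(), 8).localOrShuffleGrouping("filter");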


On Tue, Mar 18, 2014 at 3:55 PM, David Crossland <[email protected]> wrote:
A bit more information, then.

There are 4 components:

Spout - This reads from an Azure Service Bus topic/subscription. A connection
is created in the open() method of the spout; nextTuple does a peek on the
message and invokes the following code:

                StringWriter writer = new StringWriter();
                IOUtils.copy(message.getBody(), writer);   // drain the body stream into the writer
                String messageBody = writer.toString();

It then deletes the message from the queue.

Overall nothing all that exciting..

Bolt 1 - Filtering

Parses the message body (a JSON string) and converts it to an object
representation. Filters out anything that isn't a monetise message, then emits
the monetise message object to the next bolt. Monetise messages account for
~0.03% of the total message volume.

Bolt 2 - transformation

Basically extracts the interesting values from the monetise object and
constructs a string, which it emits.

Bolt 3 - Storage

Stores the transformed string in Redis using the current date/time as key.
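It's roughly this shape, if that helps (simplified; Jedis here stands in for our actual Redis client, and I've left out error handling):

private OutputCollector collector;
private transient Jedis jedis;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    this.jedis = new Jedis("redis-host", 6379);   // connect once, not per tuple
}

@Override
public void execute(Tuple tuple) {
    String value = tuple.getString(0);
    String key = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").format(new Date());
    jedis.set(key, value);
    collector.ack(tuple);
}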

-----

Shuffle grouping is used in the topology.

I ack every tuple irrespective of whether I emit it or not, so it should not be
attempting to replay tuples.

-----

I don't think bolts 2/3 are the cause of the bottleneck; they don't have to
process much data at all, tbh.

I can accept that perhaps there is something inefficient about the spout;
perhaps it just can't read from the service bus quickly enough. I will do some
more research on this and have a chat with the colleague who wrote this
component.

I suppose I'm just trying to identify whether I've configured something
incorrectly with respect to Storm, and whether I'm correct to relate the total
number of executors and tasks to the total number of cores I have available. I
find it strange that I get better throughput when I choose an arbitrarily large
number for the parallelism hint than if I constrain myself to a maximum that
equates to the number of cores.

D

From: Nathan Leung <[email protected]>
Sent: Tuesday, 18 March 2014 18:38
To: [email protected]

In my experience Storm is able to make good use of CPU resources if the
application is written appropriately. You shouldn't require too much executor
parallelism if your application is CPU-intensive. If your bolts are doing
things like remote DB/NoSQL accesses, then that changes things, and
parallelizing bolts will give you more throughput. Not knowing your
application, the best way to pin down the problem is to simplify your topology.
Cut out everything except the spout. How is your filtering done? If you return
without emitting, the latest versions of Storm will sleep before trying again.
It may be worthwhile to loop in the spout until you receive a valid message or
the bus is empty. How much throughput can you achieve from the spout, emitting
a tuple into the ether? Maybe the problem is your message bus. Once you have
achieved a level of performance you are satisfied with from the spout, add one
bolt. What bottlenecks does the bolt introduce? And so on.
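To illustrate the looping idea (a sketch; readMessage() and isValid() are stand-ins for your actual calls):

public void nextTuple() {
    String msg;
    // drain until we find an emittable message or the bus is empty, so we
    // don't return empty-handed and trigger the spout's backoff sleep
    while ((msg = readMessage()) != null) {   // null when the bus is empty
        if (isValid(msg)) {
            collector.emit(new Values(msg));
            return;   // one tuple per nextTuple() call
        }
    }
}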


On Tue, Mar 18, 2014 at 2:31 PM, David Crossland <[email protected]> wrote:
Could my issue relate to the memory allocated to the JVM? Most of the settings
are pretty much the defaults. Are there any other settings that could be
throttling the topology?
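The only memory setting I've found so far is worker.childopts in storm.yaml, which I believe defaults to a fairly small heap, e.g.:

# per-worker JVM options in storm.yaml (the shipped default is -Xmx768m, I believe)
worker.childopts: "-Xmx2048m"

Is that the right knob to be looking at?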

I'd like to be able to identify the issue without all this constant “stabbing
in the dark”…

D

From: David Crossland <[email protected]>
Sent: Tuesday, 18 March 2014 16:32
To: [email protected]

Being very new to Storm, I'm not sure what to expect in some regards.

I've been playing about with the number of workers/executors/tasks, trying to
improve throughput on my cluster. I have 3 nodes: two 4-core, and one 2-core
node (I can't increase the 3rd node to a medium until the customer gets more
cores..). There is a spout that reads from a message bus and a bolt that
filters out all but the messages we are interested in processing downstream in
the topology. Most messages are filtered out.

I'm assuming that these two components require the most resources, as they
should be reading/filtering messages at a constant rate; there are two further
bolts that are invoked intermittently and hence require less.

I've set the number of workers to 12 (in fact I've noticed it rarely seems to
make much difference whether I set this to 3/6/9 or 12; there is marginal
improvement the higher the value).

For the spout and filtering bolt I've tried a number of values for the
parallelism hint (1/4/8/16/20/30/40/60/128/256…) and I can barely get the
throughput to exceed 3500 messages per minute, and the larger hints just grind
the system to a halt. Currently I have them both set to 20.

The strange thing is that no matter what I do, the CPU load is very small; the
machines are typically 80-90% idle, suggesting that the topology isn't doing
that much work. And tbh I've no idea why this is. Can anyone offer any
suggestions?

If I understand how this should work, given the number of cores I would think
the total number of executors should total 10, spawning 1 thread per core; I
could then set a number of tasks, say 2/4/8 per thread (I've no idea which
would be most efficient..). I've tried something along these lines and my
throughput was significantly less, approx. 2000 messages per minute. In fact,
parallelism hints of 20/24/30 seem to have produced the most throughput.
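For concreteness, what I'm submitting is essentially this (component names made up; the setNumTasks line is where the tasks-per-executor knob lives):

Config conf = new Config();
conf.setNumWorkers(12);   // worker JVM processes across the cluster

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("bus-spout", new BusSpout(), 20);   // parallelism hint = executor threads
builder.setBolt("filter", new FilterBolt(), 20)
       .setNumTasks(40)                              // tasks can exceed executors
       .shuffleGrouping("bus-spout");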

All help/suggestions gratefully received!

Thanks
David





