Long message - I think what you need is
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE as your acknowledgement mode.
It's ActiveMQ-specific and only available from 5.3, so please try a
snapshot. The GA of 5.3 should happen in the near future.

On 10 Sep 2009, at 07:06, Robert Nicholson wrote:
So at work we currently have J2SE processes that use JMS
MessageListeners to consume messages from a WebSphere MQ 6.x queue.
At this time I'm enhancing our existing processes to support multiple
JMS MessageListeners, whereas the original implementation used a
single-threaded listener only, i.e. one session, one message listener,
but the processing of the message was performed entirely within
onMessage, hence the rate of processing messages is considerably
slow.

Right now I've enhanced this by dispatching a message after it
arrives to a thread pool to be processed by a worker thread. The
framework used to drive the thread pool gives us a high level of
concurrency where it matters. This is as opposed to a solution that
creates N listeners, each bound to a dedicated queue, where the
publisher would route the messages to the appropriate queue,
ensuring the grouping of messages was correct as far as which queue
contained which message.

One of the issues that comes up with this scenario is how you ensure
that a message you've removed from the queue has actually been
processed, so we used CLIENT_ACKNOWLEDGE. Because I can only
acknowledge from the delivery thread, what I do is have the worker
thread queue up the acknowledgement, and the very last thing the
onMessage method does is work through this "queue" and acknowledge
the messages so that they are removed from the queue.
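
The hand-off described above can be sketched roughly as follows. This
is only a model, not the actual code: Msg is a hypothetical stand-in
for javax.jms.Message, and the pool size, class name and method names
are assumptions made for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch only: Msg is a hypothetical stand-in for javax.jms.Message.
final class DeferredAckSketch {

    /** Hypothetical message; acknowledge() just records which thread called it. */
    static final class Msg {
        final int id;
        volatile String ackedBy;
        Msg(int id) { this.id = id; }
        void acknowledge() { ackedBy = Thread.currentThread().getName(); }
    }

    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    // Workers add finished messages here; only the delivery thread removes them.
    private final Queue<Msg> readyToAck = new ConcurrentLinkedQueue<>();

    /** Called on the delivery thread, in the role of MessageListener.onMessage. */
    void onMessage(Msg m) {
        workers.submit(() -> {
            process(m);
            readyToAck.add(m); // hand the ack back instead of acknowledging here
        });
        drainAcks();           // the very last thing onMessage does
    }

    /** Acknowledge whatever the workers have finished so far; returns the count. */
    int drainAcks() {
        int n = 0;
        Msg done;
        while ((done = readyToAck.poll()) != null) {
            done.acknowledge();
            n++;
        }
        return n;
    }

    void process(Msg m) { /* application work goes here */ }

    /** Wait for the workers to finish, then acknowledge what is left. */
    void shutdown() {
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        drainAcks();
    }

    public static void main(String[] args) {
        DeferredAckSketch listener = new DeferredAckSketch();
        for (int i = 0; i < 5; i++) listener.onMessage(new Msg(i));
        listener.shutdown();
    }
}
```

Note that in this model acknowledge() affects only the one message,
which is not how a real CLIENT_ACKNOWLEDGE session behaves. Also,
acknowledgements are only drained when another message arrives, so a
message finished during a quiet period stays unacknowledged until the
next delivery or until shutdown.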

The key goal is that you don't want to acknowledge a message from
the queue that hasn't been processed yet. I suppose one of the flaws
in this implementation is that when you acknowledge a message you
implicitly acknowledge all unacknowledged messages before it in the
queue, so this solution is by no means foolproof. In other words,
it's not only acknowledging the messages I already know I've
processed but also some that I might not have processed, because of
these semantics.
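
To make the flaw concrete, here is a toy model of the two
acknowledgement behaviours. It is not a JMS client: the class, the
Mode enum and the scenario are made up for illustration. The CLIENT
branch follows the javax.jms.Message.acknowledge() contract, under
which acknowledging one message acknowledges everything the session
has consumed so far; the INDIVIDUAL branch models the
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE mode suggested in the reply.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: a toy model of session-level acknowledgement semantics.
final class AckSemanticsSketch {

    enum Mode { CLIENT, INDIVIDUAL }

    private final Mode mode;
    // Delivered but unacknowledged messages: id -> processed flag, in delivery order.
    private final Map<Integer, Boolean> unacked = new LinkedHashMap<>();

    AckSemanticsSketch(Mode mode) { this.mode = mode; }

    void deliver(int id) { unacked.put(id, false); }

    void markProcessed(int id) { unacked.replace(id, true); }

    /** Acknowledge message id; returns the ids acknowledged while still unprocessed. */
    List<Integer> acknowledge(int id) {
        List<Integer> atRisk = new ArrayList<>();
        if (mode == Mode.INDIVIDUAL) {
            // Only the named message is acknowledged.
            Boolean processed = unacked.remove(id);
            if (processed != null && !processed) atRisk.add(id);
        } else {
            // Every message delivered so far on the session is acknowledged.
            unacked.forEach((k, processed) -> { if (!processed) atRisk.add(k); });
            unacked.clear();
        }
        return atRisk;
    }

    /** Three messages in flight; the worker handling message 2 finishes first. */
    static List<Integer> scenario(Mode mode) {
        AckSemanticsSketch s = new AckSemanticsSketch(mode);
        s.deliver(1);
        s.deliver(2);
        s.deliver(3);
        s.markProcessed(2);
        return s.acknowledge(2);
    }

    public static void main(String[] args) {
        System.out.println("CLIENT at risk: " + scenario(Mode.CLIENT));         // [1, 3]
        System.out.println("INDIVIDUAL at risk: " + scenario(Mode.INDIVIDUAL)); // []
    }
}
```

In the CLIENT case messages 1 and 3 are gone from the broker even
though their workers are still running, so a crash at that point
would lose them.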

The other approach is to use AUTO_ACKNOWLEDGE and then persist the
message state while it's being worked on, and then remove it or mark
it deleted when it's finally been processed. If the consumer needs
to be restarted, it can accurately determine what was acknowledged
from the queue but not yet processed and resubmit those messages to
the worker threads. On the surface this does seem more reliable than
the "queueing up of acknowledgements" approach.
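
A minimal sketch of that bookkeeping, assuming a hypothetical
InFlightJournalSketch class. The in-memory map only stands in for
whatever durable store (file, database) would really be used, so
nothing here actually survives a restart.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: the map stands in for a durable store (file, database).
final class InFlightJournalSketch {

    // Message id -> body, for messages received but not yet fully processed.
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Delivery thread: record the message before onMessage returns and it is auto-acknowledged. */
    void recordReceived(String messageId, String body) {
        pending.put(messageId, body);
    }

    /** Worker thread: processing finished, so the entry can be dropped. */
    void markProcessed(String messageId) {
        pending.remove(messageId);
    }

    /** After a restart: whatever is still recorded was acknowledged but never finished. */
    List<String> unfinishedIds() {
        List<String> ids = new ArrayList<>(pending.keySet());
        Collections.sort(ids);
        return ids;
    }

    public static void main(String[] args) {
        InFlightJournalSketch journal = new InFlightJournalSketch();
        journal.recordReceived("a", "first");
        journal.recordReceived("b", "second");
        journal.markProcessed("a");
        System.out.println("resubmit after restart: " + journal.unfinishedIds());
    }
}
```

One thing to watch: if the consumer dies after a worker has finished
but before markProcessed runs, that message is resubmitted on
restart, so the processing probably needs to tolerate duplicates.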

In any case, part of the reason either approach was conceived is the
limitation that in WebSphere's JMS, MessageListeners attached to the
same QueueSession are all invoked serially, so there's a performance
hit you take there compared with the truly concurrent approach you
get when you have a farm of MDBs working for you. In our case MDBs
aren't an option as we aren't using an application server.

How can we use ActiveMQ to increase throughput of processing from
our queues such that we can utilize our existing concurrency
frameworks and still ensure that no messages are lost, i.e.
acknowledged without having been processed?

If I were to bridge from the WebSphere queue to ActiveMQ and write my
app to consume from ActiveMQ, how does ActiveMQ allow me to rapidly
and reliably consume messages using JMS APIs such that I'm not
forced to do all the work in the delivery thread, but can still
accurately keep track of what's being worked on and what's been
processed, so that I can recover the consumer from an outage without
loss of messages?

Rob Davies
I work here: http://fusesource.com
My Blog: http://rajdavies.blogspot.com/
I'm writing this: http://www.manning.com/snyder/