So at work we currently have J2SE processes that use JMSMessageListeners to consume messages from a Websphere MQ 6.x queue.

At this time I'm enhancing our existing processes to support muliple JMSMessageListeners where was the original implementation used a single threaded listener only. ie. one session, one message listener but the processing of the message was performed all within onMessage hence the rate of processing the messages is considerably slow.

Right now I've enhanced this by dispatching a message after it arrives to a thread pool to be processed by a a worker thread. The framework used to utilize the threadpool gives us a high level of concurrency where it matters. As opposed to a solution that created N listeners where each listener was bound to a dedicated queue. The publisher would then route the messages to the appropriate queue ensuring the grouping of messages were correct as far as which queue contained which message.

One of the issues that come up with this scenario is how do you ensure that you've processed a message that you've removed from the queue so we used CLIENT_ACKNOWLEDGEMENT and because I can only acknowledge from the delivery thread what I do is have the worker thread queue up the acknowledgement and the very last thing the onMessage method does it work thru this "queue" and acknowledges messages so that they are removed from the queue.

The key goal being that you don't want to acknowledge a message from the queue that hasn't been processed yet. I suppose one of the flaws in this implementation is the concept that when you acknowledge a message you implicitly acknowledge all unacknowledged messages before it in the queue so this solution is by no means full proof. So in otherwords it's not only acknowleding the messages I already know I've processed but also some that I might not have processed because of these semantics.

The other approach is to use AUTO_ACKNOWLEDGE and then persist the state whilst it's been worked and then remove it or mark it deleted when it's finally been processed. If the consumer should need to be restarted it can accurately determine what's not been processed but what was acknowledged from the queue and resubmit those to the worker threads. This does seem on the surface to be more reliable than the "queueing up of acknowledgements" approach.

In any case, part of the reason either approach was conceived is the limitation that n Websphere's JMS MessageListeners attached to the same QueueSession are all invoked serially and so there's a performance hit you take there rather than a true concurrent approach that exists when you have farm of MDB's working for you. In our case MDB's aren't an option as we aren't using an application server.

How can we use ActiveMQ to increase throughput of processing from our queues such that we can utilize our existing concurrency frameworks and still ensure that no messages are lost ie. acknowledge without having been processed?

If I was to bridge from Websphere Queue to ActiveMQ and write my app to consume from ActiveMQ how does ActiveMQ allow me to radpidly and reliably consume messages using JMS apis such that I'm not forced to do all the work in the delivery thread but I still can accurate keep track of what's been worked on and what's been processed such that I can recover the consumer from an outage without lose of messages?

Reply via email to