At work we currently have J2SE processes that use JMS
MessageListeners to consume messages from a WebSphere MQ 6.x queue.
I'm enhancing these processes to support multiple MessageListeners,
whereas the original implementation used a single threaded listener
only, i.e. one session and one message listener. All processing of a
message was performed within onMessage, so the rate of processing
messages is considerably slow.
Right now I've enhanced this by dispatching each message, after it
arrives, to a thread pool to be processed by a worker thread. The
framework we use to manage the thread pool gives us a high level of
concurrency where it matters. The alternative was a solution that
created N listeners, each bound to a dedicated queue, where the
publisher would route messages to the appropriate queue and so ensure
that messages were grouped correctly across the queues.
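
The hand-off described above might look roughly like the sketch
below. It is not our actual framework: Msg is a hypothetical stand-in
for javax.jms.Message so the code compiles without a JMS jar, and
DispatchingListener, process and shutdown are illustrative names only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for javax.jms.Message (assumption, not the JMS API).
interface Msg {
    String body();
}

// Plays the role of a MessageListener: onMessage returns quickly after
// handing the message to the pool instead of doing the work itself.
class DispatchingListener {
    private final ExecutorService pool;
    final AtomicInteger processed = new AtomicInteger();

    DispatchingListener(int workers) {
        this.pool = Executors.newFixedThreadPool(workers);
    }

    // Called on the single delivery thread.
    void onMessage(Msg m) {
        pool.submit(() -> process(m));
    }

    // Runs on a worker thread; the real processing would go here.
    private void process(Msg m) {
        m.body();
        processed.incrementAndGet();
    }

    // Stops accepting work and waits for in-flight tasks to finish.
    // Returns false if the wait timed out or was interrupted.
    boolean shutdown(long timeoutMs) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The point of the sketch is only that the delivery thread is freed as
soon as submit returns, which is also what creates the acknowledgement
problem discussed next.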
One of the issues that comes up with this scenario is how to ensure
that a message you've removed from the queue has actually been
processed. We used CLIENT_ACKNOWLEDGE, and because I can only
acknowledge from the delivery thread, the worker thread queues up the
acknowledgement and the very last thing the onMessage method does is
work through this "queue" and acknowledge the messages so that they
are removed from the queue.
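
A minimal sketch of that queued-acknowledgement idea follows.
AckableMsg is a hypothetical stand-in for javax.jms.Message (whose real
acknowledge() also throws JMSException), and DeferredAcker,
markProcessed and drainAndAcknowledge are names made up for
illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for javax.jms.Message; only acknowledge() is modelled.
interface AckableMsg {
    void acknowledge();
}

class DeferredAcker {
    // Workers add finished messages here; only the delivery thread removes them.
    private final ConcurrentLinkedQueue<AckableMsg> done = new ConcurrentLinkedQueue<>();

    // Called from a worker thread once processing has succeeded.
    void markProcessed(AckableMsg m) {
        done.add(m);
    }

    // Called as the last step of onMessage, on the delivery thread.
    // Returns how many acknowledgements were issued.
    int drainAndAcknowledge() {
        int count = 0;
        AckableMsg m;
        while ((m = done.poll()) != null) {
            m.acknowledge();
            count++;
        }
        return count;
    }
}
```

Note that acknowledgements only go out when another message arrives
and onMessage runs again, so a quiet queue can leave processed
messages unacknowledged for a while.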
The key goal is that you don't want to acknowledge a message that
hasn't been processed yet. I suppose one of the flaws in this
implementation is that when you acknowledge a message you implicitly
acknowledge all unacknowledged messages before it in the queue, so
this solution is by no means foolproof. In other words, it's not only
acknowledging the messages I already know I've processed but also
some that I might not have processed, because of these semantics.
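
One possible way to narrow that gap, sketched here as an assumption
rather than something we have built, is to acknowledge only up to the
point where every earlier message is known to be finished. AckWatermark
and its methods are hypothetical names. It assumes the delivery thread
numbers messages in delivery order and that acknowledging a message
covers only those delivered before it, as described above. The JMS
javadoc words Message.acknowledge() as acknowledging all messages the
session has consumed, so with a provider that also covers later
deliveries this would not be enough.

```java
import java.util.TreeSet;

class AckWatermark {
    private long nextSeq = 0;   // next sequence number to hand out
    private long acked = -1;    // highest number whose whole prefix is finished
    private final TreeSet<Long> completed = new TreeSet<>(); // finished, not yet contiguous

    // Delivery thread: assign a sequence number in delivery order.
    synchronized long register() {
        return nextSeq++;
    }

    // Worker thread: record that processing of this message finished.
    synchronized void complete(long seq) {
        completed.add(seq);
    }

    // Delivery thread: advance over the contiguous run of finished messages.
    // Returns the highest sequence number that is safe to acknowledge,
    // or -1 if message 0 has not finished yet.
    synchronized long safeToAcknowledge() {
        while (!completed.isEmpty() && completed.first() == acked + 1) {
            acked = completed.pollFirst();
        }
        return acked;
    }
}
```

The delivery thread would keep the message for each sequence number
and call acknowledge() only on the one at the returned watermark.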
The other approach is to use AUTO_ACKNOWLEDGE and persist each
message's state while it's being worked on, then remove it or mark it
deleted when it's finally been processed. If the consumer needs to be
restarted, it can accurately determine what was acknowledged from the
queue but not yet processed, and resubmit those messages to the
worker threads. On the surface this does seem more reliable than the
"queueing up of acknowledgements" approach.
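
The bookkeeping for this second approach could be sketched as below.
InFlightStore and its methods are hypothetical, and the in-memory map
is only a placeholder: a real store would have to be durable (a
database table or a journal file, say) for the recovery step to
survive a crash.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record of in-flight work. NOT durable as written; the map
// only illustrates the protocol a persistent store would follow.
class InFlightStore {
    private final Map<String, String> pending = new LinkedHashMap<>();

    // Delivery thread: record the message before onMessage returns, since
    // with AUTO_ACKNOWLEDGE the session acknowledges it at that point.
    synchronized void recordReceived(String messageId, String payload) {
        pending.put(messageId, payload);
    }

    // Worker thread: processing finished, the record is no longer needed.
    synchronized void markProcessed(String messageId) {
        pending.remove(messageId);
    }

    // On restart: whatever is still recorded was acknowledged but never
    // finished, so it should be resubmitted to the worker threads.
    synchronized List<String> unprocessedIds() {
        return new ArrayList<>(pending.keySet());
    }
}
```

The important ordering is that recordReceived completes before
onMessage returns; otherwise a crash between the acknowledgement and
the write would lose the message.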
In any case, part of the reason either approach was conceived is the
limitation that in WebSphere's JMS, MessageListeners attached to the
same QueueSession are all invoked serially, so you take a performance
hit compared with the truly concurrent approach you get when you have
a farm of MDBs working for you. In our case MDBs aren't an option, as
we aren't using an application server.
How can we use ActiveMQ to increase the throughput of processing from
our queues such that we can utilize our existing concurrency
frameworks and still ensure that no messages are lost, i.e.
acknowledged without having been processed?
If I were to bridge from the WebSphere queue to ActiveMQ and write my
app to consume from ActiveMQ, how does ActiveMQ allow me to rapidly
and reliably consume messages using the JMS APIs, such that I'm not
forced to do all the work in the delivery thread but can still
accurately keep track of what's being worked on and what's been
processed, so that I can recover the consumer from an outage without
loss of messages?
- How can ActiveMQ help with our scenario? Robert Nicholson