Hello,

I'm planning to send a PR in order to deal with the issue I reported in

https://issues.apache.org/jira/browse/AMQ-6658

I would like some feedback first (basically, whether the approach makes
sense).

The issue concerns redelivery of messages: it does not preserve order under
certain conditions when prefetch is performed.

I think I found what causes the issue.

There are two active threads which are involved in an apparent race
condition:

ActiveMQConnection[xx]Scheduler
ActiveMQ Session Task

1) A Session Task thread checks in ActiveMQSessionExecutor.iterate
whether there are messages queued on the session (there are 60 messages
prefetched).
2) It dispatches each message through ActiveMQMessageConsumer.
3) If unconsumedMessages is not running in
ActiveMQMessageConsumer.dispatch, the message is enqueued in
unconsumedMessages and is not delivered.

So if ActiveMQ Session Task polls messages several times before
ActiveMQConnection[xx]Scheduler starts unconsumedMessages in
ActiveMQMessageConsumer:1889 (unconsumedMessages.start()), there will be
several items in unconsumedMessages.

The problem arises if ActiveMQ Session Task has polled several messages,
polls one more, and ActiveMQConnection[xx]Scheduler starts
unconsumedMessages before that last message is dispatched. At that point
unconsumedMessages is running, so the message is delivered directly, even
though the first message enqueued in unconsumedMessages should have been
delivered before the one the session thread was currently handling.
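
To make the interleaving concrete, here is a minimal single-threaded
sketch of it. It is not the ActiveMQ code: the class DispatchOrderSketch,
its fields, and the message names m1..m3 are hypothetical, and the two
threads are replaced by a fixed sequence of calls so the outcome is
deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DispatchOrderSketch {
    // Hypothetical stand-in for the consumer's unconsumedMessages channel.
    static final Deque<String> unconsumed = new ArrayDeque<>();
    // Messages handed to the listener, in delivery order.
    static final List<String> delivered = new ArrayList<>();
    // Models unconsumedMessages.isRunning().
    static boolean running = false;

    // Simplified model of the dispatch behaviour described above.
    static void dispatch(String msg) {
        if (running) {
            delivered.add(msg);       // handed straight to the listener
        } else {
            unconsumed.addLast(msg);  // parked until the channel is started
        }
    }

    static List<String> simulate() {
        unconsumed.clear();
        delivered.clear();
        running = false;
        dispatch("m1");   // session task polls while the channel is stopped
        dispatch("m2");
        running = true;   // scheduler thread calls start()
        dispatch("m3");   // session task polls again: m3 overtakes m1 and m2
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        System.out.println(simulate() + " delivered; still queued: " + unconsumed);
    }
}
```

In this model m3 reaches the listener while m1 and m2 are still parked,
which is the reordering I am seeing.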

I think the dispatch method must always deliver the oldest message that
was enqueued in unconsumedMessages.

For example in the dispatch method in ActiveMQMessageConsumer, something
like this should be done:

    @Override
    public void dispatch(MessageDispatch md) {
        MessageListener listener = this.messageListener.get();
        try {
            clearMessagesInProgress();
            clearDeliveredList();
            synchronized (unconsumedMessages.getMutex()) {
                if (!unconsumedMessages.isClosed()) {
                    if (this.info.isBrowser()
                            || !session.connection.isDuplicate(this, md.getMessage())) {
                        if (listener != null && unconsumedMessages.isRunning()) {
                            // Proposed change: enqueue, then take the oldest
                            // message; otherwise I do not preserve the order
                            // for redelivery.
                            unconsumedMessages.enqueue(md);
                            md = unconsumedMessages.dequeueNoWait();
                            ...
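
The effect of the enqueue-then-dequeue idea can be checked with a small
standalone model. This is only a sketch under simplifying assumptions: the
class OrderedDispatchSketch, its fields, and the message names m1..m4 are
hypothetical, the threads are replaced by a fixed call sequence, and it
does not model how the remaining queued messages are later drained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OrderedDispatchSketch {
    // Hypothetical stand-in for the consumer's unconsumedMessages channel.
    static final Deque<String> unconsumed = new ArrayDeque<>();
    // Messages handed to the listener, in delivery order.
    static final List<String> delivered = new ArrayList<>();
    // Models unconsumedMessages.isRunning().
    static boolean running = false;

    // Always enqueue the incoming message; when running, deliver the
    // oldest queued message instead of the one that just arrived.
    static void dispatch(String msg) {
        unconsumed.addLast(msg);
        if (running) {
            delivered.add(unconsumed.pollFirst());
        }
    }

    static List<String> simulate() {
        unconsumed.clear();
        delivered.clear();
        running = false;
        dispatch("m1");   // parked: channel is stopped
        dispatch("m2");   // parked: channel is stopped
        running = true;   // scheduler thread calls start()
        dispatch("m3");   // m3 is queued, m1 is delivered
        dispatch("m4");   // m4 is queued, m2 is delivered
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        System.out.println(simulate() + " delivered; still queued: " + unconsumed);
    }
}
```

Here the listener sees m1 and then m2, and m3 and m4 wait behind them in
arrival order, so a newly polled message can no longer overtake older ones.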

Thank you in advance for any feedback.

Fabian

On Tue, Apr 18, 2017 at 12:51 AM, Tim Bain <tb...@alumni.duke.edu> wrote:

> I'm not an expert on the dispatch code, but this isn't the behavior I'd
> expect to see, so please submit a bug in JIRA for it so that someone
> who knows that code well can fix it (or explain why this is the intended
> behavior, if that's the case).
>
> Tim
>
> On Tue, Apr 11, 2017 at 1:15 PM, Fabian Gonzalez <
> fabian.gonza...@mulesoft.com> wrote:
>
> > Hello,
> >
> > This is my first message, so greetings to all.
> > I am facing a situation where ActiveMQ does not seem to respect the
> > order of dispatched messages when a redelivery is needed, using
> > activemq-client 5.14.3.
> >
> > I am sending 60 messages to a queue with a single consumer, and I've
> > noticed that sometimes, when a redelivery of a message is needed as a
> > consequence of a rollback, another of those 60 messages is served
> > before the redelivered message. There is no maxRedelivery set.
> >
> > What I notice debugging ActiveMQMessageConsumer is that the following
> > behaviour may occur:
> >
> > - The 60 messages are dispatched in order in:
> >
> > ActiveMQMessageConsumer:1376:
> >
> >     @Override
> >     public void dispatch(MessageDispatch md) {
> >         MessageListener listener = this.messageListener.get();
> >         try {
> >             clearMessagesInProgress();
> >             ...
> >
> > unconsumedMessages is running, so the message is sent to the listener.
> >
> > - A rollback is performed and the message is redelivered (with a default
> > delay):
> >
> > ActiveMQMessageConsumer:1305:
> >
> >     if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
> >         // Start up the delivery again a little later.
> >         session.getScheduler().executeAfterDelay(new Runnable() {
> >             @Override
> >             public void run() {
> >                 try {
> >                     if (started.get()) {
> >                         start();
> >                     }
> >                 } catch (JMSException e) {
> >                     session.connection.onAsyncException(e);
> >                 }
> >             }
> >         }, redeliveryDelay);
> >     } else {
> >         start();
> >     }
> >
> > Periodically, the session attempts to consume its enqueued messages. As
> > the consumer's unconsumedMessages is not running, they are not sent to
> > the listener; they are enqueued in unconsumedMessages instead.
> > But if the thread scheduled for redelivery runs while that iteration
> > is being performed, unconsumedMessages is started in:
> >
> >     public void start() throws JMSException {
> >         if (unconsumedMessages.isClosed()) {
> >             return;
> >         }
> >         started.set(true);
> >         unconsumedMessages.start();
> >         session.executor.wakeup();
> >     }
> >
> > and the message that is being considered by the session (in the other
> > thread) is sent to the listener before the redelivered message, which
> > may be an ordering error.
> >
> > Is this the expected behaviour? I expected the order to be maintained
> > in these cases.
> >
> > Thanks in advance for your help and clarification
> >
>
