We're sending non-persistent messages to a topic where some consumers may
sometimes be slow.  We've configured a slowConsumerStrategy of
AbortSlowConsumerStrategy to disconnect any consumers that happen to be
slow.  The detection of a slow consumer relies on there being at least 2x
the consumer's pre-fetch buffer size in unprocessed messages: the
consumer's pre-fetch buffer must be full, and the broker must have at
least that many additional messages that haven't yet been dispatched to the
consumer.

I would have expected that if messages expire before the consumer manages
to get around to consuming them, that those messages would count against
the consumer when determining slowness (after all, they're expiring
precisely because the consumer is too slow to get to them before they
expire), but I don't see that happening in my testing with an
intentionally-slow consumer.

Instead, my consumer spends more time processing the first message than the
messages' time to live (configured via Camel's timeToLive URI parameter),
while the other messages time out and are removed from the topic.  As long
as the slow consumer detection thread doesn't run before the messages
finish getting expired, then when it does run, it sees that the
consumer's got a full pre-fetch buffer but the broker doesn't have any
additional pending messages, and it declares the consumer to not be slow,
even though the consumer was so slow that messages expired while it was out
to lunch.

I had expected that ActiveMQ would track whether any messages had been
expired since the last time we checked whether their consumer was slow, and
if so we'd consider the consumer slow irrespective of the number of
unprocessed messages anywhere in the pipeline.  (That would ensure that the
consumer wouldn't be slow forever just because it let some messages expire
long ago.  As long as no more messages expired after the first slow
consumer detection check, the expiredMessagesSinceLastSlowConsumerCheck
flag would be false the next time we checked for slow consumers, and we'd
determine whether the consumer was slow based solely on the number of
unprocessed messages in the pipeline, as we do now.)

Is there a way to make ActiveMQ work the way I expected it would, and I
just haven't found it?  If not, is there a reason it works the way it does
rather than the way I expected it would, other than that my way is harder
to implement and no one's spent the time to implement it?  That is, is
there any reason why we wouldn't want the functionality as I expected it
would work (or at least, the ability to choose between the two approaches,
based on whether you expected non-slow consumers to always process messages
before they expired or not)?

If the functionality I described doesn't exist and is something we'd want
to include, I can submit a JIRA to implement it.

Thanks,
Tim

Reply via email to