Re: Using MessageListener to read up to a limit of message?

2015-06-23 Thread Kevin Burton
 If you want to pause message acknowledgement and wait before consuming more
 messages then you should probably use a synchronous consumer instead and
 just call consumer.receive() and not try and use a messageListener which
 is asynchronous.


I have to move away from synchronous consumers because we require so many
threads that even a prefetch of 1 means that our robot has 20-40k messages
waiting to be processed.  It’s definitely a weird use case, I’ll admit.

But it sounds like, as long as I don’t return from onMessage(), no more
messages will be delivered to the listener. Perhaps I can just submit each
message to a work queue and then call await() on a CountDownLatch before
returning.
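
The idea of holding onMessage() open until the work is done could be sketched roughly as follows. This is a JDK-only illustration under stated assumptions: TextListener is a hypothetical stand-in for javax.jms.MessageListener, and BlockingListener and BlockingListenerDemo are made-up names, not part of ActiveMQ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for javax.jms.MessageListener so the sketch needs only the JDK.
interface TextListener {
    void onMessage(String body);
}

// Hands each message to a worker pool, then blocks until that message is done.
class BlockingListener implements TextListener {
    private final ExecutorService workers;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();
    private final AtomicInteger processed = new AtomicInteger();

    BlockingListener(ExecutorService workers) {
        this.workers = workers;
    }

    @Override
    public void onMessage(String body) {
        CountDownLatch done = new CountDownLatch(1);
        workers.submit(() -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            try {
                // Real processing of `body` would go here.
                processed.incrementAndGet();
            } finally {
                inFlight.decrementAndGet();
                done.countDown();
            }
        });
        try {
            // Do not return (and so do not accept the next message) until the work is finished.
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int processed() {
        return processed.get();
    }

    int maxInFlight() {
        return maxInFlight.get();
    }
}

class BlockingListenerDemo {
    // Returns {processed, maxInFlight} after feeding `messages` bodies to the listener.
    static int[] run(int messages) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        BlockingListener listener = new BlockingListener(pool);
        for (int i = 0; i < messages; i++) {
            listener.onMessage("m" + i); // this loop plays the role of the delivery thread
        }
        pool.shutdown();
        return new int[] {listener.processed(), listener.maxInFlight()};
    }

    public static void main(String[] args) {
        int[] r = run(10);
        System.out.println("processed=" + r[0] + " maxInFlight=" + r[1]);
    }
}
```

Because the listener blocks once per message, only one message is in progress at a time for that listener. Getting N messages in progress at once with this approach would presumably take N sessions and consumers, since a JMS session delivers to its listeners serially.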

-- 

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
https://plus.google.com/102718274791889610666/posts


Re: Using MessageListener to read up to a limit of message?

2015-06-23 Thread Tim Bain
Waiting to return from onMessage() till its work is completed on the other
thread sounds like it should give you what you want.



Re: Using MessageListener to read up to a limit of message?

2015-06-22 Thread Christopher Shannon
Yes, Tim, you are right; I wasn't thinking when I wrote that response.
Obviously the broker won't know to dispatch more messages to the client's
prefetch buffer until it starts to receive acknowledgements telling the
broker that it is ok to dispatch more.  With auto-acknowledgement this
happens automatically which is probably what I was thinking of when I wrote
my initial response.

Which brings us back to the original message: if the messages were truly
unacknowledged, then yes, the client should stop receiving messages until
they are acknowledged or committed.  When a transaction is used, the
acknowledgement mode is ignored; you use one or the other.

 



Re: Using MessageListener to read up to a limit of message?

2015-06-22 Thread Tim Bain
I had always understood it as Kevin did: that the prefetch size (which is
computed on the broker) was the number of outstanding, unacknowledged
messages that the broker had dispatched to the consumer, and that once the
number of unacknowledged messages equaled the prefetch limit, the broker
would stop dispatching new messages until acks are received (see paragraph 3 of
http://activemq.apache.org/what-is-the-prefetch-limit-for.html).
Christopher, can you point to code or documentation that explains how the
broker would dispatch more unacknowledged messages than the prefetch buffer
size?
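
The prefetch behavior described in the paragraph above can be illustrated with a toy model. This is not ActiveMQ code and makes no claim about the broker's internals; PrefetchWindow and its methods are hypothetical names that only mirror the description given here: dispatch stops once the number of dispatched-but-unacknowledged messages reaches the prefetch limit, and resumes as acks free up slots.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: a "broker" that stops dispatching once the number of
// dispatched-but-unacknowledged messages reaches the prefetch limit.
class PrefetchWindow {
    private final int prefetch;
    private final Deque<Integer> pending = new ArrayDeque<>();
    private int unacked = 0;

    PrefetchWindow(int prefetch, int queued) {
        this.prefetch = prefetch;
        for (int i = 0; i < queued; i++) {
            pending.add(i);
        }
    }

    // Dispatches as many messages as the window allows; returns how many were sent.
    int dispatch() {
        int sent = 0;
        while (unacked < prefetch && !pending.isEmpty()) {
            pending.poll();
            unacked++;
            sent++;
        }
        return sent;
    }

    // The consumer acknowledges n messages, which frees up to n slots in the window.
    void ack(int n) {
        unacked -= Math.min(n, unacked);
    }

    int unacked() {
        return unacked;
    }
}

class PrefetchWindowDemo {
    public static void main(String[] args) {
        PrefetchWindow window = new PrefetchWindow(10, 100);
        System.out.println(window.dispatch()); // 10: the window fills up
        System.out.println(window.dispatch()); // 0: nothing more until an ack arrives
        window.ack(3);
        System.out.println(window.dispatch()); // 3: one new message per freed slot
    }
}
```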

Kevin, if the wiki (and your assumption) really is correct about how
prefetch works, the crucial word here is unacknowledged: your messages
will be limited by that count only if you're not acknowledging them till
you're done processing the message.  If you auto-ack the messages when you
begin processing them, which is the default behavior, you'll keep getting
new messages while the older ones are being processed.  Your reference to
commit() implies that you're using transactions and I don't know how that
plays into the ack mode, but make sure that the messages are not getting
acked till you're done processing them.  This may play into the other
thread you've got going, because if you queue the message up for work and
then ack it before the work occurs, you've broken your ability to limit how
many you're processing.
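
The point about ack ordering can be sketched with a bounded hand-off. The code below is a JDK-only illustration, not a JMS implementation: BoundedHandOff is a hypothetical name, and the `ack` argument is a placeholder callback standing in for whatever acknowledgement or commit the real consumer would perform.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch: caps the number of messages being processed at once and
// runs the (hypothetical) ack callback only after the work has completed.
class BoundedHandOff {
    private final Semaphore permits;
    private final ExecutorService workers;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    BoundedHandOff(int limit) {
        this.permits = new Semaphore(limit);
        this.workers = Executors.newFixedThreadPool(limit);
    }

    // Intended to be called from the delivery thread (for example inside onMessage).
    // Blocks while `limit` messages are still in progress.
    void accept(Runnable work, Runnable ack) {
        permits.acquireUninterruptibly();
        workers.submit(() -> {
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                work.run();
                ack.run(); // ack last: acking before the work would defeat the limit
            } finally {
                inFlight.decrementAndGet();
                permits.release();
            }
        });
    }

    // Waits for outstanding work and returns the highest concurrency observed.
    int drainAndGetPeak() {
        workers.shutdown();
        try {
            workers.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}

class BoundedHandOffDemo {
    // Returns {acked, peak} after pushing `messages` units of work through the hand-off.
    static int[] run(int limit, int messages) {
        BoundedHandOff handOff = new BoundedHandOff(limit);
        AtomicInteger acked = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            handOff.accept(BoundedHandOffDemo::simulateWork, acked::incrementAndGet);
        }
        int peak = handOff.drainAndGetPeak();
        return new int[] {acked.get(), peak};
    }

    static void simulateWork() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] r = run(5, 50);
        System.out.println("acked=" + r[0] + " peak=" + r[1]);
    }
}
```

In real JMS code the ack callback needs more care than this sketch shows: a Session is not meant to be used from several threads at once, and in CLIENT_ACKNOWLEDGE mode Message.acknowledge() acknowledges every message the session has consumed so far, not just the one it is called on.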

Also, make sure (via JMX) that the prefetch value you think you're setting
on your subscribers really is what you think you're setting; that could
easily explain behavior that's not what you're expecting.
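
For reference, two documented ways of setting the prefetch on an ActiveMQ client are a connection-URI option and a destination option. The helper below only builds those strings; the class and method names, host, port, and queue name are placeholders chosen for illustration, and the value the broker actually applies should still be checked via JMX as suggested above.

```java
// Builds the option strings only; it does not connect to a broker.
class PrefetchSettings {
    // Connection-level setting: applies to queue consumers created from this connection.
    static String brokerUrl(String base, int queuePrefetch) {
        return base + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch;
    }

    // Per-consumer setting: appended to the destination name.
    static String queueName(String name, int prefetchSize) {
        return name + "?consumer.prefetchSize=" + prefetchSize;
    }

    public static void main(String[] args) {
        System.out.println(brokerUrl("tcp://localhost:61616", 10));
        System.out.println(queueName("TEST.QUEUE", 10));
    }
}
```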

Tim

 



Re: Using MessageListener to read up to a limit of message?

2015-06-21 Thread Christopher Shannon
Right, you will continue to receive messages if you process them.  The
prefetch size is used as an optimization and doesn't have anything to do
with message acknowledgement.  The broker will always try and keep your
prefetch full on your client so you can quickly consume messages. As you
process messages the broker will dispatch more to you regardless of whether
or not you have called commit or acknowledged the messages.  The broker
will keep track of all messages until you actually acknowledge them.

If you want to pause message acknowledgement and wait before consuming more
messages then you should probably use a synchronous consumer instead and
just call consumer.receive() and not try and use a messageListener which
is asynchronous.

On Sun, Jun 21, 2015 at 6:35 PM, Kevin Burton bur...@spinn3r.com wrote:

 I have a threaded app .. so let’s say I have 100 threads waiting to do
 work.

 I want an async message listener to read these messages, UP TO 100
 messages, until I can process and commit() them.

 But I don’t think there’s a way to do this.

 I had ASSUMED that setting a prefetch of say 10, and a message listener,
 would only give me 10 messages, until I acknowledged them.

 But this doesn’t seem to be the case. I keep receiving messages on the
 listener.
