Re: Using MessageListener to read up to a limit of message?
If you want to pause message acknowledgement and wait before consuming more messages then you should probably use a synchronous consumer instead and just call consumer.receive() and not try and use a messageListener which is asynchronous. I have to move away from synchronous because we require so many threads that even a prefetch of 1 means that our robot has 20-40k of message waiting to be processed. It’s definitely a weird use case, I’ll admit. But it sounds like, as long as I don’t return from onMessage(), then no more messages will be processed. Perhaps I can just submit it to a queue and then call await() on a count down latch before continuing forward. -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts
Re: Using MessageListener to read up to a limit of message?
Waiting to return from onMessage() till its work is completed on the other thread sounds like it should give you what you want. On Jun 23, 2015 3:53 PM, Kevin Burton bur...@spinn3r.com wrote: If you want to pause message acknowledgement and wait before consuming more messages then you should probably use a synchronous consumer instead and just call consumer.receive() and not try and use a messageListener which is asynchronous. I have to move away from synchronous because we require so many threads that even a prefetch of 1 means that our robot has 20-40k of message waiting to be processed. It’s definitely a weird use case, I’ll admit. But it sounds like, as long as I don’t return from onMessage(), then no more messages will be processed. Perhaps I can just submit it to a queue and then call await() on a count down latch before continuing forward. -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts
Re: Using MessageListener to read up to a limit of message?
Yes, Tim you are right, I wasn't thinking when I wrote that response. Obviously the broker won't know to dispatch more messages to the client's prefetch buffer until it starts to receive acknowledgements telling the broker that it is ok to dispatch more. With auto-acknowledgement this happens automatically which is probably what I was thinking of when I wrote my initial response. Which brings us back to the original message in that if the messages were truly unacknowledged, then yes the client should stop receiving messages until the messages are acknowledged or committed. When a transaction is used, the acknowledgement mode is ignored. You use one or the other. On Mon, Jun 22, 2015 at 9:52 AM, Tim Bain tb...@alumni.duke.edu wrote: I had always understood it as Kevin did: that the prefetch size (which is computed on the broker) was the number of outstanding, unacknowledged messages that the broker had dispatched to the consumer, and that once the number of unacknowledged messages equaled the prefetch limit, the broker would stop dispatching new messages until acks are used (see paragraph 3 of http://activemq.apache.org/what-is-the-prefetch-limit-for.html). Christopher, can you point to code or documentation that explains how the broker would dispatch more unacknowledged messages than the prefetch buffer size? Kevin, if the wiki (and your assumption) really is correct about how prefetch works, the crucial word here is unacknowledged: your messages will be limited by that count only if you're not acknowledging them till you're done processing the message. If you auto-ack the messages when you begin processing them, which is the default behavior, you'll keep getting new messages while the older ones are being processed. Your reference to commit() implies that you're using transactions and I don't know how that plays into the ack mode, but make sure that the messages are not getting acked till you're done processing them. This may play into the other thread you've got going, because if you queue the message up for work and then ack it before the work occurs, you've broken your ability to limit how many you're processing. Also, make sure (via JMX) that the prefetch value you think you're setting on your subscribers really is what you think you're setting; that could easily explain behavior that's not what you're expecting. Tim On Sun, Jun 21, 2015 at 6:40 PM, Christopher Shannon christopher.l.shan...@gmail.com wrote: Right, you will continue to receive messages if you process them. The prefetch size is used as an optimization and doesn't have anything to do with message acknowledgement. The broker will always try and keep your prefetch full on your client so you can quickly consume messages. As you process messages the broker will dispatch more to you regardless of whether or not you have called commit or acknowledged the messages. The broker will keep track of all messages until you actually acknowledge them. If you want to pause message acknowledgement and wait before consuming more messages then you should probably use a synchronous consumer instead and just call consumer.receive() and not try and use a messageListener which is asynchronous. On Sun, Jun 21, 2015 at 6:35 PM, Kevin Burton bur...@spinn3r.com wrote: I have a threaded app .. so let’s say I have 100 threads waiting to do work. I want an async message listener to read these messages, UP TO 100 messages, until I can process and commit() them. But I don’t think there’s a way to do this. I had ASSUMED that setting a prefetch of say 10, and a message listener, would only give me 10 messages, until I acknowledged them. But this doesn’t seem to be the case. I keep receiving messages on the listener. -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts
Re: Using MessageListener to read up to a limit of message?
I had always understood it as Kevin did: that the prefetch size (which is computed on the broker) was the number of outstanding, unacknowledged messages that the broker had dispatched to the consumer, and that once the number of unacknowledged messages equaled the prefetch limit, the broker would stop dispatching new messages until acks are used (see paragraph 3 of http://activemq.apache.org/what-is-the-prefetch-limit-for.html). Christopher, can you point to code or documentation that explains how the broker would dispatch more unacknowledged messages than the prefetch buffer size? Kevin, if the wiki (and your assumption) really is correct about how prefetch works, the crucial word here is unacknowledged: your messages will be limited by that count only if you're not acknowledging them till you're done processing the message. If you auto-ack the messages when you begin processing them, which is the default behavior, you'll keep getting new messages while the older ones are being processed. Your reference to commit() implies that you're using transactions and I don't know how that plays into the ack mode, but make sure that the messages are not getting acked till you're done processing them. This may play into the other thread you've got going, because if you queue the message up for work and then ack it before the work occurs, you've broken your ability to limit how many you're processing. Also, make sure (via JMX) that the prefetch value you think you're setting on your subscribers really is what you think you're setting; that could easily explain behavior that's not what you're expecting. Tim On Sun, Jun 21, 2015 at 6:40 PM, Christopher Shannon christopher.l.shan...@gmail.com wrote: Right, you will continue to receive messages if you process them. The prefetch size is used as an optimization and doesn't have anything to do with message acknowledgement. The broker will always try and keep your prefetch full on your client so you can quickly consume messages. As you process messages the broker will dispatch more to you regardless of whether or not you have called commit or acknowledged the messages. The broker will keep track of all messages until you actually acknowledge them. If you want to pause message acknowledgement and wait before consuming more messages then you should probably use a synchronous consumer instead and just call consumer.receive() and not try and use a messageListener which is asynchronous. On Sun, Jun 21, 2015 at 6:35 PM, Kevin Burton bur...@spinn3r.com wrote: I have a threaded app .. so let’s say I have 100 threads waiting to do work. I want an async message listener to read these messages, UP TO 100 messages, until I can process and commit() them. But I don’t think there’s a way to do this. I had ASSUMED that setting a prefetch of say 10, and a message listener, would only give me 10 messages, until I acknowledged them. But this doesn’t seem to be the case. I keep receiving messages on the listener. -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts
Re: Using MessageListener to read up to a limit of message?
Right, you will continue to receive messages if you process them. The prefetch size is used as an optimization and doesn't have anything to do with message acknowledgement. The broker will always try and keep your prefetch full on your client so you can quickly consume messages. As you process messages the broker will dispatch more to you regardless of whether or not you have called commit or acknowledged the messages. The broker will keep track of all messages until you actually acknowledge them. If you want to pause message acknowledgement and wait before consuming more messages then you should probably use a synchronous consumer instead and just call consumer.receive() and not try and use a messageListener which is asynchronous. On Sun, Jun 21, 2015 at 6:35 PM, Kevin Burton bur...@spinn3r.com wrote: I have a threaded app .. so let’s say I have 100 threads waiting to do work. I want an async message listener to read these messages, UP TO 100 messages, until I can process and commit() them. But I don’t think there’s a way to do this. I had ASSUMED that setting a prefetch of say 10, and a message listener, would only give me 10 messages, until I acknowledged them. But this doesn’t seem to be the case. I keep receiving messages on the listener. -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts