On 07/12/2012 01:41 PM, Zhihua Che wrote:
2012/7/12 Gordon Sim <[email protected]>:
On 07/12/2012 01:15 PM, Zhihua Che wrote:
According to what you said, I think the capacity of the receiver is used
to speed up fetching by prefetching messages.
It should not limit the number of messages the receiver can fetch.
Right?
Right.
I've attached a simple example of what you described. When I run these, I
see the receiver getting more than 100 messages, provided I start the sender
first and allow the queue to build up a little before starting the receiver.
Do you see the same?
Note that because you are using Duration::IMMEDIATE for the receiver, it
will return false if at any attempt to fetch there are no more messages.
Since the sender is publishing synchronously, messages come in to the queue
much more slowly than they go out, so eventually it is likely that the
receiver doesn't find a message.
One solution there is to use a timeout of, say, 1 second. That would mean the
fetch() call waits for up to a second for messages if there are none on the
queue at the time. Another option is to simply wait and retry if fetch()
returns false.
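For illustration, here is a minimal sketch of the wait-and-retry option. It does not use the Qpid API at all: FakeReceiver is a hypothetical stand-in whose fetch() returns false immediately when nothing is ready (much like fetch() with Duration::IMMEDIATE), and drainWithRetry, maxIdleRetries and pause are names invented for this example. The fake lets one "late" message arrive after each empty poll, to mimic a sender that is slower than the receiver.

```cpp
#include <chrono>
#include <deque>
#include <string>
#include <thread>

// Hypothetical stand-in for a receiver (not part of Qpid).
// fetch() returns false at once if no message is ready.
struct FakeReceiver {
    std::deque<std::string> ready;  // messages already on the queue
    std::deque<std::string> late;   // messages the slow sender has not published yet

    bool fetch(std::string& out) {
        if (ready.empty()) {
            // Simulate the sender catching up after an empty poll.
            if (!late.empty()) {
                ready.push_back(late.front());
                late.pop_front();
            }
            return false;
        }
        out = ready.front();
        ready.pop_front();
        return true;
    }
};

// Keep fetching; when a fetch fails, sleep briefly and try again.
// Give up only after maxIdleRetries consecutive failed fetches.
template <typename Fetch>
int drainWithRetry(Fetch fetch, int maxIdleRetries,
                   std::chrono::milliseconds pause) {
    int received = 0;
    int idle = 0;
    std::string content;
    while (idle < maxIdleRetries) {
        if (fetch(content)) {
            ++received;
            idle = 0;  // a message arrived, so reset the idle counter
        } else {
            ++idle;
            std::this_thread::sleep_for(pause);
        }
    }
    return received;
}
```

With two messages ready and three arriving late, a plain while (fetch(...)) loop would stop after the first two, whereas drainWithRetry with maxIdleRetries of 3 picks up all five. With a real receiver, the callable passed in would presumably wrap receiver.fetch(m, Duration::IMMEDIATE) and copy out m.getContent().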
Yes, I didn't mention that, but I did add a quick sleep every time
fetching a message failed.
I read your code, it's the same as mine.
I ran your code and yes, the receiver fetched more than 100 messages.
It's weird...
I also watched your 'my-queue' with qpid-tool and found that your
'release' reaches the same number as 'acquires'
and your 'msgTotalDequeues' is 0.
The stats are not like those of my queue.
Sorry, that's just because I forgot to acknowledge() the messages.
Correction attached. It still works as expected, but now the messages
get dequeued.
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Receiver.h>
#include <iostream>
using namespace qpid::messaging;
int main(int argc, char** argv)
{
    try
    {
        Connection connection("localhost");
        connection.open();
        Session session = connection.createSession();
        Receiver receiver = session.createReceiver("my-queue; {create: always}");
        receiver.setCapacity(100);
        Message m;
        while (receiver.fetch(m, Duration::IMMEDIATE)) {
            std::cout << "Received: " << m.getContent() << std::endl;
            session.acknowledge();
        }
        std::cout << "No more messages!" << std::endl;
        session.close();
        connection.close();
        return 0;
    } catch(std::exception & e ) {
        std::cout << "exception: " << e.what() << std::endl;
        return 1;
    }
}