On 07/12/2012 01:15 PM, Zhihua Che wrote:
According to what you said, I think the capacity of the receiver is used
to speed up fetching by prefetching messages.
It should not limit the number of messages the receiver can fetch. Right?

Right.

I've attached a simple example of what you described. When I run these, I see the receiver getting more than 100 messages, provided I start the sender first and allow the queue to build up a little before starting the receiver. Do you see the same?

Note that because you are using Duration::IMMEDIATE for the receiver, fetch() returns false as soon as an attempt finds no message on the queue. Since the sender is publishing synchronously, messages come into the queue much more slowly than they go out, so eventually the receiver is likely to find the queue empty.

One solution is to use a timeout of, say, 1 second; the fetch() call then waits for up to a second for a message if there are none on the queue at the time. Another option is to simply wait and retry if fetch() returns false.
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <iostream>
#include <sstream>

using namespace qpid::messaging;

int main(int argc, char** argv)
{
    try
    {
        Connection connection("localhost");
        connection.open();
        Session session = connection.createSession();
        Sender sender = session.createSender("my-queue; {create: always}");
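        // Allow up to 100 sent messages to be pending confirmation from the broker.
        sender.setCapacity(100);
        int count(0);
        // Runs until interrupted; the close() calls below are not reached.
        while (true) {
            std::stringstream content;
            content << "Message " << ++count;
            // sync=true: block until the broker has confirmed this message.
            sender.send(Message(content.str()), true);
        }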
        session.close();
        connection.close();
        return 0;
    } catch(std::exception & e ) {
        std::cout << "exception: " << e.what() << std::endl;
        return 1;
    }
}
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Receiver.h>
#include <iostream>

using namespace qpid::messaging;

int main(int argc, char** argv)
{
    try
    {
        Connection connection("localhost");
        connection.open();
        Session session = connection.createSession();
        Receiver receiver = session.createReceiver("my-queue; {create: always}");
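        // Prefetch up to 100 messages; this does not cap the total that can be fetched.
        receiver.setCapacity(100);
        Message m;
        // IMMEDIATE: fetch() returns false as soon as no message is available.
        while (receiver.fetch(m, Duration::IMMEDIATE)) {
            std::cout << "Received: " << m.getContent() << std::endl;
        }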
        std::cout << "No more messages!" << std::endl;
        session.close();
        connection.close();
        return 0;
    } catch(std::exception & e ) {
        std::cout << "exception: " << e.what() << std::endl;
        return 1;
    }
}
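
For the "wait and retry" option mentioned above, here is a minimal sketch of one way it could look. It is not part of the attached examples, and the names fetchWithRetry, tryFetch, maxAttempts and pause are made up for illustration. The helper takes the fetch attempt as a callable, so it does not depend on the qpid headers itself.

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper (not part of the qpid API): calls tryFetch() up to
// maxAttempts times, sleeping for 'pause' between failed attempts.
// Returns true as soon as one attempt succeeds, false if all of them fail.
template <typename FetchFn>
bool fetchWithRetry(FetchFn tryFetch, int maxAttempts,
                    std::chrono::milliseconds pause)
{
    for (int attempt = 0; attempt < maxAttempts; ++attempt) {
        if (tryFetch()) {
            return true;
        }
        // Only sleep if another attempt will follow.
        if (attempt + 1 < maxAttempts) {
            std::this_thread::sleep_for(pause);
        }
    }
    return false;
}
```

In the receiver above, the loop condition could then become something like fetchWithRetry([&] { return receiver.fetch(m, Duration::IMMEDIATE); }, 10, std::chrono::milliseconds(100)), where the attempt count and pause are arbitrary values to tune. The timeout option needs no helper at all: passing Duration::SECOND instead of Duration::IMMEDIATE to fetch() should be enough.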
