On 12/15/2009 08:10 PM, Luiz Cordeiro wrote:
Hi,

        This did the job:

       mngr = new SubscriptionManager( session );
       FlowControl fc(5, FlowControl::UNLIMITED, false);
       mngr->subscribe(incoming, "teste_flow", SubscriptionSettings(fc));

        At least the first part: Consume 5 items. But how do I give the process 
more credit in order to read five more? What we want to do is have N consumers 
and allow each to get a fare share of items. So, once each sibling consumer has 
got their share, I would like to give them all a new share.

There are two options. One option (which I would recommend) is to use a credit 'window' whereby more messages are sent as they are acknowledged by the client[1] (i.e. in this option the credit defines a 'prefetch' window). To do this you would set the last argument to your flow control object to be true. E.g.:

    FlowControl fc(5, FlowControl::UNLIMITED, true);

or

    FlowControl fc = FlowControl::messageWindow(5);//does the same thing

The other option is to directly issue credit, e.g. using Subscription::grantCredit().


[1] Even with this option you can control the acknowledgement process if you wish. In AMQP 0-10 there are two 'levels' of message acknowledgement (at the model- and session- levels). Flow is controlled by the session level completion (i.e. the session_completed control). By default these controls are sent by the library but the application can decide when that should happen or even take control for sending completion themselves (see SubscriptionSettings::completionMode for details).


        The closest thing to this I could get was:

       // O esperado eh que apenas 5 elementos sejam consumidos.
       mngr = new SubscriptionManager( session );
       FlowControl fc(5, FlowControl::UNLIMITED, false);

       while (1) {
          mngr->subscribe(incoming, "teste_flow", SubscriptionSettings(fc));

          while ( incoming.get(m, 1000000L) ) {
             // printf("[%d] [%s]\n", pid, m.getData().c_str());
             ++count;
          }

          printf("[%d] = [%d]\n", pid, count);
          mngr->cancel("teste_flow");
       }

        Running with 5 consumers, I get something like this:

[12272] = [200]
[12273] = [200]
[12274] = [195]
[12275] = [190]
[12276] = [190]

        Is this the right way to do it? To keep creating and closing the 
subscription?

Thanks,
Acácio.

-----Mensagem original-----
De: Gordon Sim [mailto:[email protected]]
Enviada em: terça-feira, 15 de dezembro de 2009 14:39
Para: [email protected]
Assunto: Re: Behaviour when there're elements on the queue before instantiating 
the consumers

On 12/15/2009 04:08 PM, Acácio Centeno wrote:
Hello Mr. Sim,

        Could you give me an example on using flow control? I tried this,
expecting to limit the amount of messages to be consumed to five, but almost
all the messages on the queue were consumed (I'm also fuzzy about why not
all of them were consumed):

#include<iostream>

#include<qpid/client/Connection.h>
#include<qpid/client/Session.h>
#include<qpid/client/Message.h>
#include<qpid/client/SubscriptionManager.h>

using namespace std;
using namespace qpid::client;
using namespace qpid::framing;

int main(int argc, char *argv[]) {
     Connection           connection;
     ConnectionSettings   settings;
     Session              session;
     SubscriptionManager  *mngr;
     LocalQueue           incoming;
     Message              m;

     pid_t                pid = getpid();

     try {
        settings.host = "localhost";
        settings.port = 5672;
        settings.virtualhost = "bridge";
        settings.mechanism = "ANONYMOUS";
        settings.tcpNoDelay = false;

        connection.open(settings);
        session = connection.newSession();

        mngr = new SubscriptionManager( session );

One easy way is to change:

        mngr->subscribe(incoming, "teste_flow");

to:
           mngr->subscribe(incoming, "teste_flow",
SubscriptionSettings(FlowControl::messageWindow(5)));

        // O esperado eh que apenas 5 elementos sejam consumidos.

You don't need the following line if you do the above; if you do want to
set the default instead of the specific approach above, this line needs
to happen *before* you call SubscriptionManager::subscribe().

        mngr->setFlowControl( "teste_flow", 5, 0, false );

        while ( incoming.get(m, 1000000L) ) {
           printf("[%d] [%s]\n", pid, m.getData().c_str());
        }

        printf("[%d] finalizado.\n", pid);
     } catch(const std::exception&   error) {
        fprintf(stderr, "Erro: %s\n", error.what());
        return 1;
     }

     return 0;
}

Thanks,

Hope this helps!

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to