Hi,
This did the job:
mngr = new SubscriptionManager( session );
FlowControl fc(5, FlowControl::UNLIMITED, false);
mngr->subscribe(incoming, "teste_flow", SubscriptionSettings(fc));
At least the first part: Consume 5 items. But how do I give the process
more credit in order to read five more? What we want to do is have N consumers
and allow each to get a fare share of items. So, once each sibling consumer has
got their share, I would like to give them all a new share.
The closest thing to this I could get was:
// O esperado eh que apenas 5 elementos sejam consumidos.
mngr = new SubscriptionManager( session );
FlowControl fc(5, FlowControl::UNLIMITED, false);
while (1) {
mngr->subscribe(incoming, "teste_flow", SubscriptionSettings(fc));
while ( incoming.get(m, 1000000L) ) {
// printf("[%d] [%s]\n", pid, m.getData().c_str());
++count;
}
printf("[%d] = [%d]\n", pid, count);
mngr->cancel("teste_flow");
}
Running with 5 consumers, I get something like this:
[12272] = [200]
[12273] = [200]
[12274] = [195]
[12275] = [190]
[12276] = [190]
Is this the right way to do it? To keep creating and closing the
subscription?
Thanks,
Acácio.
-----Mensagem original-----
De: Gordon Sim [mailto:[email protected]]
Enviada em: terça-feira, 15 de dezembro de 2009 14:39
Para: [email protected]
Assunto: Re: Behaviour when there're elements on the queue before instantiating
the consumers
On 12/15/2009 04:08 PM, Acácio Centeno wrote:
> Hello Mr. Sim,
>
> Could you give me an example on using flow control? I tried this,
> expecting to limit the amount of messages to be consumed to five, but almost
> all the messages on the queue were consumed (I'm also fuzzy about why not
> all of them were consumed):
>
> #include<iostream>
>
> #include<qpid/client/Connection.h>
> #include<qpid/client/Session.h>
> #include<qpid/client/Message.h>
> #include<qpid/client/SubscriptionManager.h>
>
> using namespace std;
> using namespace qpid::client;
> using namespace qpid::framing;
>
> int main(int argc, char *argv[]) {
> Connection connection;
> ConnectionSettings settings;
> Session session;
> SubscriptionManager *mngr;
> LocalQueue incoming;
> Message m;
>
> pid_t pid = getpid();
>
> try {
> settings.host = "localhost";
> settings.port = 5672;
> settings.virtualhost = "bridge";
> settings.mechanism = "ANONYMOUS";
> settings.tcpNoDelay = false;
>
> connection.open(settings);
> session = connection.newSession();
>
> mngr = new SubscriptionManager( session );
One easy way is to change:
> mngr->subscribe(incoming, "teste_flow");
to:
mngr->subscribe(incoming, "teste_flow",
SubscriptionSettings(FlowControl::messageWindow(5)));
> // O esperado eh que apenas 5 elementos sejam consumidos.
You don't need the following line if you do the above; if you do want to
set the default instead of the specific approach above, this line needs
to happen *before* you call SubscriptionManager::subscribe().
> mngr->setFlowControl( "teste_flow", 5, 0, false );
>
> while ( incoming.get(m, 1000000L) ) {
> printf("[%d] [%s]\n", pid, m.getData().c_str());
> }
>
> printf("[%d] finalizado.\n", pid);
> } catch(const std::exception& error) {
> fprintf(stderr, "Erro: %s\n", error.what());
> return 1;
> }
>
> return 0;
> }
>
> Thanks,
Hope this helps!
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]