On 12/15/2009 08:10 PM, Luiz Cordeiro wrote:
Hi,
This did the job:
mngr = new SubscriptionManager( session );
FlowControl fc(5, FlowControl::UNLIMITED, false);
mngr->subscribe(incoming, "teste_flow", SubscriptionSettings(fc));
At least the first part: Consume 5 items. But how do I give the process
more credit in order to read five more? What we want to do is have N consumers
and allow each to get a fair share of items. So, once each sibling consumer has
got their share, I would like to give them all a new share.
There are two options. One option (which I would recommend) is to use a
credit 'window' whereby more messages are sent as they are acknowledged
by the client[1] (i.e. in this option the credit defines a 'prefetch'
window). To do this you would set the last argument to your flow control
object to be true. E.g.:
FlowControl fc(5, FlowControl::UNLIMITED, true);
or
FlowControl fc = FlowControl::messageWindow(5); // does the same thing
The other option is to directly issue credit, e.g. using
Subscription::grantCredit().
[1] Even with this option you can control the acknowledgement process if
you wish. In AMQP 0-10 there are two 'levels' of message acknowledgement
(at the model- and session- levels). Flow is controlled by the session
level completion (i.e. the session_completed control). By default these
controls are sent by the library but the application can decide when
that should happen or even take control for sending completion
themselves (see SubscriptionSettings::completionMode for details).
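To make the difference between the two options concrete, here is a small toy model in plain C++. It does not use the Qpid client library at all: ToyConsumer, deliver(), acknowledge() and runExplicitRounds() are hypothetical names invented for this illustration, and the round-robin delivery is an assumption about how a broker might share a queue among consumers that hold credit. In the real client the credit would be replenished by session completion (window mode) or by Subscription::grantCredit() (explicit mode).

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <vector>

// Toy model only: none of these types belong to the Qpid client API.
struct ToyConsumer {
    unsigned credit = 0;    // messages the broker may still send
    unsigned received = 0;  // total messages delivered so far
    unsigned unacked = 0;   // delivered but not yet acknowledged
};

// Hand out queued messages round-robin to consumers that still hold credit.
// Stops when the queue is empty or nobody has credit left.
inline std::size_t deliver(std::deque<int>& queue,
                           std::vector<ToyConsumer>& consumers) {
    std::size_t delivered = 0;
    bool progress = true;
    while (!queue.empty() && progress) {
        progress = false;
        for (ToyConsumer& c : consumers) {
            if (queue.empty()) break;
            if (c.credit == 0) continue;
            queue.pop_front();
            --c.credit;
            ++c.received;
            ++c.unacked;
            ++delivered;
            progress = true;
        }
    }
    return delivered;
}

// Option 1 (credit window): acknowledging n messages returns n credits,
// so the number of unacknowledged messages never exceeds the window.
inline void acknowledge(ToyConsumer& c, unsigned n) {
    n = std::min(n, c.unacked);
    c.unacked -= n;
    c.credit += n;
}

// Option 2 (explicit credit): grant every consumer `share` credits,
// deliver, and repeat until the queue is drained. Returns the round count.
inline unsigned runExplicitRounds(std::deque<int>& queue,
                                  std::vector<ToyConsumer>& consumers,
                                  unsigned share) {
    if (share == 0 || consumers.empty()) return 0;  // nothing could ever drain
    unsigned rounds = 0;
    while (!queue.empty()) {
        for (ToyConsumer& c : consumers) c.credit += share;
        deliver(queue, consumers);
        ++rounds;
    }
    return rounds;
}
```

In this model, 50 queued messages shared by 5 consumers with a share of 5 drain in two rounds of explicit credit, with 10 messages per consumer. With the window approach no rounds are needed: each acknowledgement frees a slot and delivery simply continues.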
The closest thing to this I could get was:
// The expectation is that only 5 items are consumed.
mngr = new SubscriptionManager( session );
FlowControl fc(5, FlowControl::UNLIMITED, false);
while (1) {
mngr->subscribe(incoming, "teste_flow", SubscriptionSettings(fc));
while ( incoming.get(m, 1000000L) ) {
// printf("[%d] [%s]\n", pid, m.getData().c_str());
++count;
}
printf("[%d] = [%d]\n", pid, count);
mngr->cancel("teste_flow");
}
Running with 5 consumers, I get something like this:
[12272] = [200]
[12273] = [200]
[12274] = [195]
[12275] = [190]
[12276] = [190]
Is this the right way to do it? To keep creating and closing the
subscription?
Thanks,
Acácio.
-----Original Message-----
From: Gordon Sim [mailto:[email protected]]
Sent: Tuesday, December 15, 2009 14:39
To: [email protected]
Subject: Re: Behaviour when there're elements on the queue before instantiating
the consumers
On 12/15/2009 04:08 PM, Acácio Centeno wrote:
Hello Mr. Sim,
Could you give me an example on using flow control? I tried this,
expecting to limit the amount of messages to be consumed to five, but almost
all the messages on the queue were consumed (I'm also fuzzy about why not
all of them were consumed):
#include<cstdio>    // printf, fprintf
#include<iostream>
#include<unistd.h>  // getpid
#include<qpid/client/Connection.h>
#include<qpid/client/Session.h>
#include<qpid/client/Message.h>
#include<qpid/client/SubscriptionManager.h>
using namespace std;
using namespace qpid::client;
using namespace qpid::framing;
int main(int argc, char *argv[]) {
Connection connection;
ConnectionSettings settings;
Session session;
SubscriptionManager *mngr;
LocalQueue incoming;
Message m;
pid_t pid = getpid();
try {
settings.host = "localhost";
settings.port = 5672;
settings.virtualhost = "bridge";
settings.mechanism = "ANONYMOUS";
settings.tcpNoDelay = false;
connection.open(settings);
session = connection.newSession();
mngr = new SubscriptionManager( session );
One easy way is to change:
mngr->subscribe(incoming, "teste_flow");
to:
mngr->subscribe(incoming, "teste_flow",
SubscriptionSettings(FlowControl::messageWindow(5)));
// The expectation is that only 5 items are consumed.
You don't need the following line if you do the above; if you do want to
set the default instead of the specific approach above, this line needs
to happen *before* you call SubscriptionManager::subscribe().
mngr->setFlowControl( "teste_flow", 5, 0, false );
while ( incoming.get(m, 1000000L) ) {
printf("[%d] [%s]\n", pid, m.getData().c_str());
}
printf("[%d] finished.\n", pid);
} catch(const std::exception& error) {
fprintf(stderr, "Error: %s\n", error.what());
return 1;
}
return 0;
}
Thanks,
Hope this helps!
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]