I have been working on replacing a QPID messaging application that uses
the current client C++ API, with a new version based on the C++
messaging API. But I have hit a few problems that I hope someone can
provide assistance with. Attached is a simple client application that I
have been using to demonstrate the problems I have been having (using
CentOS 6.3 64bit).
My particular use case is for a named queue with several bindings
associated with it. An easy use case that the current client API can
handle, but something the messaging API seems to struggle with.
The attached test program uses the messaging API to create two receivers
using the following address strings:
qpidRxer; {create:always, delete:always, node:{type:queue,
x-bindings:[{exchange:amq.topic, key:rxer1}]}}
qpidRxer; {create:always, delete:always, node:{type:queue,
x-bindings:[{exchange:amq.topic, key:rxer2}]}}
Using qpid-stat I can see the qpidRxer queue has been created and the
rxer1/rxer2 bindings applied. So all good so far.
So I send a test message with the spout application (from
cpp/examples/messaging) using the following options:
./spout --content 'rxer2 test message' 'amq.topic/rxer2'
This is where things start going wrong as the message is received by the
receiver instance that was used to create the rxer1 binding. If I send
the message again, then it is received by the Receiver associated with
the rxer2 binding. If I send the message several times, then the
Receivers just cycle round each taking it in turns to receive the test
message. So it would appear that the message filtering used by the
Receiver implementation does not go down to the binding key level. Which
seems inconsistent with the address strings I have used. I can program
around this, but its not ideal.
The next problem occurs if I change the code slightly and basically
close the second Receiver instance, after it has been created, just to
simulate the need to sometimes remove a binding. Using qpid-stat I can
see that the queue has been deleted!!, even though I still have a valid
Receiver instance to this queue in my application. Yes, I know that I
have the 'delete:always' flag set in the address string, but I would
have thought that some kind of reference counting would have been
undertaken by the broker (particularly as I have two subscriptions in
the broker when both the Receivers were created.)
So I remove the 'delete:always' flag from both address strings and try
again (closing the second receiver after creation). Using qpid-stat I
can see that the qpidRxer queue is still there (which is good news), but
unfortunately both rxer1 and rxer2 bindings are still present on the
queue, so still not quite right. But now I have got in to this state I
cannot see any way, via the messaging API, that can help me remove the
unused binding; I have to either use the client/console API via another
connection.
I have looked through the address string documentation but couldn't seem
to find any flag/tag that would provide the desired functionality. So
for the time being I seem to be stuck with sticking with the client API.
On a related usage topic, Session's going invalid due to an error also
causes problems. If I create several Receivers/Senders using the same
Session and then try to create a Receiver for a queue that doesn't exist
in the broker, then the session reports an error. Which in its self it
not a problem, but there doesn't seem to be a distinction between fatal
errors and recoverable errors. Once a session errors, then that's it.
Any existing Receivers or Senders, created with that session, need to be
removed and recreated with a new session. It seems a bit excessive that
attempting to create a Receiver with a non existent queue would cause
such a headache. The solution of creating a session for each
Receiver/Sender then seems a bit heavy weight as well.
Would anyone be able to provide any advice on how they have gotten
around these issues, or provide an Address String Hack that might solve
the problem? Any help gratefully received.
Also, are there any updates to the messaging API in the next QPID release?
Clive
#include <string>
#include <iostream>
#include <stdlib.h>
#include "qpid/Exception.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Connection.h"
int main(int argc, char *argv[])
{
std::string address1("qpidRxer; {create:always, delete:always,
node:{type:queue, x-bindings:[{exchange:amq.topic, key:rxer1}]}}");
std::string address2("qpidRxer; {create:always, delete:always,
node:{type:queue, x-bindings:[{exchange:amq.topic, key:rxer2}]}}");
if (argc > 1){
address1 = argv[1];
}
if (argc > 2){
address2 = argv[2];
}
try {
qpid::messaging::Connection connection("amqp:tcp:127.0.0.1:5672");
connection.open();
qpid::messaging::Session session = connection.createSession();
qpid::messaging::Receiver rxer1 = session.createReceiver(address1);
qpid::messaging::Receiver rxer2 = session.createReceiver(address2);
rxer1.setCapacity(1);
rxer2.setCapacity(1);
qpid::messaging::Message message;
qpid::messaging::Receiver currentRxer;
//rxer1.close; // test removal of binding from queue
if (!session.hasError()){
while(true){
std::cout << "waiting for receiver ... " << std::endl;
if(session.nextReceiver(currentRxer)){
std::cout << "receiver ready ... " << std::endl;
currentRxer.fetch(message);
std::cout << " received msg [" << currentRxer.getName() << "]
: " << message.getSubject() << std::endl;
session.acknowledge();
}
}
}
}
catch(const qpid::types::Exception& qe){
std::cout << "qpidRxer caught unexpected qpid::types::Exception : " <<
qe.what() << std::endl;
return 1;
}
catch(const qpid::Exception& qe){
std::cout << "qpidRxer caught unexpected qpid::messages::Exception : "
<< qe.what() << std::endl;
return 1;
}
return EXIT_SUCCESS;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]