I have been working on replacing a QPID messaging application that uses the current client C++ API, with a new version based on the C++ messaging API. But I have hit a few problems that I hope someone can provide assistance with. Attached is a simple client application that I have been using to demonstrate the problems I have been having (using CentOS 6.3 64bit).

My particular use case is for a named queue with several bindings associated with it. An easy use case that the current client API can handle, but something the messaging API seems to struggle with.

The attached test program uses the messaging API to create two receivers using the following address strings:

qpidRxer; {create:always, delete:always, node:{type:queue, x-bindings:[{exchange:amq.topic, key:rxer1}]}} qpidRxer; {create:always, delete:always, node:{type:queue, x-bindings:[{exchange:amq.topic, key:rxer2}]}}

Using qpid-stat I can see the qpidRxer queue has been created and the rxer1/rxer2 bindings applied. So all good so far.

So I send a test message with the spout application (from cpp/examples/messaging) using the following options:

       ./spout --content 'rxer2 test message' 'amq.topic/rxer2'

This is where things start going wrong as the message is received by the receiver instance that was used to create the rxer1 binding. If I send the message again, then it is received by the Receiver associated with the rxer2 binding. If I send the message several times, then the Receivers just cycle round each taking it in turns to receive the test message. So it would appear that the message filtering used by the Receiver implementation does not go down to the binding key level. Which seems inconsistent with the address strings I have used. I can program around this, but its not ideal.

The next problem occurs if I change the code slightly and basically close the second Receiver instance, after it has been created, just to simulate the need to sometimes remove a binding. Using qpid-stat I can see that the queue has been deleted!!, even though I still have a valid Receiver instance to this queue in my application. Yes, I know that I have the 'delete:always' flag set in the address string, but I would have thought that some kind of reference counting would have been undertaken by the broker (particularly as I have two subscriptions in the broker when both the Receivers were created.)

So I remove the 'delete:always' flag from both address strings and try again (closing the second receiver after creation). Using qpid-stat I can see that the qpidRxer queue is still there (which is good news), but unfortunately both rxer1 and rxer2 bindings are still present on the queue, so still not quite right. But now I have got in to this state I cannot see any way, via the messaging API, that can help me remove the unused binding; I have to either use the client/console API via another connection.

I have looked through the address string documentation but couldn't seem to find any flag/tag that would provide the desired functionality. So for the time being I seem to be stuck with sticking with the client API.

On a related usage topic, Session's going invalid due to an error also causes problems. If I create several Receivers/Senders using the same Session and then try to create a Receiver for a queue that doesn't exist in the broker, then the session reports an error. Which in its self it not a problem, but there doesn't seem to be a distinction between fatal errors and recoverable errors. Once a session errors, then that's it. Any existing Receivers or Senders, created with that session, need to be removed and recreated with a new session. It seems a bit excessive that attempting to create a Receiver with a non existent queue would cause such a headache. The solution of creating a session for each Receiver/Sender then seems a bit heavy weight as well.

Would anyone be able to provide any advice on how they have gotten around these issues, or provide an Address String Hack that might solve the problem? Any help gratefully received.

Also, are there any updates to the messaging API in the next QPID release?

Clive

#include <string>
#include <iostream>
#include <stdlib.h>

#include "qpid/Exception.h"

#include "qpid/messaging/Session.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Connection.h"

int main(int argc, char *argv[])
{
   std::string address1("qpidRxer; {create:always, delete:always, 
node:{type:queue, x-bindings:[{exchange:amq.topic, key:rxer1}]}}");
   std::string address2("qpidRxer; {create:always, delete:always, 
node:{type:queue, x-bindings:[{exchange:amq.topic, key:rxer2}]}}");
   
   if (argc > 1){
       address1 = argv[1];
   }
   
   if (argc > 2){
       address2 = argv[2];
   }
   
   try {
      qpid::messaging::Connection connection("amqp:tcp:127.0.0.1:5672");
      connection.open();
      
      qpid::messaging::Session session = connection.createSession();
      qpid::messaging::Receiver rxer1  = session.createReceiver(address1);
      qpid::messaging::Receiver rxer2  = session.createReceiver(address2);

      rxer1.setCapacity(1);
      rxer2.setCapacity(1);

      qpid::messaging::Message  message;
      qpid::messaging::Receiver currentRxer;
      
      //rxer1.close;  // test removal of binding from queue     

      if (!session.hasError()){
           while(true){
                  std::cout << "waiting for receiver ... " << std::endl;

               if(session.nextReceiver(currentRxer)){
                  std::cout << "receiver ready ... " << std::endl;
                  currentRxer.fetch(message);
                  std::cout << " received msg [" << currentRxer.getName() << "] 
: " << message.getSubject() << std::endl;
                  session.acknowledge();
               }
           }
      }
   }
   catch(const qpid::types::Exception& qe){
     std::cout << "qpidRxer caught unexpected qpid::types::Exception : " << 
qe.what() << std::endl; 
     return 1;  
   }
   catch(const qpid::Exception& qe){
       std::cout << "qpidRxer caught unexpected qpid::messages::Exception : " 
<< qe.what() << std::endl;
       return 1;
   }
   return EXIT_SUCCESS;
}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to