On 08/12/2010 07:05 PM, Andrew L wrote:

I am trying to use a single SubscriptionManager to create and cancel multiple 
subscriptions.  After canceling a subscription, future subscriptions created by 
the SubscriptionManager would never be delivered messages.  I modified the 
example program to test it and got the same results.

The problem is that a SubscriptionManager cannot be restarted once it has been stopped; in fact, the normal message dispatch on the session is itself not restartable.

With 'auto-stop' on, when you cancel your last subscription, dispatch stops.

You could therefore turn auto-stop off (which would require you to make an explicit stop() call when you are ready to shut down). That way, cancelling and then resubscribing would still work.
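
A rough sketch of the auto-stop approach follows. It is written as a template so that it compiles without the Qpid headers; with the real client, Manager would be qpid::client::SubscriptionManager (whose setAutoStop(), subscribe() and stop() members are, to my knowledge, the calls involved) and Listener would be your MessageListener subclass. The names switchQueue and RecordingManager are made up for illustration and are not part of Qpid.

```cpp
#include <string>
#include <vector>

// Cancels one subscription and creates another on the same manager while
// keeping dispatch alive. Manager is assumed to offer setAutoStop(bool) and
// subscribe(listener, queueName) returning a handle that has cancel().
template <typename Manager, typename Listener>
void switchQueue(Manager& subscriptions, Listener& first, Listener& second) {
    // Turn auto-stop off before the last subscription is cancelled;
    // otherwise dispatch stops and cannot be restarted.
    subscriptions.setAutoStop(false);

    auto sub = subscriptions.subscribe(first, "message_queue");
    // ... dispatch runs elsewhere, e.g. run() on a dispatch thread ...
    sub.cancel();  // no subscriptions left, but dispatch is not stopped

    sub = subscriptions.subscribe(second, "message_queue2");
    // With auto-stop off, the application has to call stop() itself
    // when it is ready to shut down.
}

// Hypothetical stand-in that only records the calls made on it, so the
// sketch can be exercised without a broker. Not part of Qpid.
struct RecordingManager {
    struct Handle {
        std::vector<std::string>* log;
        void cancel() { log->push_back("cancel"); }
    };

    std::vector<std::string> log;

    void setAutoStop(bool on) {
        log.push_back(on ? "autostop on" : "autostop off");
    }

    template <typename Listener>
    Handle subscribe(Listener&, const std::string& queue) {
        log.push_back("subscribe " + queue);
        return Handle{&log};
    }
};
```

The only point of the ordering is that setAutoStop(false) comes before the cancel; the queue names are the ones from your test program.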

Another option would be to use a LocalQueue for the subscriptions (or even just the first one), and actively pull messages off as needed.

The best option will likely also depend on the details of your application.

Also, I would advise against using SubscriptionManager::start(). All it does is spin off a new thread and call run(), but it doesn't handle exceptions thrown during dispatch. I would recommend calling run() directly and, if needed, starting your own dispatch thread for it.
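
As an illustration of running dispatch on your own thread, here is a minimal sketch that wraps a blocking run() call in a try/catch. It assumes a C++11 compiler with std::thread; DispatchThread and FailingDispatcher are hypothetical names, and the Dispatcher template parameter stands in for SubscriptionManager so that the sketch compiles without the Qpid headers.

```cpp
#include <exception>
#include <stdexcept>
#include <string>
#include <thread>

// Calls dispatcher.run() on its own thread and records any exception that
// escapes from it, rather than letting it terminate the process.
// Dispatcher is any type with a blocking run() member function.
template <typename Dispatcher>
class DispatchThread {
  public:
    explicit DispatchThread(Dispatcher& dispatcher)
        : failed_(false),
          thread_([this, &dispatcher] { runGuarded(dispatcher); }) {}

    // Joins on destruction, so the caller must make run() return first
    // (with a SubscriptionManager, by calling stop()).
    ~DispatchThread() { join(); }

    // Blocks until run() has returned or thrown.
    void join() {
        if (thread_.joinable()) thread_.join();
    }

    // Only meaningful after join(): did run() exit by throwing, and why.
    bool failed() const { return failed_; }
    const std::string& error() const { return error_; }

  private:
    void runGuarded(Dispatcher& dispatcher) {
        try {
            dispatcher.run();
        } catch (const std::exception& e) {
            failed_ = true;
            error_ = e.what();
        } catch (...) {
            failed_ = true;
            error_ = "unknown exception";
        }
    }

    bool failed_;
    std::string error_;
    std::thread thread_;  // declared last so it starts after the other members
};

// Hypothetical dispatcher whose run() always throws, for trying the wrapper.
struct FailingDispatcher {
    void run() { throw std::runtime_error("connection lost"); }
};
```

With the Qpid client, the intended use would be along the lines of constructing a DispatchThread<SubscriptionManager> from your manager, calling stop() on the manager at shutdown, then join() and checking failed().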

If you are just starting out, it would perhaps be worth looking at the new API that has been developed: http://qpid.apache.org/books/0.7/Programming-In-Apache-Qpid/html/

This is, I think, much simpler to use. It is not yet available in released form (the version in the 0.6 release is only an early preview), but it is now stable on trunk.


Should this consumer not receive any messages on message_queue2?

If I comment out the sub.cancel() and the subsequent manager.run() the 
application receives messages on both subscriptions successfully.

#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>
#include <cstdlib>
#include <iostream>

using namespace qpid::client;
using namespace qpid::framing;

class Listener : public MessageListener {
  private:
    SubscriptionManager& subscriptions;
  public:
    Listener(SubscriptionManager& subscriptions);
    virtual void received(Message& message);
};

Listener::Listener(SubscriptionManager& subs) : subscriptions(subs) {}

// Print every message that is delivered to this listener
void Listener::received(Message& message) {
    std::cout << "Message: " << message.getData() << std::endl;
}

int main(int argc, char** argv) {
    const char* host = argc > 1 ? argv[1] : "127.0.0.1";
    int port = argc > 2 ? atoi(argv[2]) : 5672;
    Connection connection;
    try {
        connection.open(host, port);
        Session session = connection.newSession();
        //--------- Main body of program --------------------------------------
        SubscriptionManager subscriptions(session);
        // Create two listeners and subscribe the first one to "message_queue"
        Listener listener(subscriptions), listener2(subscriptions);
        Subscription sub = subscriptions.subscribe(listener, "message_queue");
        // Start dispatching messages on a background thread
        subscriptions.start();
        //---------------------------------------------------------------------
        char test;
        std::cout << "input" << std::endl;
        std::cin >> test;
        // Cancel the only subscription
        sub.cancel();
        std::cout << " again " << std::endl;
        std::cin >> test;
        // Subscribe the second listener to a different queue and dispatch again
        sub = subscriptions.subscribe(listener2, "message_queue2");
        subscriptions.run();
        std::cout << "after start" << std::endl;
        std::cout << " 222" << std::endl;
        std::cin >> test;
        connection.close();
        return 0;
    } catch (const std::exception& error) {
        std::cout << error.what() << std::endl;
    }
    return 1;
}
                                        

