On 12/12/2009 03:49 PM, Jason Schlauch wrote:
I have a fairly simple program that, so far, is based pretty much on
the sample c++ pub/sub code.  I'm having an issue where QPID reports
"No listener found for destination" about half the time.

I believe this is because you are running multiple dispatch threads on the same session as a result of using multiple SubscriptionManagers per session.

Each SubscriptionManager only knows about one of the subscriptions but may receive messages from any of the active subscriptions. When it gets a message from a subscription registered with another SubscriptionManager instance, it doesn't know what to do with it and prints the error you are seeing.

The simple fix is to use a single SubscriptionManager per session. (Alternatively, if you really need to process the messages on the same session in parallel threads, use a LocalQueue per subscription with each SubscriptionManager and then start your own threads to process them.)
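To illustrate why the error shows up roughly half the time, here is a minimal toy model of the dispatch problem. It does not use the Qpid API at all: ToyMessage, ToyDispatcher and countUndelivered are hypothetical names invented for this sketch, and the round-robin hand-off is only an assumption standing in for whichever dispatch thread happens to pick up the next message on the shared session.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a message arriving on a session (not a Qpid type).
struct ToyMessage {
    std::string destination;  // subscription the message was delivered for
    std::string body;
};

// Toy stand-in for one SubscriptionManager: it can only route messages
// for destinations that were registered with this particular instance.
class ToyDispatcher {
public:
    typedef std::function<void(const ToyMessage&)> Handler;

    void subscribe(const std::string& destination, Handler handler) {
        assert(handler);  // an empty handler could never be invoked
        handlers_[destination] = std::move(handler);
    }

    // Returns true if a handler was found; false models the
    // "No listener found for destination" case.
    bool dispatch(const ToyMessage& message) const {
        std::map<std::string, Handler>::const_iterator it =
            handlers_.find(message.destination);
        if (it == handlers_.end()) return false;
        it->second(message);
        return true;
    }

private:
    std::map<std::string, Handler> handlers_;
};

// Hands each incoming message to one dispatcher in round-robin order
// (an assumption; the real choice depends on thread timing) and returns
// how many messages reached a dispatcher that had no handler for them.
std::size_t countUndelivered(const std::vector<ToyDispatcher>& dispatchers,
                             const std::vector<ToyMessage>& incoming) {
    if (dispatchers.empty()) return incoming.size();
    std::size_t undelivered = 0;
    for (std::size_t i = 0; i < incoming.size(); ++i) {
        const ToyDispatcher& chosen = dispatchers[i % dispatchers.size()];
        if (!chosen.dispatch(incoming[i])) ++undelivered;
    }
    return undelivered;
}
```

In this model, two dispatchers that each know one destination fail to route two out of four messages when two are sent to each destination, whereas a single dispatcher holding both subscriptions routes all of them. That is the same shape as one SubscriptionManager per session.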

I have a subscription manager that's a factory style object.  It holds
the connection and a session object created from that connection.  It
creates Listener objects that are derived from the following base
class:

class QPListenerBase : public MessageListener {
   private:
     void received(Message&  message);
   protected:
     Session&  m_session;
     SubscriptionManager m_subscriptions;
     std::string m_GUID;
   public:
     QPListenerBase(Session&  session);
     virtual void prepareQueue(const std::string&  routing_key);
     virtual void listen();
     virtual void unlisten();
     virtual void waitforstop();
     virtual void receiveMessage(Message&  message);
     virtual ~QPListenerBase() { };
};

I have specialized listeners that are subclasses of QPListenerBase.
My base received() simply calls the specialized receiveMessage() in
the subclasses.

The factory, when it creates a listener, initializes it using
prepareQueue() almost exactly as per the sample code:

void QPListenerBase::prepareQueue(const std::string&  routing_key) {
    std::string queue = "listener." + routing_key + "." + m_GUID;
    m_session.queueDeclare(arg::queue=queue, arg::exclusive=true,
                           arg::autoDelete=true);
    m_session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue,
                           arg::bindingKey=routing_key);
    m_subscriptions.subscribe(*this, queue);
}

... and stores it in a vector of listeners as follows:
   boost::shared_ptr<QPListenerBase>  ptr(new QPListenerBase(m_session));
   ptr->prepareQueue("channel.msgs." + senderGUID);
   m_listeners.push_back(ptr);

(I'm using shared_ptr here because QPListenerBase, with its Session
reference, doesn't meet the stl criteria of being copyable, so I wrap
it in a shared_ptr, which is copyable).

At runtime I simply iterate over the listener collection and
ultimately call QPListenerBase.m_subscriptions.start() on each one.

I have a python test script that sends messages that should be picked
up by specific listeners.  And they are, about half the time.

Here's what I get when I have two listeners listening on a topic
exchange.  Messages are sent randomly to either channel.msgs.0 or
channel.msgs.1 here.  The message content is a UUID.  I first print
out the message in the base received() and then in my subclass
receiveMessage().

[received ef923fdc-83dd-4c54-875a-97e8be218860]: a011bede-92cb-47a9-95d0-4125303b8cfd
    [receiveMessage ef923fdc-83dd-4c54-875a-97e8be218860]: a011bede-92cb-47a9-95d0-4125303b8cfd
2009-12-12 10:29:59 error No listener found for destination listener.channel.msgs.1.ef923fdc-83dd-4c54-875a-97e8be218860
2009-12-12 10:30:04 error No listener found for destination listener.channel.msgs.0.f7a20b18-9b30-4ada-abb5-a8bf492b5353
2009-12-12 10:30:09 error No listener found for destination listener.channel.msgs.1.ef923fdc-83dd-4c54-875a-97e8be218860
[received ef923fdc-83dd-4c54-875a-97e8be218860]: 7e1426b1-fba7-4e70-969a-c0b726293ebb
    [receiveMessage ef923fdc-83dd-4c54-875a-97e8be218860]: 7e1426b1-fba7-4e70-969a-c0b726293ebb
[received f7a20b18-9b30-4ada-abb5-a8bf492b5353]: 93ab5106-c774-419e-bf1d-4df99c5f97a1
    [receiveMessage f7a20b18-9b30-4ada-abb5-a8bf492b5353]: 93ab5106-c774-419e-bf1d-4df99c5f97a1
[received ef923fdc-83dd-4c54-875a-97e8be218860]: aeaa41a1-b709-4fde-8ca2-8cf650ec4baf
    [receiveMessage ef923fdc-83dd-4c54-875a-97e8be218860]: aeaa41a1-b709-4fde-8ca2-8cf650ec4baf
2009-12-12 10:30:29 error No listener found for destination listener.channel.msgs.1.ef923fdc-83dd-4c54-875a-97e8be218860
2009-12-12 10:30:34 error No listener found for destination listener.channel.msgs.0.f7a20b18-9b30-4ada-abb5-a8bf492b5353

Why are my listeners listening only half the time?  Any ideas?

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]


